Feed file input to an ES query

Dear all,

I'm trying to achieve the following:

  1. Read a file: One value per line
  2. Transform this value into a wildcard value, e.g. "*value*"
  3. Feed the value into an ES query
  4. Return the results of the query

These are my settings so far:

logstash.conf

input {
	file {
		path => "d:/test.csv"
		start_position => "beginning"
		sincedb_path => "NULL"
		mode => "read"
	}
}

filter {
	mutate {
		gsub => ["message","(.*)", "*\1"]
	}
}

filter {
	elasticsearch {
		query_template => "c:/logstash/config/searchSpares.json"
		index => "pmaparts"
		fields => {
			"ApprovalBasis" => "[ApprovalBasis]"
			"ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output { 
	stdout {}
}

Query Template:

{
	"query": {
		"bool": {
			"must": [
				{
					"wildcard": {
						"ReplacedPartNumber": "message"
					}
				}
			],
			"must_not": [],
			"should": []
		}
	},
	"from": 0,
	"sort": [],
	"aggs": {}
}
This is the error I'm getting:
C:\logstash\bin>logstash -f c:\logstash\config\readSpares.conf --config.test_and_exit  --config.debug --log.level=debug
Sending Logstash logs to C:/logstash/logs which is now configured via log4j2.properties
[2020-03-08T13:23:59,815][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"fb_apache", :directory=>"C:/logstash/modules/fb_apache/configuration"}
[2020-03-08T13:23:59,945][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"fb_apache", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x3dcf0 @directory="C:/logstash/modules/fb_apache/configuration", @module_name="fb_apache", @kibana_version_parts=["6", "0", "0"]>}
[2020-03-08T13:23:59,952][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"netflow", :directory=>"C:/logstash/modules/netflow/configuration"}
[2020-03-08T13:23:59,961][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"netflow", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x131d9f3 @directory="C:/logstash/modules/netflow/configuration", @module_name="netflow", @kibana_version_parts=["6", "0", "0"]>}
[2020-03-08T13:24:00,074][DEBUG][logstash.runner          ] -------- Logstash Settings (* means modified) ---------
[2020-03-08T13:24:00,079][DEBUG][logstash.runner          ] node.name: "DESKTOP-H3VM3G3"
[2020-03-08T13:24:00,090][DEBUG][logstash.runner          ] *path.config: "c:\\logstash\\config\\readSpares.conf"
[2020-03-08T13:24:00,092][DEBUG][logstash.runner          ] path.data: "C:/logstash/data"
[2020-03-08T13:24:00,095][DEBUG][logstash.runner          ] modules.cli: []
[2020-03-08T13:24:00,097][DEBUG][logstash.runner          ] modules: []
[2020-03-08T13:24:00,110][DEBUG][logstash.runner          ] modules_list: []
[2020-03-08T13:24:00,112][DEBUG][logstash.runner          ] modules_variable_list: []
[2020-03-08T13:24:00,116][DEBUG][logstash.runner          ] modules_setup: false
[2020-03-08T13:24:00,118][DEBUG][logstash.runner          ] *config.test_and_exit: true (default: false)
[2020-03-08T13:24:00,131][DEBUG][logstash.runner          ] config.reload.automatic: false
[2020-03-08T13:24:00,133][DEBUG][logstash.runner          ] config.reload.interval: 3000000000
[2020-03-08T13:24:00,135][DEBUG][logstash.runner          ] config.support_escapes: false
[2020-03-08T13:24:00,137][DEBUG][logstash.runner          ] config.field_reference.parser: "STRICT"
[2020-03-08T13:24:00,139][DEBUG][logstash.runner          ] metric.collect: true
[2020-03-08T13:24:00,153][DEBUG][logstash.runner          ] pipeline.id: "main"
[2020-03-08T13:24:00,155][DEBUG][logstash.runner          ] pipeline.system: false
[2020-03-08T13:24:00,157][DEBUG][logstash.runner          ] pipeline.workers: 6
[2020-03-08T13:24:00,159][DEBUG][logstash.runner          ] pipeline.batch.size: 125
[2020-03-08T13:24:00,173][DEBUG][logstash.runner          ] pipeline.batch.delay: 50
[2020-03-08T13:24:00,175][DEBUG][logstash.runner          ] pipeline.unsafe_shutdown: false
[2020-03-08T13:24:00,176][DEBUG][logstash.runner          ] pipeline.java_execution: true
[2020-03-08T13:24:00,178][DEBUG][logstash.runner          ] pipeline.reloadable: true
[2020-03-08T13:24:00,180][DEBUG][logstash.runner          ] pipeline.plugin_classloaders: false
[2020-03-08T13:24:00,195][DEBUG][logstash.runner          ] pipeline.separate_logs: false
[2020-03-08T13:24:00,197][DEBUG][logstash.runner          ] path.plugins: []
[2020-03-08T13:24:00,199][DEBUG][logstash.runner          ] *config.debug: true (default: false)
[2020-03-08T13:24:00,200][DEBUG][logstash.runner          ] *log.level: "debug" (default: "info")
[2020-03-08T13:24:00,203][DEBUG][logstash.runner          ] version: false
[2020-03-08T13:24:00,217][DEBUG][logstash.runner          ] help: false
[2020-03-08T13:24:00,219][DEBUG][logstash.runner          ] log.format: "plain"
[2020-03-08T13:24:00,221][DEBUG][logstash.runner          ] http.host: "127.0.0.1"
[2020-03-08T13:24:00,222][DEBUG][logstash.runner          ] http.port: 9600..9700
[2020-03-08T13:24:00,225][DEBUG][logstash.runner          ] http.environment: "production"
[2020-03-08T13:24:00,241][DEBUG][logstash.runner          ] queue.type: "memory"
[2020-03-08T13:24:00,243][DEBUG][logstash.runner          ] queue.drain: false
[2020-03-08T13:24:00,245][DEBUG][logstash.runner          ] queue.page_capacity: 67108864
[2020-03-08T13:24:00,247][DEBUG][logstash.runner          ] queue.max_bytes: 1073741824
[2020-03-08T13:24:00,261][DEBUG][logstash.runner          ] queue.max_events: 0
[2020-03-08T13:24:00,263][DEBUG][logstash.runner          ] queue.checkpoint.acks: 1024
[2020-03-08T13:24:00,264][DEBUG][logstash.runner          ] queue.checkpoint.writes: 1024
[2020-03-08T13:24:00,266][DEBUG][logstash.runner          ] queue.checkpoint.interval: 1000
[2020-03-08T13:24:00,267][DEBUG][logstash.runner          ] queue.checkpoint.retry: false
[2020-03-08T13:24:00,269][DEBUG][logstash.runner          ] dead_letter_queue.enable: false
[2020-03-08T13:24:00,284][DEBUG][logstash.runner          ] dead_letter_queue.max_bytes: 1073741824
[2020-03-08T13:24:00,285][DEBUG][logstash.runner          ] slowlog.threshold.warn: -1
[2020-03-08T13:24:00,287][DEBUG][logstash.runner          ] slowlog.threshold.info: -1
[2020-03-08T13:24:00,289][DEBUG][logstash.runner          ] slowlog.threshold.debug: -1
[2020-03-08T13:24:00,290][DEBUG][logstash.runner          ] slowlog.threshold.trace: -1
[2020-03-08T13:24:00,292][DEBUG][logstash.runner          ] keystore.classname: "org.logstash.secret.store.backend.JavaKeyStore"
[2020-03-08T13:24:00,306][DEBUG][logstash.runner          ] keystore.file: "C:/logstash/config/logstash.keystore"
[2020-03-08T13:24:00,308][DEBUG][logstash.runner          ] path.queue: "C:/logstash/data/queue"
[2020-03-08T13:24:00,309][DEBUG][logstash.runner          ] path.dead_letter_queue: "C:/logstash/data/dead_letter_queue"
[2020-03-08T13:24:00,310][DEBUG][logstash.runner          ] path.settings: "C:/logstash/config"
[2020-03-08T13:24:00,312][DEBUG][logstash.runner          ] path.logs: "C:/logstash/logs"
[2020-03-08T13:24:00,313][DEBUG][logstash.runner          ] xpack.management.enabled: false
[2020-03-08T13:24:00,329][DEBUG][logstash.runner          ] xpack.management.logstash.poll_interval: 5000000000
[2020-03-08T13:24:00,331][DEBUG][logstash.runner          ] xpack.management.pipeline.id: ["main"]
[2020-03-08T13:24:00,332][DEBUG][logstash.runner          ] xpack.management.elasticsearch.username: "logstash_system"
[2020-03-08T13:24:00,334][DEBUG][logstash.runner          ] xpack.management.elasticsearch.hosts: ["https://localhost:9200"]
[2020-03-08T13:24:00,335][DEBUG][logstash.runner          ] xpack.management.elasticsearch.ssl.verification_mode: "certificate"
[2020-03-08T13:24:00,351][DEBUG][logstash.runner          ] xpack.management.elasticsearch.sniffing: false
[2020-03-08T13:24:00,354][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]
[2020-03-08T13:24:00,356][DEBUG][logstash.runner          ] xpack.monitoring.collection.interval: 10000000000
[2020-03-08T13:24:00,357][DEBUG][logstash.runner          ] xpack.monitoring.collection.timeout_interval: 600000000000
[2020-03-08T13:24:00,359][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.username: "logstash_system"
[2020-03-08T13:24:00,373][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.ssl.verification_mode: "certificate"
[2020-03-08T13:24:00,374][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.sniffing: false
[2020-03-08T13:24:00,375][DEBUG][logstash.runner          ] xpack.monitoring.collection.pipeline.details.enabled: true
[2020-03-08T13:24:00,377][DEBUG][logstash.runner          ] xpack.monitoring.collection.config.enabled: true
[2020-03-08T13:24:00,378][DEBUG][logstash.runner          ] node.uuid: ""
[2020-03-08T13:24:00,379][DEBUG][logstash.runner          ] --------------- Logstash Settings -------------------
[2020-03-08T13:24:00,431][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-03-08T13:24:00,481][DEBUG][logstash.config.source.local.configpathloader] Skipping the following files while reading config since they don't match the specified glob pattern {:files=>[...]}
[2020-03-08T13:24:00,487][DEBUG][logstash.config.source.local.configpathloader] Reading config file {:config_file=>"c:/logstash/config/readSpares.conf"}
[2020-03-08T13:24:00,524][DEBUG][logstash.config.pipelineconfig] -------- Logstash Config ---------
[2020-03-08T13:24:00,528][DEBUG][logstash.config.pipelineconfig] Config from source {:source=>LogStash::Config::Source::Local, :pipeline_id=>:main}
[2020-03-08T13:24:00,540][DEBUG][logstash.config.pipelineconfig] Config string {:protocol=>"file", :id=>"c:/logstash/config/readSpares.conf"}
[2020-03-08T13:24:00,542][DEBUG][logstash.config.pipelineconfig]

input {
        file {
                path => "d:/test.csv"
                start_position => "beginning"
                sincedb_path => "NULL"
                mode => "read"
        }
}

filter {
        mutate {
                gsub => ["message","(.*)", "*\1"]
        }
}

filter {
        elasticsearch {
                query_template => "c:/logstash/config/searchSpares.json"
                index => "pmaparts"
                fields => {
                        "ApprovalBasis" => "[ApprovalBasis]"
                        "ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output {
        stdout {}
}
[2020-03-08T13:24:00,544][DEBUG][logstash.config.pipelineconfig] Merged config
[2020-03-08T13:24:00,549][DEBUG][logstash.config.pipelineconfig]

input {
        file {
                path => "d:/test.csv"
                start_position => "beginning"
                sincedb_path => "NULL"
                mode => "read"
        }
}

filter {
        mutate {
                gsub => ["message","(.*)", "*\1"]
        }
}

filter {
        elasticsearch {
                query_template => "c:/logstash/config/searchSpares.json"
                index => "pmaparts"
                fields => {
                        "ApprovalBasis" => "[ApprovalBasis]"
                        "ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output {
        stdout {}
}
[2020-03-08T13:24:01,131][FATAL][logstash.runner          ] The given configuration is invalid. Reason: Expected one of [ \t\r\n], "#", "=>" at line 27, column 9 (byte 418) after filter {
        elasticsearch {
                query_template => "c:/logstash/config/searchSpares.json"
                index => "pmaparts"
                fields => {
                        "ApprovalBasis" => "[ApprovalBasis]"
                        "ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output {
        stdout
[2020-03-08T13:24:01,138][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

My questions:

  1. Is this, generally speaking, a valid approach?
  2. Where is my error?
  3. How do I correctly pass the variable part from the file as argument to the ES query?

Many thanks for any help in advance.

Ok, after fixing some stupid errors in the config (the `fields => { ... }` hash and the enclosing `elasticsearch` filter block were each missing a closing brace, which is what the parser error at line 27, column 9 was pointing at), logstash starts up fine but does not seem to execute the query template.

input {
	file {
		path => "d:/test.csv"
		start_position => "beginning"
		sincedb_path => "NULL"
		mode => "read"
	}
}

filter {
	mutate {
		gsub => ["message","(.*)", "*\1"]
	}
}

filter {
	elasticsearch {
		hosts => "http://localhost:9200"
		query_template => "c:/logstash/config/searchSpares.json"
		index => "pmaparts"
		fields => {
			"ApprovalBasis" => "[ApprovalBasis]"
			"ReplacedPartNumber" => "[ReplacedPartNumber]"
		}
	}
}

output { 
	stdout {}
}

Passing the variable in the wrong way?

Ok, for reference, this seems to work as intended: the field value is interpolated into the query template using the `%{[message]}` sprintf reference.

{
	"query": {
		"bool": {
			"must": [
				{
					"wildcard": {
						"ReplacedPartNumber": "%{[message]}"
					}
				}
			],
			"must_not": [],
			"should": []
		}
	},
	"from": 0,
	"sort": [],
	"aggs": {}
}
1 Like

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.