Feed file input to an ES query

Dear all,

I'm trying to achieve the following:

  1. Read a file: One value per line
  2. Transform this value into a wildcard value, e.g. "*value*"
  3. Feed the value into an ES query
  4. Return the results of the query

These are my settings so far:

logstash.conf

input {
	file {
		path => "d:/test.csv"
		start_position => "beginning"
		sincedb_path => "NULL"
		mode => "read"
	}
}

filter {
	mutate {
		gsub => ["message","(.*)", "*\1"]
	}
}

filter {
	elasticsearch {
		query_template => "c:/logstash/config/searchSpares.json"
		index => "pmaparts"
		fields => {
			"ApprovalBasis" => "[ApprovalBasis]"
			"ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output { 
	stdout {}
}

Query Template:

{
	"query": {
		"bool": {
			"must": [
				{
					"wildcard": {
						"ReplacedPartNumber": "message"
					}
				}
			],
			"must_not": [],
			"should": []
		}
	},
	"from": 0,
	"sort": [],
	"aggs": {}
}
This is the error I'm getting
C:\logstash\bin>logstash -f c:\logstash\config\readSpares.conf --config.test_and_exit  --config.debug --log.level=debug
Sending Logstash logs to C:/logstash/logs which is now configured via log4j2.properties
[2020-03-08T13:23:59,815][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"fb_apache", :directory=>"C:/logstash/modules/fb_apache/configuration"}
[2020-03-08T13:23:59,945][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"fb_apache", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x3dcf0 @directory="C:/logstash/modules/fb_apache/configuration", @module_name="fb_apache", @kibana_version_parts=["6", "0", "0"]>}
[2020-03-08T13:23:59,952][DEBUG][logstash.modules.scaffold] Found module {:module_name=>"netflow", :directory=>"C:/logstash/modules/netflow/configuration"}
[2020-03-08T13:23:59,961][DEBUG][logstash.plugins.registry] Adding plugin to the registry {:name=>"netflow", :type=>:modules, :class=>#<LogStash::Modules::Scaffold:0x131d9f3 @directory="C:/logstash/modules/netflow/configuration", @module_name="netflow", @kibana_version_parts=["6", "0", "0"]>}
[2020-03-08T13:24:00,074][DEBUG][logstash.runner          ] -------- Logstash Settings (* means modified) ---------
[2020-03-08T13:24:00,079][DEBUG][logstash.runner          ] node.name: "DESKTOP-H3VM3G3"
[2020-03-08T13:24:00,090][DEBUG][logstash.runner          ] *path.config: "c:\\logstash\\config\\readSpares.conf"
[2020-03-08T13:24:00,092][DEBUG][logstash.runner          ] path.data: "C:/logstash/data"
[2020-03-08T13:24:00,095][DEBUG][logstash.runner          ] modules.cli: []
[2020-03-08T13:24:00,097][DEBUG][logstash.runner          ] modules: []
[2020-03-08T13:24:00,110][DEBUG][logstash.runner          ] modules_list: []
[2020-03-08T13:24:00,112][DEBUG][logstash.runner          ] modules_variable_list: []
[2020-03-08T13:24:00,116][DEBUG][logstash.runner          ] modules_setup: false
[2020-03-08T13:24:00,118][DEBUG][logstash.runner          ] *config.test_and_exit: true (default: false)
[2020-03-08T13:24:00,131][DEBUG][logstash.runner          ] config.reload.automatic: false
[2020-03-08T13:24:00,133][DEBUG][logstash.runner          ] config.reload.interval: 3000000000
[2020-03-08T13:24:00,135][DEBUG][logstash.runner          ] config.support_escapes: false
[2020-03-08T13:24:00,137][DEBUG][logstash.runner          ] config.field_reference.parser: "STRICT"
[2020-03-08T13:24:00,139][DEBUG][logstash.runner          ] metric.collect: true
[2020-03-08T13:24:00,153][DEBUG][logstash.runner          ] pipeline.id: "main"
[2020-03-08T13:24:00,155][DEBUG][logstash.runner          ] pipeline.system: false
[2020-03-08T13:24:00,157][DEBUG][logstash.runner          ] pipeline.workers: 6
[2020-03-08T13:24:00,159][DEBUG][logstash.runner          ] pipeline.batch.size: 125
[2020-03-08T13:24:00,173][DEBUG][logstash.runner          ] pipeline.batch.delay: 50
[2020-03-08T13:24:00,175][DEBUG][logstash.runner          ] pipeline.unsafe_shutdown: false
[2020-03-08T13:24:00,176][DEBUG][logstash.runner          ] pipeline.java_execution: true
[2020-03-08T13:24:00,178][DEBUG][logstash.runner          ] pipeline.reloadable: true
[2020-03-08T13:24:00,180][DEBUG][logstash.runner          ] pipeline.plugin_classloaders: false
[2020-03-08T13:24:00,195][DEBUG][logstash.runner          ] pipeline.separate_logs: false
[2020-03-08T13:24:00,197][DEBUG][logstash.runner          ] path.plugins: []
[2020-03-08T13:24:00,199][DEBUG][logstash.runner          ] *config.debug: true (default: false)
[2020-03-08T13:24:00,200][DEBUG][logstash.runner          ] *log.level: "debug" (default: "info")
[2020-03-08T13:24:00,203][DEBUG][logstash.runner          ] version: false
[2020-03-08T13:24:00,217][DEBUG][logstash.runner          ] help: false
[2020-03-08T13:24:00,219][DEBUG][logstash.runner          ] log.format: "plain"
[2020-03-08T13:24:00,221][DEBUG][logstash.runner          ] http.host: "127.0.0.1"
[2020-03-08T13:24:00,222][DEBUG][logstash.runner          ] http.port: 9600..9700
[2020-03-08T13:24:00,225][DEBUG][logstash.runner          ] http.environment: "production"
[2020-03-08T13:24:00,241][DEBUG][logstash.runner          ] queue.type: "memory"
[2020-03-08T13:24:00,243][DEBUG][logstash.runner          ] queue.drain: false
[2020-03-08T13:24:00,245][DEBUG][logstash.runner          ] queue.page_capacity: 67108864
[2020-03-08T13:24:00,247][DEBUG][logstash.runner          ] queue.max_bytes: 1073741824
[2020-03-08T13:24:00,261][DEBUG][logstash.runner          ] queue.max_events: 0
[2020-03-08T13:24:00,263][DEBUG][logstash.runner          ] queue.checkpoint.acks: 1024
[2020-03-08T13:24:00,264][DEBUG][logstash.runner          ] queue.checkpoint.writes: 1024
[2020-03-08T13:24:00,266][DEBUG][logstash.runner          ] queue.checkpoint.interval: 1000
[2020-03-08T13:24:00,267][DEBUG][logstash.runner          ] queue.checkpoint.retry: false
[2020-03-08T13:24:00,269][DEBUG][logstash.runner          ] dead_letter_queue.enable: false
[2020-03-08T13:24:00,284][DEBUG][logstash.runner          ] dead_letter_queue.max_bytes: 1073741824
[2020-03-08T13:24:00,285][DEBUG][logstash.runner          ] slowlog.threshold.warn: -1
[2020-03-08T13:24:00,287][DEBUG][logstash.runner          ] slowlog.threshold.info: -1
[2020-03-08T13:24:00,289][DEBUG][logstash.runner          ] slowlog.threshold.debug: -1
[2020-03-08T13:24:00,290][DEBUG][logstash.runner          ] slowlog.threshold.trace: -1
[2020-03-08T13:24:00,292][DEBUG][logstash.runner          ] keystore.classname: "org.logstash.secret.store.backend.JavaKeyStore"
[2020-03-08T13:24:00,306][DEBUG][logstash.runner          ] keystore.file: "C:/logstash/config/logstash.keystore"
[2020-03-08T13:24:00,308][DEBUG][logstash.runner          ] path.queue: "C:/logstash/data/queue"
[2020-03-08T13:24:00,309][DEBUG][logstash.runner          ] path.dead_letter_queue: "C:/logstash/data/dead_letter_queue"
[2020-03-08T13:24:00,310][DEBUG][logstash.runner          ] path.settings: "C:/logstash/config"
[2020-03-08T13:24:00,312][DEBUG][logstash.runner          ] path.logs: "C:/logstash/logs"
[2020-03-08T13:24:00,313][DEBUG][logstash.runner          ] xpack.management.enabled: false
[2020-03-08T13:24:00,329][DEBUG][logstash.runner          ] xpack.management.logstash.poll_interval: 5000000000
[2020-03-08T13:24:00,331][DEBUG][logstash.runner          ] xpack.management.pipeline.id: ["main"]
[2020-03-08T13:24:00,332][DEBUG][logstash.runner          ] xpack.management.elasticsearch.username: "logstash_system"
[2020-03-08T13:24:00,334][DEBUG][logstash.runner          ] xpack.management.elasticsearch.hosts: ["https://localhost:9200"]
[2020-03-08T13:24:00,335][DEBUG][logstash.runner          ] xpack.management.elasticsearch.ssl.verification_mode: "certificate"
[2020-03-08T13:24:00,351][DEBUG][logstash.runner          ] xpack.management.elasticsearch.sniffing: false
[2020-03-08T13:24:00,354][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.hosts: ["http://localhost:9200"]
[2020-03-08T13:24:00,356][DEBUG][logstash.runner          ] xpack.monitoring.collection.interval: 10000000000
[2020-03-08T13:24:00,357][DEBUG][logstash.runner          ] xpack.monitoring.collection.timeout_interval: 600000000000
[2020-03-08T13:24:00,359][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.username: "logstash_system"
[2020-03-08T13:24:00,373][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.ssl.verification_mode: "certificate"
[2020-03-08T13:24:00,374][DEBUG][logstash.runner          ] xpack.monitoring.elasticsearch.sniffing: false
[2020-03-08T13:24:00,375][DEBUG][logstash.runner          ] xpack.monitoring.collection.pipeline.details.enabled: true
[2020-03-08T13:24:00,377][DEBUG][logstash.runner          ] xpack.monitoring.collection.config.enabled: true
[2020-03-08T13:24:00,378][DEBUG][logstash.runner          ] node.uuid: ""
[2020-03-08T13:24:00,379][DEBUG][logstash.runner          ] --------------- Logstash Settings -------------------
[2020-03-08T13:24:00,431][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-03-08T13:24:00,481][DEBUG][logstash.config.source.local.configpathloader] Skipping the following files while reading config since they don't match the specified glob pattern {:files=>[...]}
[2020-03-08T13:24:00,487][DEBUG][logstash.config.source.local.configpathloader] Reading config file {:config_file=>"c:/logstash/config/readSpares.conf"}
[2020-03-08T13:24:00,524][DEBUG][logstash.config.pipelineconfig] -------- Logstash Config ---------
[2020-03-08T13:24:00,528][DEBUG][logstash.config.pipelineconfig] Config from source {:source=>LogStash::Config::Source::Local, :pipeline_id=>:main}
[2020-03-08T13:24:00,540][DEBUG][logstash.config.pipelineconfig] Config string {:protocol=>"file", :id=>"c:/logstash/config/readSpares.conf"}
[2020-03-08T13:24:00,542][DEBUG][logstash.config.pipelineconfig]

input {
        file {
                path => "d:/test.csv"
                start_position => "beginning"
                sincedb_path => "NULL"
                mode => "read"
        }
}

filter {
        mutate {
                gsub => ["message","(.*)", "*\1"]
        }
}

filter {
        elasticsearch {
                query_template => "c:/logstash/config/searchSpares.json"
                index => "pmaparts"
                fields => {
                        "ApprovalBasis" => "[ApprovalBasis]"
                        "ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output {
        stdout {}
}
[2020-03-08T13:24:00,544][DEBUG][logstash.config.pipelineconfig] Merged config
[2020-03-08T13:24:00,549][DEBUG][logstash.config.pipelineconfig]

input {
        file {
                path => "d:/test.csv"
                start_position => "beginning"
                sincedb_path => "NULL"
                mode => "read"
        }
}

filter {
        mutate {
                gsub => ["message","(.*)", "*\1"]
        }
}

filter {
        elasticsearch {
                query_template => "c:/logstash/config/searchSpares.json"
                index => "pmaparts"
                fields => {
                        "ApprovalBasis" => "[ApprovalBasis]"
                        "ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output {
        stdout {}
}
[2020-03-08T13:24:01,131][FATAL][logstash.runner          ] The given configuration is invalid. Reason: Expected one of [ \t\r\n], "#", "=>" at line 27, column 9 (byte 418) after filter {
        elasticsearch {
                query_template => "c:/logstash/config/searchSpares.json"
                index => "pmaparts"
                fields => {
                        "ApprovalBasis" => "[ApprovalBasis]"
                        "ReplacedPartNumber" => "[ReplacedPartNumber]"
  }
}

output {
        stdout
[2020-03-08T13:24:01,138][ERROR][org.logstash.Logstash    ] java.lang.IllegalStateException: Logstash stopped processing because of an error: (SystemExit) exit

My questions:

  1. Is this, generally speaking, a valid approach?
  2. Where is my error?
  3. How do I correctly pass the variable part from the file as argument to the ES query?

Many thanks for any help in advance.

Ok, after fixing some stupid errors in the config, logstash starts up fine but does not seem to execute the query template.

input {
	file {
		path => "d:/test.csv"
		start_position => "beginning"
		sincedb_path => "NULL"
		mode => "read"
	}
}

filter {
	mutate {
		gsub => ["message","(.*)", "*\1"]
	}
}

filter {
	elasticsearch {
		hosts => "http://localhost:9200"
		query_template => "c:/logstash/config/searchSpares.json"
		index => "pmaparts"
		fields => {
			"ApprovalBasis" => "[ApprovalBasis]"
			"ReplacedPartNumber" => "[ReplacedPartNumber]"
		}
	}
}

output { 
	stdout {}
}

Passing the variable in the wrong way?

Ok, for reference, this seems to work like intended

{
	"query": {
		"bool": {
			"must": [
				{
					"wildcard": {
						"ReplacedPartNumber": "%{[message]}"
					}
				}
			],
			"must_not": [],
			"should": []
		}
	},
	"from": 0,
	"sort": [],
	"aggs": {}
}