Logstash, query not executing inside of the elasticsearch input plugin

Hi,
I have a Logstash pipeline where I'm reading data in from Kafka, and inside the filter section I have a conditional that calls the elasticsearch input plugin when a certain condition is met.

The condition:
If serial_no is missing from the current event, then I want to go back 2 mins and replace serial_no with the value of serial_no from an event where the slot.keyword was equal to "LM-1".

So if the current event has these fields:

"serial_no" = " "
"slot.keyword" = "LM-1"

I want to go back in time 2 minutes and pull out the serial_no for this specific slot. I know that, as written, if slot LM-2 came in with a missing serial_no this pipeline would replace it with the serial number of "LM-1", but I won't worry about fixing that until I can get the elasticsearch query to work at all.
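
To illustrate what I mean (the serial number value here is just a made-up placeholder):

Incoming event:

"serial_no" = " "
"slot.keyword" = "LM-1"

Event from ~2 minutes earlier that is already in Elasticsearch:

"serial_no" = "SN123456"
"slot.keyword" = "LM-1"

Desired outgoing event:

"serial_no" = "SN123456"
"slot.keyword" = "LM-1"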

This is what my pipeline looks like:

input {
    kafka {
      client_id => "sandbox"
      topics => ["sandbox-topic"]
      group_id => "id2"
      bootstrap_servers => "server1.kafka.com:9093"
      consumer_threads => 1
      codec => "json"
      security_protocol => "SSL"
      ssl_keystore_location => "/usr/share/logstash/config/myKey.jks"
      ssl_keystore_password => "password"
      ssl_keystore_type => "PKCS12"
      ssl_truststore_location => "/usr/share/logstash/config/myCA.jks"
    }
}

filter { 
    json { 
        source => "message" 
        remove_field => [ "message" ] 
    }

    if [fields] {
        ruby {
            code => '
                event.get("fields").each { |k, v|
                    event.set(k, v)
                }
                event.remove("fields")
            '
        }
    }

    if [tags] {
        ruby {
            code => '
                event.get("tags").each { |k, v|
                    event.set(k, v)
                }
                event.remove("tags")
            '
        }
    }
    
    if [name] == "components" and !("serial_no" in [fields]) {
        elasticsearch {
            hosts => ["https://ESaddress:443"]
            index => "myIndex-2023.01.21"
            user => 'username'
            password => '12345'
            query => '{"query":{"bool": {"must":[{"term": {"slot.keyword": "LM-1"}}], "filter": {"range": {"@timestamp": {"gte": "now-2m"}}}}}}'
        }
        
    } else {
        drop {}
    }
}

output {
    stdout { }
}

This is the warning message that I'm getting:

[2023-01-21T15:38:27,662][WARN ][logstash.filters.elasticsearch][main][75f950dce3774ae3a277ce0d7a2c3d741be832c4a8c35890090f7460439151f6] Failed to query elasticsearch for previous event {:index=>"ciena-component-lab-2023.01.21", :error=>"[400] {\"error\":{\"root_cause\":[{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \"}],\"type\":\"search_phase_execution_exception\",\"reason\":\"all shards failed\",\"phase\":\"query\",\"grouped\":true,\"failed_shards\":[{\"shard\":0,\"index\":\"ciena-component-lab-2023.01.21\",\"node\":\"sxBSFm94R2ebBweDKruTQw\",\"reason\":{\"type\":\"query_shard_exception\",\"reason\":\"Failed to parse query [{\\\"query\\\":{\\\"bool\\\": {\\\"must\\\":[{\\\"term\\\": {\\\"slot.keyword\\\": \\\"LM-1\\\"}}], \\\"filter\\\": {\\\"range\\\": {\\\"@timestamp\\\": {\\\"gte\\\": \\\"now-2m\\\"}}}}}}]\",\"index_uuid\":\"kUGWlXKGQF6Qw8JnwN1WIw\",\"index\":\"ciena-component-lab-2023.01.21\",\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Cannot parse '{\\\"query\\\":{\\\"bool\\\": {\\\"must\\\":[{\\\"term\\\": {\\\"slot.keyword\\\": \\\"LM-1\\\"}}], \\\"filter\\\": {\\\"range\\\": {\\\"@timestamp\\\": {\\\"gte\\\": \\\"now-2m\\\"}}}}}}': Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \",\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \"}}}}],\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n    \\\"TO\\\" ...\\n    \"}},\"status\":400}"}

I am able to successfully run the query in the Dev Console. It just doesn't run inside Logstash.

Hello,

You are not using the Elasticsearch input plugin but the Elasticsearch filter plugin. Unfortunately, both have a query parameter, but with different meanings: in the input plugin it takes a JSON query, while in the filter plugin it takes a query string (e.g. status:active).

So I think that you have the wrong parameter:

Elasticsearch query string. More information is available in the Elasticsearch query string documentation.

What you want is the query_template parameter:

File path to elasticsearch query in DSL format. More information is available in the Elasticsearch query documentation.
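
A rough sketch of what that could look like (the file path and name are just examples, and I haven't tested this against your data). First, save the DSL query to a file, e.g. /usr/share/logstash/config/serial_no_lookup.json:

{
  "size": 1,
  "sort": [ { "@timestamp": "desc" } ],
  "query": {
    "bool": {
      "must": [ { "term": { "slot.keyword": "LM-1" } } ],
      "filter": { "range": { "@timestamp": { "gte": "now-2m" } } }
    }
  }
}

Then point the filter at it; the fields mapping is only there to show how the looked-up value could be copied onto the event:

elasticsearch {
    hosts => ["https://ESaddress:443"]
    index => "myIndex-2023.01.21"
    user => 'username'
    password => '12345'
    query_template => "/usr/share/logstash/config/serial_no_lookup.json"
    fields => { "serial_no" => "new_serial_no" }
}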

Best regards
Wolfram

Something is not working right. I updated my elasticsearch filter plugin to this:

if [name] == "components" and !("serial_no" in [fields]) {
    elasticsearch {
        hosts => ["https://ESaddress:443"]
        index => "myIndex-2023.01.21"
        user => 'username'
        password => '12345'
        query => 'source:"10.10.10.20"'
        fields => {'serial_no' => 'new_serial_no'}
    }
} else {
    drop {}
}

I'm assuming that when name is equal to "components" and serial_no is blank, the serial_no from the event with source = "10.10.10.20" will get stored in the new_serial_no field.

However, I'm getting "new_serial_no" => nil in my stdout.

If I change the query to query => 'slot:"LM-1"' then new_serial_no gets assigned the serial_no from source 10.10.10.20 or 10.10.10.30.

If I change the query to query => 'source:"10.10.10.20" slot:"LM-1"' to try to get the serial_no from 10.10.10.20, then I still get "new_serial_no" => nil in my stdout.

Also, is there a way to add a time range filter in the query?

A range filter does indeed exist (although I haven't used it, so I don't know if it works with times or only with dates): Query string query | Elasticsearch Guide [8.6] | Elastic
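
If it does work with @timestamp, the query string syntax for a range would look something like this (untested here, just adapting the DSL query you posted earlier):

query => 'slot:"LM-1" AND @timestamp:[now-2m TO now]'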

Regarding the source filter problem: Is it possible that source itself is not searchable?

I believe source is searchable because I can use the following query in the Dev Console and I'm able to get back events:

{"query":{"bool": {"must":[{"term": {"source": "10.10.10.20"}}], "filter": {"range": {"@timestamp": {"gte": "now-2m"}}}}}}
