Hi,
I have a Logstash pipeline that reads data from Kafka. Inside the filter section, I have a conditional that calls the elasticsearch filter plugin when a certain condition is met.
The condition:
If serial_no is missing from the current event, then I want to go back 2 minutes and replace serial_no with the value of serial_no from an event where slot.keyword was equal to "LM-1".
So if the current event has these fields:
"serial_no" = " "
"slot.keyword" = "LM-1"
I want to go back in time 2 minutes and pull out the serial_no for this specific slot. I know that, as written, if slot LM-2 came in with a missing serial_no, a working pipeline would replace it with the serial number of "LM-1", but I'll worry about fixing that once I can get the elasticsearch query to function properly.
This is what my pipeline looks like:
input {
  kafka {
    client_id => "sandbox"
    topics => ["sandbox-topic"]
    group_id => "id2"
    bootstrap_servers => "server1.kafka.com:9093"
    consumer_threads => 1
    codec => "json"
    security_protocol => "SSL"
    ssl_keystore_location => "/usr/share/logstash/config/myKey.jks"
    ssl_keystore_password => "password"
    ssl_keystore_type => PKCS12
    ssl_truststore_location => "/usr/share/logstash/config/myCA.jks"
  }
}
filter {
  json {
    source => "message"
    remove_field => [ "message" ]
  }
  if [fields] {
    ruby {
      code => '
        event.get("fields").each { |k, v|
          event.set(k,v)
        }
        event.remove("fields")
      '
    }
  }
  if [tags] {
    ruby {
      code => '
        event.get("tags").each { |k, v|
          event.set(k,v)
        }
        event.remove("tags")
      '
    }
  }
  if [name] == "components" and !("serial_no" in [fields]) {
    elasticsearch {
      hosts => ["https://ESaddress:443"]
      index => "myIndex-2023.01.21"
      user => 'username'
      password => '12345'
      query => '{"query":{"bool": {"must":[{"term": {"slot.keyword": "LM-1"}}], "filter": {"range": {"@timestamp": {"gte": "now-2m"}}}}}}'
    }
  } else {
    drop {}
  }
}
output {
  stdout { }
}
This is the warning message that I'm getting:
[2023-01-21T15:38:27,662][WARN ][logstash.filters.elasticsearch][main][75f950dce3774ae3a277ce0d7a2c3d741be832c4a8c35890090f7460439151f6] Failed to query elasticsearch for previous event {:index=>"ciena-component-lab-2023.01.21", :error=>"[400] {\"error\":{\"root_cause\":[{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n \\\"TO\\\" ...\\n \"}],\"type\":\"search_phase_execution_exception\",\"reason\":\"all shards failed\",\"phase\":\"query\",\"grouped\":true,\"failed_shards\":[{\"shard\":0,\"index\":\"ciena-component-lab-2023.01.21\",\"node\":\"sxBSFm94R2ebBweDKruTQw\",\"reason\":{\"type\":\"query_shard_exception\",\"reason\":\"Failed to parse query [{\\\"query\\\":{\\\"bool\\\": {\\\"must\\\":[{\\\"term\\\": {\\\"slot.keyword\\\": \\\"LM-1\\\"}}], \\\"filter\\\": {\\\"range\\\": {\\\"@timestamp\\\": {\\\"gte\\\": \\\"now-2m\\\"}}}}}}]\",\"index_uuid\":\"kUGWlXKGQF6Qw8JnwN1WIw\",\"index\":\"ciena-component-lab-2023.01.21\",\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Cannot parse '{\\\"query\\\":{\\\"bool\\\": {\\\"must\\\":[{\\\"term\\\": {\\\"slot.keyword\\\": \\\"LM-1\\\"}}], \\\"filter\\\": {\\\"range\\\": {\\\"@timestamp\\\": {\\\"gte\\\": \\\"now-2m\\\"}}}}}}': Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n \\\"TO\\\" ...\\n \",\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n \\\"TO\\\" ...\\n \"}}}}],\"caused_by\":{\"type\":\"parse_exception\",\"reason\":\"parse_exception: Encountered \\\" <RANGE_GOOP> \\\"{\\\\\\\"must\\\\\\\":[{\\\\\\\"term\\\\\\\": \\\"\\\" at line 1, column 18.\\nWas expecting:\\n \\\"TO\\\" ...\\n \"}},\"status\":400}"}
I am able to successfully run the query in the Dev Console. It just doesn't run inside Logstash.
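
For reference, the RANGE_GOOP parse error suggests the JSON body is being handed to the Lucene query-string parser. This fits the elasticsearch filter's query option, which takes a query string rather than Query DSL JSON (the plugin has a separate query_template option for a DSL file). Below is a minimal sketch of the same lookup written as a query string. It is not a verified fix. It assumes the matching document stores the serial number in a top-level serial_no field, and the fields mapping is my addition for copying that value onto the current event.

```conf
filter {
  elasticsearch {
    hosts => ["https://ESaddress:443"]
    index => "myIndex-2023.01.21"
    user => 'username'
    password => '12345'
    # Lucene query-string syntax instead of a JSON DSL body:
    # exact match on the slot, limited to the last 2 minutes.
    query => 'slot.keyword:"LM-1" AND @timestamp:[now-2m TO now]'
    # Take the most recent matching document.
    sort => "@timestamp:desc"
    # Assumption: copy serial_no from the matched document
    # into serial_no on the event being processed.
    fields => { "serial_no" => "serial_no" }
  }
}
```

If the DSL form is preferred, the JSON that works in the Dev Console could instead go in a file referenced by query_template. To look up the current event's own slot rather than always "LM-1", the query string accepts event field references such as %{[slot]}, assuming the event carries the slot value in a field named slot.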