Hi Team,
I have one pipeline with multiple output, Elasticsearch and kafka.
Now, the requirement is to sent a field to Elasticsearch, but while sending to kafka that field needs to drop.
Here my config says input is from Elasticsearch and update one field from that set of data and again save it to Elasticsearch and send to kafka. But I am not able to delete that field([customcol][flag]) while sending to kafka.
<
Elasticsearch {
hosts => ["https://10.1.1.X
:9200","https://10.1.2.X:9200","https://10.1.3.X:9200"]
index => "iece*"
user => "myuser"
password => "mypass"
query => '{"query": { "bool": { "filter": [ { "bool": { "must_not": [ { "match_phrase": { "customcol.flag": "read" } } ], "minimum_should_match": 1 } }, { "range": { "createdon": { "format": "strict_date_optional_time", "gte": "now-1d", "lte": "now" } } } ] } }, "sort": [ { "createdon": { "order": "desc", "unmapped_type": "boolean" } } ] }'
size => 500
scroll => "5m"
docinfo => true
docinfo_target => "[@metadata][doc]"
schedule => "/10 * * * * *"
}
}
filter{
mutate {update => { "[customcol][flag]" => "read"}}
mutate {remove_field => ["@version","process","tags", "splitby_copy","flag", "@timestamp"] }
}
output {
Elasticsearch {
hosts => ["https://10.1.2.X:9200","https://10.1.4.X:9200","https://10.1.3.X:9200"]
index => "%{[@metadata][doc][_index]}"
document_id => "%{[@metadata][doc][_id]}"
action => "update"
doc_as_upsert => true
manage_template => true
user => "myuser"
password => "mypass"
}
stdout { codec => rubydebug { metadata => true }}
}
output {
kafka {
bootstrap_servers => "10.1.X.X:9092,10.2.X.X:9092,10.3.X.X:9092"
topic_id => "envdbuat_allevents_%{toolcustomername}_%{toolmanager}"
codec => json
security_protocol => "SSL"
ssl_endpoint_identification_algorithm => ""
ssl_key_password => "abcdef"
ssl_keystore_location => "/etc/logstash/conf.d/extraconf/lib/kafka_certi_new/kafka.client.keystore.jks"
ssl_keystore_password => "abcdef"
ssl_truststore_location => "/etc/logstash/conf.d/extraconf/lib/kafka_certi_new/kafka.client.keystore.jks"
ssl_truststore_password => "abcdef"
}
}