Send different events to multiple output plugins in the same config

Hi Team,

I have one pipeline with multiple outputs, elasticsearch and kafka.
The requirement is to send a field to elasticsearch, but to drop that field when sending to kafka.
My config reads input from elasticsearch, updates one field on that set of documents, saves it back to elasticsearch, and also sends the events to kafka. But I am not able to delete that field ([customcol][flag]) only when sending to kafka.
input {
  elasticsearch {
    hosts => ["https://10.1.1.X:9200","https://10.1.2.X:9200","https://10.1.3.X:9200"]
    index => "iece*"
    user => "myuser"
    password => "mypass"
    query => '{"query": { "bool": { "filter": [ { "bool": { "must_not": [ { "match_phrase": { "customcol.flag": "read" } } ], "minimum_should_match": 1 } }, { "range": { "createdon": { "format": "strict_date_optional_time", "gte": "now-1d", "lte": "now" } } } ] } }, "sort": [ { "createdon": { "order": "desc", "unmapped_type": "boolean" } } ] }'
    size => 500
    scroll => "5m"
    docinfo => true
    docinfo_target => "[@metadata][doc]"
    schedule => "/10 * * * * *"
  }
}

filter {
  # mark the event as read so the scheduled query above skips it next time
  mutate { update => { "[customcol][flag]" => "read" } }

  mutate { remove_field => ["@version", "process", "tags", "splitby_copy", "flag", "@timestamp"] }
}
output {
  elasticsearch {
    hosts => ["https://10.1.2.X:9200","https://10.1.4.X:9200","https://10.1.3.X:9200"]
    index => "%{[@metadata][doc][_index]}"
    document_id => "%{[@metadata][doc][_id]}"
    action => "update"
    doc_as_upsert => true
    manage_template => true
    user => "myuser"
    password => "mypass"
  }

  stdout { codec => rubydebug { metadata => true } }
}
output {
  kafka {
    bootstrap_servers => "10.1.X.X:9092,10.2.X.X:9092,10.3.X.X:9092"
    topic_id => "envdbuat_allevents_%{toolcustomername}_%{toolmanager}"
    codec => json
    security_protocol => "SSL"
    ssl_endpoint_identification_algorithm => ""
    ssl_key_password => "abcdef"
    ssl_keystore_location => "/etc/logstash/conf.d/extraconf/lib/kafka_certi_new/kafka.client.keystore.jks"
    ssl_keystore_password => "abcdef"
    ssl_truststore_location => "/etc/logstash/conf.d/extraconf/lib/kafka_certi_new/kafka.client.keystore.jks"
    ssl_truststore_password => "abcdef"
  }
}
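
Within a single pipeline, every event reaches every output, so a remove_field in the filter section would drop the field for elasticsearch too. One single-pipeline workaround is to clone each event, strip the field from the copy, and route on a marker. A sketch reusing the settings above; the clone name "for_kafka" and the [@metadata][dest] marker are illustrative, and the trimmed kafka/elasticsearch settings stand in for the full blocks shown earlier:

filter {
  # duplicate every event; with ecs_compatibility disabled the copy gets
  # its [type] field set to the clone name
  clone { clones => ["for_kafka"] ecs_compatibility => "disabled" }
  if [type] == "for_kafka" {
    mutate {
      add_field    => { "[@metadata][dest]" => "kafka" }
      remove_field => ["type", "[customcol][flag]"]
    }
  }
}
output {
  if [@metadata][dest] == "kafka" {
    # the copy, with [customcol][flag] already removed
    kafka { bootstrap_servers => "10.1.X.X:9092" topic_id => "envdbuat_allevents_%{toolcustomername}_%{toolmanager}" codec => json }
  } else {
    # the original, field intact
    elasticsearch { hosts => ["https://10.1.2.X:9200"] index => "%{[@metadata][doc][_index]}" document_id => "%{[@metadata][doc][_id]}" action => "update" doc_as_upsert => true user => "myuser" password => "mypass" }
  }
}

The original keeps [customcol][flag] and goes to elasticsearch; only the clone, with the field removed, reaches kafka.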

Use pipeline-to-pipeline communication. See the forked-path pattern.
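
A minimal sketch of that forked-path layout: one intake pipeline fans events out to per-output pipelines, and each branch gets its own filters, so only the kafka branch drops [customcol][flag]. The pipeline ids and the trimmed plugin settings below are placeholders standing in for the full blocks above:

# config/pipelines.yml
- pipeline.id: intake
  config.string: |
    input { elasticsearch { hosts => ["https://10.1.1.X:9200"] index => "iece*" } }
    output { pipeline { send_to => ["to_es", "to_kafka"] } }
- pipeline.id: to_es
  config.string: |
    input { pipeline { address => "to_es" } }
    # keep [customcol][flag] on this branch
    output { elasticsearch { hosts => ["https://10.1.2.X:9200"] user => "myuser" password => "mypass" } }
- pipeline.id: to_kafka
  config.string: |
    input { pipeline { address => "to_kafka" } }
    # drop the field only for the kafka copy
    filter { mutate { remove_field => ["[customcol][flag]"] } }
    output { kafka { bootstrap_servers => "10.1.X.X:9092" topic_id => "envdbuat_allevents_%{toolcustomername}_%{toolmanager}" codec => json } }

Each event from intake is duplicated into both downstream pipelines, so the mutate in to_kafka never touches the copy that goes to elasticsearch.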

Thanks @Badger for your reply!

I solved it by calling the Elasticsearch update API from an http filter to set the flag on the source document, removing that flag field from the event, and then sending the event to kafka. Below is my config:

input {
  elasticsearch {
    hosts => ["https://10.1.2.X:9200","https://10.1.4.X:9200","https://10.1.3.X:9200"]
    index => "iece*"
    user => "myuser"
    password => "mypass"
    query => '{"query": { "bool": { "filter": [ { "bool": { "must_not": [ { "match_phrase": { "flag": "read" } } ], "minimum_should_match": 1 } }, { "range": { "createdon": { "format": "strict_date_optional_time", "gte": "now-1d", "lte": "now" } } } ] } }, "sort": [ { "createdon": { "order": "desc", "unmapped_type": "boolean" } } ] }'
    size => 500
    scroll => "5m"
    docinfo => true
    docinfo_target => "[@metadata][doc]"
    schedule => "/10 * * * * *"
  }
}

filter {
  # build a dd-mm-yyyy dated index name and stash it in metadata
  ruby { code => "event.set('[@metadata][currentdate]', Time.now.strftime('%d-%m-%Y'))" }
  mutate { add_field => { "[@metadata][indextime]" => "iece%{[@metadata][currentdate]}" } }

  mutate { remove_field => ["@version", "process", "tags", "splitby_copy", "flag", "@timestamp"] }
}
filter {
  # mark the source document as read via the _update API, so the
  # scheduled query above does not pick it up again
  http {
    verb => "POST"
    url => "https://elkurl:9200/%{[@metadata][doc][_index]}/_update/%{[@metadata][doc][_id]}"
    user => "myuser"
    password => "mypass"
    body_format => "json"
    body => '{"doc": {"flag": "read"}}'
    ecs_compatibility => "disabled"
  }

  # drop the HTTP response fields the http filter adds to the event
  mutate { remove_field => ["body", "headers"] }
}

output {
  stdout { codec => rubydebug { metadata => true } }
}
output {
  kafka {
    bootstrap_servers => "10.1.X.X:9092,10.2.X.X:9092,10.3.X.X:9092"
    topic_id => "envdbuat_allevents_%{toolcustomername}_%{toolmanager}"
    codec => json
    security_protocol => "SSL"
    ssl_endpoint_identification_algorithm => ""
    ssl_key_password => "abcdef"
    ssl_keystore_location => "/etc/logstash/conf.d/extraconf/lib/kafka_certi_new/kafka.client.keystore.jks"
    ssl_keystore_password => "abcdef"
    ssl_truststore_location => "/etc/logstash/conf.d/extraconf/lib/kafka_certi_new/kafka.client.keystore.jks"
    ssl_truststore_password => "abcdef"
  }
}
