Hi all,
My configuration looks like this:
input {
  rabbitmq {
    key => "mykey"
    queue => "myqueue"
    host => "myhost"
    exchange => "amq.topic"
    ack => true
    auto_delete => false
    automatic_recovery => true
    codec => "plain"
    durable => true
    exclusive => false
    metadata_enabled => true
    subscription_retry_interval_seconds => 5
    type => "nginx"
    prefetch_count => 20
    port => 4872
    heartbeat => 5
    passive => false
    connection_timeout => 60000
  }
  rabbitmq {
    key => "mykey"
    queue => "myqueue"
    host => "host"
    exchange => "amq.topic"
    ack => true
    auto_delete => false
    automatic_recovery => true
    codec => "plain"
    durable => true
    exclusive => false
    metadata_enabled => true
    subscription_retry_interval_seconds => 5
    type => "DPBANK_DPAPI"
    prefetch_count => 20
    port => 5672
    heartbeat => 5
    passive => false
    connection_timeout => 60000
  }
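  # elasticsearch input: reads back the events that still need to be enriched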
  elasticsearch {
    hosts => "myhost"
    index => "dp_business-*"
    docinfo => true # if not specified, we cannot access [@metadata][_id]
    query => '{"query": {"bool": {"must": [{"match": {"type": "DPBANK_DPAPINTERNAL"}},{"range": {"@timestamp": {"from": "now-10h","to": "now"}}}],"must_not": { "match": { "tags": "enriched"}}}}}'
    tags => ["to_enrich"]
  }
  rabbitmq {
    key => "mykey"
    queue => "myqueue"
    host => "host"
    exchange => "amq.topic"
    ack => true
    auto_delete => false
    automatic_recovery => true
    codec => "plain"
    durable => true
    exclusive => false
    metadata_enabled => true
    subscription_retry_interval_seconds => 5
    type => "DPBANK_DPAPINTERNAL"
    prefetch_count => 100
    port => 5672
    heartbeat => 5
    passive => false
    connection_timeout => 60000
  }
}
filter {
  if [type] == "DPBANK_DPAPINTERNAL" {
    json {
      source => "message"
      target => "target"
    }
    if "to_enrich" in [tags] {
      mutate {
        remove_field => ["message"]
      }
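      # look up the matching event in dp_audit and copy its [request] field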
      elasticsearch {
        hosts => "myhost"
        index => "dp_audit-*"
        query_template => "query.json"
        fields => { "[request]" => "new_key" }
        add_tag => ["enriched", "output_splunk"]
        remove_tag => [ "to_enrich" ]
      }
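      # if the lookup failed, undo the tagging so the event is retried later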
if "_elasticsearch_lookup_failure" in [tags] {
mutate{
remove_tag => [ "_elasticsearch_lookup_failure", "enriched", "output_splunk" ]
add_tag => ["to_enrich"]
}
}
}
else {
mutate {
add_field => {"index_name" => "dp_business"}
add_tag => ["output_elastic"]
remove_field => ["message"]
}
}
}
}
output {
  if "output_elastic" in [tags] {
    elasticsearch {
      hosts => "10.1.10.16"
      index => "%{index_name}-%{+YYYY.MM.dd}"
    }
  }
  if "enriched" in [tags] {
    elasticsearch {
      hosts => "10.1.10.16"
      index => "%{index_name}-%{+YYYY.MM.dd}"
      document_id => "%{[@metadata][_id]}"
      action => "update"
    }
  }
}
OK, so the idea is to have a single pipeline, and as long as I am only putting data into Elasticsearch, everything works fine.
But then I want to read the data back from Elasticsearch with the elasticsearch input and "enrich" it with additional information taken from another event that matches on an id: the not-yet-enriched events are indexed in dp_business in this case, and they match (via another field) events present in a second index called dp_audit. The enriched result is then written back to Elasticsearch, updating the older document.
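In case it is relevant, query.json is a template along these lines (the field name here is a simplified, hypothetical stand-in for the real one; the filter substitutes the %{} reference from the event before running the query):

{
  "size": 1,
  "query": {
    "match": { "some_matching_field": "%{[target][some_matching_field]}" }
  }
}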
The first time I run this pipeline, the elasticsearch input does not seem to find any documents; the second time it does. I think the elasticsearch input queries Elasticsearch only once, and since there is obviously no data available yet on the first run, it never goes back to Elasticsearch to see whether something has changed.
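Is something like the schedule option what I am missing here? As far as I understand, it makes the input run the query repeatedly on a cron-like schedule instead of only once at startup, e.g.:

elasticsearch {
  hosts => "myhost"
  index => "dp_business-*"
  docinfo => true
  query => '...' # same query as above
  tags => ["to_enrich"]
  schedule => "* * * * *" # cron syntax: run the query every minute
}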
I hope I have explained my problem correctly.
Thank you!