My Elasticsearch ETL stack reads JSON objects from files with Filebeat and passes them through Logstash to Elasticsearch. I want to include an action property in each JSON object that specifies either "update" or "delete" as the action to perform against the document in the index. My current config has the action hardwired to "update", and upserts work just fine.
JSON Sample
{ "key":123,"index_name":"companies","action":"update"}
{ "key":123,"index_name":"companies","action":"delete"}
Current Pipeline.conf
    input {
      beats {
        id => "filebeat-input"
        port => 5044
        codec => "json"
        include_codec_tag => false
      }
    }
    output {
      elasticsearch {
        id => "elasticsearch-output"
        hosts => ["localhost:9200"]
        document_id => "%{key}"
        index => "%{[@metadata][index_name]}"
        action => "update"
        doc_as_upsert => true
        manage_template => false
      }
    }
I tried to do this:
    output {
      elasticsearch {
        id => "elasticsearch-output"
        hosts => ["localhost:9200"]
        document_id => "%{key}"
        index => "%{[@metadata][index_name]}"
        action => "%{action}"
        doc_as_upsert => true
        manage_template => false
      }
    }
When I run that config and the document doesn't exist, Logstash does not upsert it as expected.
Instead, it logs this warning:
[2019-08-21T15:21:28,879][WARN ][logstash.outputs.elasticsearch]
Could not index event to Elasticsearch.
{:status=>404, :action=>["update", {:_id=>"123", :_index=>"companies", :_type=>"_doc", :routing=>nil, :retry_on_conflict=>1}, #],
:response=>{"update"=>{"_index"=>"companies", "_type"=>"_doc", "_id"=>"123", "status"=>404,
"error"=>{"type"=>"document_missing_exception", "reason"=>"[_doc][123]: document missing", "index_uuid"=>"uU9oXFtZSXGodoh70YG3Ng", "shard"=>"0", "index"=>"companies"}}}}
Why are my upserts no longer working?
I know I can use an if statement inside the .conf file and have two different output paths based on the action field, but I don't understand why the solution above doesn't work.
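For reference, the if-based workaround I have in mind would look roughly like the sketch below. It is untested against the failing setup. The two id values are made up for illustration, and it assumes the action field is still present on the event when it reaches the output stage.

```
output {
  # Route on the action field carried in the JSON event.
  if [action] == "delete" {
    elasticsearch {
      id => "elasticsearch-output-delete"   # hypothetical id
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "delete"
      manage_template => false
    }
  } else {
    # Anything that is not a delete goes through the existing upsert path.
    elasticsearch {
      id => "elasticsearch-output-upsert"   # hypothetical id
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "update"
      doc_as_upsert => true
      manage_template => false
    }
  }
}
```

With this split, doc_as_upsert is only set on the output whose action is the literal "update", which matches the config that already works.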