How to delete a document in Elasticsearch using Logstash

My ETL Stack is JSON File => Filebeat => Logstash => Elasticsearch
I am trying to specify an action in my JSON object that is either update or delete

Here's a partial of my current JSON

{ "key":123,"index_name":"companies"}

Here's my current Pipeline .conf that does inserts or updates just fine

input { 
    beats {
        id => "filebeat-input"
        port => 5044
        codec => "json"
        include_codec_tag => false
    }
} 
output { 
    elasticsearch { 
      id => "elasticsearch-output"
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "update"
      doc_as_upsert => true
      manage_template => false
    }
}

As mentioned, I now want to delete documents using this same pattern.

Updated JSON with new action field

{ "key":123,"index_name":"companies","action":"update"}
{ "key":123,"index_name":"companies","action":"delete"}

Updated Pipeline .conf file

output { 
    elasticsearch { 
      id => "elasticsearch-output"
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "%{action}"
      doc_as_upsert => true
      manage_template => false
    }
}

My Elasticsearch ETL stack takes JSON objects from files using Filebeat and passes them through Logstash to Elasticsearch. I want to pass an action property in my JSON object that designates either "update" or "delete as the action I want to perform against the document in the index. My current config has the action hardwired to "update" and will do upserts just fine.

JSON Sample

{ "key":123,"index_name":"companies","action":"update"}
{ "key":123,"index_name":"companies","action":"delete"}

Current Pipeline.conf

input { 
    beats {
        id => "filebeat-input"
        port => 5044
        codec => "json"
        include_codec_tag => false
    }
} 
output { 
    elasticsearch { 
      id => "elasticsearch-output"
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "update"
      doc_as_upsert => true
      manage_template => false
    }
}

I tried to do this:

output { 
    elasticsearch { 
      id => "elasticsearch-output"
      hosts => ["localhost:9200"]
      document_id => "%{key}"
      index => "%{[@metadata][index_name]}"
      action => "%{action}"
      doc_as_upsert => true
      manage_template => false
    }
}

When I run that conf and the document doesn't exist, it is not doing an update as expected.
It throws this error:

[2019-08-21T15:21:28,879][WARN ][logstash.outputs.elasticsearch]
Could not index event to Elasticsearch.
{:status=>404, :action=>["update", {:_id=>"123", :_index=>"companies", :_type=>"_doc", :routing=>nil, :retry_on_conflict=>1}, #],
:response=>{"update"=>{"_index"=>"companies", "_type"=>"_doc", "_id"=>"123", "status"=>404,
"error"=>{"type"=>"document_missing_exception", "reason"=>"[_doc][123]: document missing", "index_uuid"=>"uU9oXFtZSXGodoh70YG3Ng", "shard"=>"0", "index"=>"companies"}}}}

Why are my upserts no longer working?

I know I can use an if statement inside the .conf file and have two different output paths based on the action field, but I don't understand why the solution above doesn't work.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.