HTTP output plugin to update an entire document in a data stream

Hi,

My use case is as follows. I have a document that is inserted into an Elasticsearch data stream. The same document is updated multiple times by the application, and I would like to use the _update_by_query API to find it and replace the entire document using the Logstash http output plugin. (Since data streams only support inserts by default, it seems my only option is to use the _update_by_query API to update a data stream document.)

I think my configuration works for the most part. The only part that does not work is replacing ctx._source with my entire input event document. I suspect this is because the input JSON is parsed into fields, and even though I set "target" in the json filter, it still doesn't work. Any idea how to achieve this?

    input {
      beats {
        port => 5044
      }
    }

    filter {
      json {
        source => "message"
        target => "inputdoc"
      }
    }
              
    output {
      if [inputdoc][key] == "elasticsearch" and [inputdoc][sequence] > 0 {
        http {
          url => "http://xxxxxxx/logs-rave-stage-stream-default/_update_by_query"
          http_method => "post"
          content_type => "application/json"
          format => "message"
          message => '{
            "script": {
              "source": "ctx._source = [inputdoc]",
              "lang": "painless"
            },
            "query": {
              "term": {
                "uuid": "0558fa46-c24b-46b9-a83d-d1c3c5ceb53e"
              }
            }
          }'
        }
        elasticsearch {
          hosts => ["xxxxxxxxxxxx"]
          index => "xxxxxxx"
          action => "index"
          doc_as_upsert => "true"
          document_id => "%{[inputdoc][uuid]}"
          version => "%{[inputdoc][sequence]}"
          version_type => "external_gte"
        }
      }
      else if [inputdoc][key] == "elasticsearch" and [inputdoc][sequence] == 0 {
        elasticsearch {
          hosts => ["xxxxxxxxxxxx"]
          data_stream => "true"
          data_stream_dataset => "xxxxxx"
        }
        elasticsearch {
          hosts => ["xxxxxxxxxx"]
          index => "xxxxxxxxxxx"
          action => "index"
          doc_as_upsert => "true"
          document_id => "%{[inputdoc][uuid]}"
          version => "%{[inputdoc][sequence]}"
          version_type => "external_gte"
        }
      }
    }
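
One direction that might be worth trying, shown only as an untested sketch: inside the message string, a bare [inputdoc] is sent to Elasticsearch as literal text, whereas %{[inputdoc]} is a sprintf field reference. The sketch assumes that sprintf renders a hash field as JSON, so the parsed document can be passed to the script through "params" rather than being written into the script source. The clear()/putAll() form and the use of %{[inputdoc][uuid]} in the query are assumptions for illustration, not something confirmed in this thread.

```
output {
  http {
    url => "http://xxxxxxx/logs-rave-stage-stream-default/_update_by_query"
    http_method => "post"
    content_type => "application/json"
    format => "message"
    # %{[inputdoc]} is a sprintf reference to the field created by the json filter.
    # Assumption: a hash field is rendered as JSON, so it can be used as a params value.
    message => '{
      "script": {
        "lang": "painless",
        "source": "ctx._source.clear(); ctx._source.putAll(params.doc)",
        "params": { "doc": %{[inputdoc]} }
      },
      "query": {
        "term": { "uuid": "%{[inputdoc][uuid]}" }
      }
    }'
  }
}
```

If this approach is used, the replacement document would presumably need to carry its own @timestamp field, since replacing the whole _source drops whatever the stored document had and data streams require that field.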
