Hi,
My use case is as follows. I have documents that are inserted into Elasticsearch data streams. The same document gets updated multiple times in the application, and I would like to use the _update_by_query API to query and update the entire document through the Logstash http output plugin. (Since data streams only support inserts by default, calling _update_by_query via the http plugin seems to be my only option for updating a data stream document.)
I think my configuration works for the most part; the only piece that is not working is replacing ctx._source with my entire input event document. I suspect this is because the input JSON gets parsed into separate fields, and even though I set "target" in the json filter, the interpolation still doesn't work. Any idea how to achieve this? One partial idea I've had is sketched after the config below.
input {
  beats {
    port => 5044
  }
}

filter {
  json {
    source => "message"
    target => "inputdoc"
  }
}
output {
  # Sequence > 0: the document already exists, so update it in place in the
  # data stream via the _update_by_query API.
  if [inputdoc][key] == "elasticsearch" and [inputdoc][sequence] > 0 {
    http {
      url => "http://xxxxxxx/logs-rave-stage-stream-default/_update_by_query"
      http_method => "post"
      content_type => "application/json"
      format => "message"
      # This is the part that is not working: [inputdoc] below is sent as
      # the literal text "[inputdoc]" instead of the parsed event document.
      message => '{
        "script": {
          "source": "ctx._source = [inputdoc]",
          "lang": "painless"
        },
        "query": {
          "term": {
            "uuid": "0558fa46-c24b-46b9-a83d-d1c3c5ceb53e"
          }
        }
      }'
    }
    elasticsearch {
      hosts => ["xxxxxxxxxxxx"]
      index => "xxxxxxx"
      action => "index"
      doc_as_upsert => "true"
      document_id => "%{[inputdoc][uuid]}"
      version => "%{[inputdoc][sequence]}"
      version_type => "external_gte"
    }
  }
  # Sequence == 0: first time we see the document, so insert it into the
  # data stream and into the secondary index.
  else if [inputdoc][key] == "elasticsearch" and [inputdoc][sequence] == 0 {
    elasticsearch {
      hosts => ["xxxxxxxxxxxx"]
      data_stream => "true"
      data_stream_dataset => "xxxxxx"
    }
    elasticsearch {
      hosts => ["xxxxxxxxxx"]
      index => "xxxxxxxxxxx"
      action => "index"
      doc_as_upsert => "true"
      document_id => "%{[inputdoc][uuid]}"
      version => "%{[inputdoc][sequence]}"
      version_type => "external_gte"
    }
  }
}
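
One idea I've been toying with (untested, so treat it as a sketch): serialize the parsed document back into a JSON string with the json_encode filter plugin (logstash-filter-json_encode, which I believe has to be installed separately with bin/logstash-plugin install logstash-filter-json_encode), then interpolate that string into the request body as a script parameter instead of assigning [inputdoc] directly:

filter {
  json {
    source => "message"
    target => "inputdoc"
  }
  # Serialize the parsed document back into a JSON string so the http
  # output can interpolate it with %{...} sprintf formatting.
  json_encode {
    source => "inputdoc"
    target => "inputdoc_json"
  }
}

output {
  if [inputdoc][key] == "elasticsearch" and [inputdoc][sequence] > 0 {
    http {
      url => "http://xxxxxxx/logs-rave-stage-stream-default/_update_by_query"
      http_method => "post"
      content_type => "application/json"
      format => "message"
      # Pass the document as a script parameter; %{[inputdoc_json]} expands
      # to the JSON string produced by the json_encode filter above.
      message => '{
        "script": {
          "source": "ctx._source = params.doc",
          "lang": "painless",
          "params": { "doc": %{[inputdoc_json]} }
        },
        "query": {
          "term": { "uuid": "%{[inputdoc][uuid]}" }
        }
      }'
    }
  }
}

Passing the document through script params instead of concatenating it into the script source should also avoid escaping problems inside the Painless script. I haven't verified this end to end, though, so I'd appreciate confirmation that this is the right direction.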