I want to send JSON data from Kafka to Elasticsearch, choosing the Elasticsearch action based on the operation field in each message.
My JSON data looks like this:
{
"operation": "create",
"fields": {
"sign": "im_messages",
"from_id": 12797,
"to_id": 12796,
"content": "hello",
"accept": 0,
"is_revoke": 0,
"created_at": "2019-03-21 13:53:16"
}
}
I can now write the data to Elasticsearch with the different operations (create, update, delete, index). The problem is that I can't remove the operation field from the indexed document: if I remove operation in the filter, I can no longer use it to choose the action in the elasticsearch output. How can I do this?
My Logstash filter and output config looks like this:
filter {
mutate {
add_field => {"@fields" => "%{fields}"}
}
json {
source => "@fields"
remove_field => ["@fields","@version","@timestamp","fields"]
}
}
output {
if [operation] == "create" {
elasticsearch {
hosts => ["xx.xx.xx.xx:9200"]
index => "im"
timeout => 300
#user => "elastic"
#password => "changeme"
}
.....
The results of the update and create operations in Elasticsearch now look like this:
{
"_id": "oyl5pGkBReZGJ_hya8ck",
"_index": "im",
"_score": 1,
"_source": {
"accept": 0,
"content": "ssssssssssssss",
"created_at": "2019-03-22 16:14:45",
"from_id": 12796,
"id": "oyl5pGkBReZGJ_hya8ck",
"is_revoke": 0,
"operation": "update",
"sign": "im_messages",
"to_id": 12797
},
"_type": "doc"
},
{
"_id": "An-BpGkBUAnQ0IgRfic8",
"_index": "im",
"_score": 1,
"_source": {
"accept": 0,
"content": "???",
"created_at": "2019-03-22 10:21:56",
"from_id": 12796,
"is_revoke": 0,
"operation": "create",
"sign": "im_messages",
"to_id": 12797
},
"_type": "doc"
}
I want to remove the operation and id fields from _source.
How can I do that?
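One common way to do this is to move the routing field under [@metadata], which Logstash makes available to conditionals and sprintf references but does not send to outputs as part of the event. A minimal sketch, assuming the field names and index from the config above (not a tested drop-in config):

```
filter {
  mutate {
    # Move the routing field under @metadata so it is not indexed into _source.
    rename => { "[operation]" => "[@metadata][operation]" }
  }
}
output {
  # @metadata fields can still be used in conditionals.
  if [@metadata][operation] == "create" {
    elasticsearch {
      hosts => ["xx.xx.xx.xx:9200"]
      index => "im"
    }
  }
}
```

The other operations would get their own conditional branches in the same way.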
Yes, that works, but now there is a new problem.
I moved the fields into @metadata in the filter:
filter {
mutate {
add_field => {"@fields" => "%{fields}"}
rename => { "[operation]" => "[@metadata][operation]" }
rename => { "[id]" => "[@metadata][id]" }
}
json {
source => "@fields"
remove_field => ["@fields","@version","@timestamp","fields"]
}
}
and I use the id from @metadata to update a document when the operation is update:
if [@metadata][operation] == "update" {
elasticsearch {
hosts => ["xx.xx.xx.xx:9200"]
index => "im"
action => "update"
doc_as_upsert => true
timeout => 300
document_id => "%{[@metadata][id]}"
#user => "elastic"
#password => "changeme"
}
}
The result is that a new document is created instead of the existing one being updated:
That suggests that the [id] field does not exist at the point where you do the mutate+rename. Is it created by the json filter? If so, move the rename+mutate of [id] after the json filter.
Also, please do not post pictures of text. Just post the text.
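To check whether [id] and [@metadata][id] exist at the end of the filter chain, it can help to print the event temporarily. A small sketch, assuming a stdout output is acceptable while debugging; the rubydebug codec hides @metadata unless its metadata option is enabled:

```
output {
  # Print each event, including the @metadata fields, to the Logstash console.
  stdout { codec => rubydebug { metadata => true } }
}
```

If [@metadata][id] is missing from the printed event, the rename ran before the field existed.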
No, the [id] field is not created by the json filter; it comes from the message data. I want to use [id] as the Elasticsearch document_id, which determines which document to update, but I don't want it added to _source.
My JSON data for an update looks like this:
{"operation":"update","fields":{"id":"sCmcs2kBReZGJ_hybsd4","content":"xxxxxxxxxxxxx","updated_at":"2019-03-25 14:14:45"}}
and I want to update this document:
{
So when you say "No", you mean "Yes". The [id] field does not exist until the json filter is executed. Move the rename+mutate of [id] after the json filter
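A minimal sketch of that ordering, assuming the field names from the posts above and that [id] only appears at the top level of the event once the json filter has parsed @fields:

```
filter {
  mutate {
    add_field => { "@fields" => "%{fields}" }
    # [operation] is already a top-level field, so it can be renamed here.
    rename => { "[operation]" => "[@metadata][operation]" }
  }
  json {
    source => "@fields"
    remove_field => ["@fields", "@version", "@timestamp", "fields"]
  }
  mutate {
    # [id] exists only after the json filter has run, so rename it afterwards.
    rename => { "[id]" => "[@metadata][id]" }
  }
}
```

With this in place the update output can keep using document_id => "%{[@metadata][id]}", and [id] should no longer be part of the indexed source. Messages without an id are unaffected, since renaming a field that does not exist does nothing.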
Yes. I set the codec to json in the input settings and removed the rename+mutate of [id]. This updates the right document in Elasticsearch, but the document source now has an [id] field, which I don't want in _source.
My Logstash config file looks like this:
input {
kafka {
bootstrap_servers => "xx.xx.xx.xx:9092"
# The purpose of client_id is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included.
#client_id => "logstash1"
group_id => "im_group"
auto_offset_reset => "latest" #automatically reset the offset to the latest offset
consumer_threads => 3
topics => ["im"]
#type => "message"
codec => json {
charset => "UTF-8"
}
}
}