How to remove fields in Logstash output

I want to pass my JSON data from Kafka to Elasticsearch, routed to different Elasticsearch actions based on the operation param.
My JSON data looks like this:
{
  "operation": "create",
  "fields": {
    "sign": "im_messages",
    "from_id": 12797,
    "to_id": 12796,
    "content": "hello",
    "accept": 0,
    "is_revoke": 0,
    "created_at": "2019-03-21 13:53:16"
  }
}
Now I have successfully transferred the data into ES with different operations, such as create/update/delete/index. The problem is that I can't remove the operation param from what is stored in Elasticsearch: if I remove operation in the filter, then I can't use it to select the elasticsearch output action. How can I do it?
My Logstash filter and output config looks like this:
filter {
  mutate {
    add_field => {"@fields" => "%{fields}"}
  }
  json {
    source => "@fields"
    remove_field => ["@fields","@version","@timestamp","fields"]
  }
}

output {
  if [operation] == "create" {
    elasticsearch {
      hosts => ["xx.xx.xx.xx:9200"]
      index => "im"
      timeout => 300
      #user => "elastic"
      #password => "changeme"
    }
  }
  .....
}
And now the update and create operation results in Elasticsearch look like this:
{
  "_id": "oyl5pGkBReZGJ_hya8ck",
  "_index": "im",
  "_score": 1,
  "_source": {
    "accept": 0,
    "content": "ssssssssssssss",
    "created_at": "2019-03-22 16:14:45",
    "from_id": 12796,
    "id": "oyl5pGkBReZGJ_hya8ck",
    "is_revoke": 0,
    "operation": "update",
    "sign": "im_messages",
    "to_id": 12797
  },
  "_type": "doc"
},
{
  "_id": "An-BpGkBUAnQ0IgRfic8",
  "_index": "im",
  "_score": 1,
  "_source": {
    "accept": 0,
    "content": "???",
    "created_at": "2019-03-22 10:21:56",
    "from_id": 12796,
    "is_revoke": 0,
    "operation": "create",
    "sign": "im_messages",
    "to_id": 12797
  },
  "_type": "doc"
}
The operation and id params are what I want to remove from the _source field.
What can I do?

If you want to sprintf-reference a field in the output but do not want it as part of the event, then move it to [@metadata].

 mutate { rename => { "[operation]" => "[@metadata][operation]" } }
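To sketch the whole pattern (an untested sketch; host and field names are taken from the config above): fields under [@metadata] can still be tested in conditionals and expanded with %{...}, but they are never serialized into the event that the output sends to Elasticsearch.

```
filter {
  mutate {
    # [@metadata] fields are usable in output conditionals and sprintf
    # references, but are not part of the document sent to Elasticsearch.
    rename => { "[operation]" => "[@metadata][operation]" }
  }
}

output {
  if [@metadata][operation] == "create" {
    elasticsearch {
      hosts => ["xx.xx.xx.xx:9200"]
      index => "im"
    }
  }
}
```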

Yeah, it works, but a new problem occurs.
I added the metadata renames in the filter:
filter {
  mutate {
    add_field => {"@fields" => "%{fields}"}
    rename => { "[operation]" => "[@metadata][operation]" }
    rename => { "[id]" => "[@metadata][id]" }
  }
  json {
    source => "@fields"
    remove_field => ["@fields","@version","@timestamp","fields"]
  }
}
And I use the [id] metadata to update a doc when the operation is set to "update":
if [@metadata][operation] == "update" {
  elasticsearch {
    hosts => ["xx.xx.xx.xx:9200"]
    index => "im"
    action => "update"
    doc_as_upsert => true
    timeout => 300
    document_id => "%{[@metadata][id]}"
    #user => "elastic"
    #password => "changeme"
  }
}

The result is that a new doc is created rather than the existing one being updated:

That suggests that the [id] field does not exist at the point where you do the mutate+rename. Is it created by the json filter? If so, move the rename+mutate of [id] after the json filter.

Also, please do not post pictures of text. Just post the text.

No, the [id] field is not created by the json filter; it comes from the message data. I want the [id] for the Elasticsearch document_id, which determines which doc to update, but I don't want it added to the _source field.
My JSON data for update looks like this:
{"operation":"update","fields":{"id":"sCmcs2kBReZGJ_hybsd4","content":"xxxxxxxxxxxxx","updated_at":"2019-03-25 14:14:45"}}
and I want to update this doc:
{
  "_id": "sCmcs2kBReZGJ_hybsd4",
  "_index": "im",
  "_score": 1,
  "_source": {
    "accept": 0,
    "content": "bbbbbbbbb",
    "created_at": "2019-03-25 10:12:56",
    "from_id": 12796,
    "is_revoke": 0,
    "sign": "im_messages",
    "to_id": 12797
  },
  "_type": "doc"
}
So when you say "No", you mean "Yes". The [id] field does not exist until the json filter is executed. Move the rename+mutate of [id] after the json filter.
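In config terms, an untested sketch of that ordering (filters run top to bottom, so [id] only exists once the json filter has parsed @fields):

```
filter {
  mutate {
    add_field => { "@fields" => "%{fields}" }
    rename    => { "[operation]" => "[@metadata][operation]" }
  }
  json {
    source => "@fields"
    remove_field => ["@fields","@version","@timestamp","fields"]
  }
  # This mutate runs after the json filter, so [id] now exists: it is
  # moved to [@metadata][id], usable as document_id but not indexed
  # into _source.
  mutate {
    rename => { "[id]" => "[@metadata][id]" }
  }
}
```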

Yeah. I set the codec to json in the input settings and removed the mutate+rename of [id]. This updates the exact doc in Elasticsearch, but the document _source now has an [id] field which I don't want there.
My Logstash config file is set up like this:

input {
  kafka {
    bootstrap_servers => "xx.xx.xx.xx:9092"
    #client_id => "logstash1" # allows a logical application name to be included, so request sources can be tracked beyond just ip/port
    group_id => "im_group"
    auto_offset_reset => "latest" # automatically reset the offset to the latest offset
    consumer_threads => 3
    topics => ["im"]
    #type => "message"
    codec => json {
      charset => "UTF-8"
    }
  }
}

filter {
  mutate {
    add_field => {"@fields" => "%{fields}"}
    rename => { "[operation]" => "[@metadata][operation]" }
    #rename => { "[id]" => "[@metadata][id]" }
  }
  json {
    source => "@fields"
    remove_field => ["@fields","@version","@timestamp","fields"]
  }
}

output {
  if [@metadata][operation] == "create" {
    elasticsearch {
      hosts => ["xx.xx.xx.xx:9200"]
      index => "im"
      timeout => 300
      #user => "elastic"
      #password => "changeme"
    }
  } else if [@metadata][operation] == "update" {
    elasticsearch {
      hosts => ["xx.xx.xx.xx:9200"]
      index => "im"
      action => "update"
      doc_as_upsert => true
      timeout => 300
      document_id => "%{[id]}"
      #document_id => "%{[@metadata][id]}"
      #user => "elastic"
      #password => "changeme"
    }
  }
}
The result of the update is like this:
{
  "_id": "Bn-Jt2kBUAnQ0IgRcSc2",
  "_index": "im",
  "_score": 1,
  "_source": {
    "accept": 0,
    "content": "xxxxxxxxxxxxx",
    "created_at": "2019-03-25 10:12:56",
    "from_id": 12796,
    "id": "Bn-Jt2kBUAnQ0IgRcSc2",
    "is_revoke": 0,
    "sign": "im_messages",
    "to_id": 12797,
    "updated_at": "2019-03-25 14:14:45"
  },
  "_type": "doc"
}
The [id] field appears both as the doc's _id and inside the doc's _source; I don't want it added to the _source.

Finally, I gave up on this issue, and I found these extra fields have no influence on how my normal functions work. Thanks :smiley:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.