Hi, i have this json message:
{id:123456,time:20190225,color:black,result:1}
i want append filed to this json and send to kafka. What i want:
{id:123456,time:20190225,color:black,result:1,tag:server1}
How should I do this? thanks.
How are you sending it to kafka?
Hi.
my logstash config
is:
input {
kafka {
bootstrap_servers => "172.16.70.4:9092"
group_id => "all_topic"
topics => ["topic1", "topic2"]
auto_offset_reset => "earliest"
decorate_events => true
}
}
output {
if ([@metadata][kafka][topic] == "topic1") {
kafka {
bootstrap_servers => "172.16.70.44:9092"
topic_id => "new_topic1"
codec => line { format => "%{message}" }
}
}
if ([@metadata][kafka][topic] == "topic2") {
kafka {
bootstrap_servers => "172.16.70.44:9092"
topic_id => "new_topic2"
codec => line { format => "%{message}" }
}
}
}
and message is json format from server 172.16.70.4
:
{id:123456,time:20190225,color:black,result:1}
I want after send to server 172.16.70.44
add field to message, this like:
{id:123456,time:20190225,color:black,result:1,tag:server1}
I add filter like this:
filter {
json {
source => "message"
}
mutate { add_field => {"tag" => "%{[message][server1]}"}}
mutate { add_field => {"tag" => "server1"} }
mutate { add_field => {"[@metadata][tag]" => "server1"} }
}
But nothing happens. No fields are added in any way.
my new config:
input {
kafka {
bootstrap_servers => "172.16.70.4:9092"
group_id => "all_topic"
topics => ["topic1", "topic2"]
auto_offset_reset => "earliest"
decorate_events => true
}
}
filter {
json {
source => "message"
add_field => {"tag" => "hello"}
add_tag => [ "mytag" ]
id => "my_id"
remove_field => [ "color" ]
}
mutate { add_field => {"tag" => "%{[message][my_tag]}"}}
mutate { add_field => {"tag" => "my_tag"} }
mutate { add_field => {"[@metadata][tag]" => "my_tag"} }
mutate { add_field => {"[docs][test]" => "my_tag"} }
}
output {
if ([@metadata][kafka][topic] == "topic1") {
kafka {
bootstrap_servers => "172.16.70.44:9092"
topic_id => "new_topic1"
codec => line { format => "%{message}" }
}
}
if ([@metadata][kafka][topic] == "topic2") {
kafka {
bootstrap_servers => "172.16.70.44:9092"
topic_id => "new_topic2"
codec => line { format => "%{message}" }
}
}
}
I checked all method
and filter
but Field or tag not added
or field not remove
and output
is the original json
message.
You are confusing the event and the message field on the event.
Let's start with a generator input and a stdout output.
input { generator { count => 1 message => '{"id": 123456, "time": "20190225", "color": "black", "result": 1}' } }
filter {
}
output { stdout { codec => rubydebug { metadata => false } } }
This gets us this event
{
"host" => "my.example.com",
"@version" => "1",
"message" => "{\"id\": 123456, \"time\": \"20190225\", \"color\": \"black\", \"result\": 1}",
"@timestamp" => 2019-04-09T12:13:19.831Z,
"sequence" => 0
}
The message field is what we added with the generator input, and logstash has added 4 other fields to the event. If we then add a json filter
filter {
json { source => "message" }
}
Our event acquires additional fields, but the [message] field is unchanged.
{
"sequence" => 0,
"host" => "my.example.com",
"time" => "20190225",
"color" => "black",
"@timestamp" => 2019-04-09T12:15:31.618Z,
"message" => "{\"id\": 123456, \"time\": \"20190225\", \"color\": \"black\", \"result\": 1}",
"@version" => "1",
"id" => 123456,
"result" => 1
}
We can then add a field
mutate { add_field => { "tag" => "server1" } }
and the [message] field is still unchanged, but the event acquires another field
"tag" => "server1",
You are referencing the [message] field in your outputs, so you get what is in that field.
We can switch the output codec
output { stdout { codec => json_lines } }
and we start getting JSON output
{"sequence":0,"host":"ip-172-31-37-147.us-east-2.compute.internal","time":"20190225","color":"black","@timestamp":"2019-04-09T12:18:13.010Z","message":"{\"id\": 123456, \"time\": \"20190225\", \"color\": \"black\", \"result\": 1}","tag":"server1","@version":"1","id":123456,"result":1}
If you do not want all the extra fields then you can
mutate { remove_field => [ "@timestamp", "@version", "host", "message", "sequence" ] }
and just get
{"time":"20190225","color":"black","tag":"server1","id":123456,"result":1}
thanks a lot @Badger. You were right. I was confused.With your good training; my problem was solved
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.