We are trying to add a new field to the JSON document that we receive from Kafka, and this field needs to contain a unique id.
We tried the following options within the filter block (and also the uuid filter, shown further below):
mutate
grok
json
Nothing worked for us. Below is the input message that we get from Kafka:
{
  "timeMillis" : 1468957083192,
  "thread" : "restartedMain",
  "level" : "ERROR",
  "message" : "Started Application in 5.19 seconds (JVM running for 5.866)",
  "source" : {
    "class" : "org.springframework.boot.StartupInfoLogger",
    "method" : "logStarted",
    "file" : "StartupInfoLogger.java",
    "line" : 57
  },
  "endOfBatch" : false,
  "logEventId" : "1c9f2239-543d-436e-873d-7e143f33ac7c",
  "loggerFqcn" : "org.apache.commons.logging.impl.SLF4JLocationAwareLog",
  "test" : "tesssssssing",
  "hostName" : "24a074f2dde8",
  "applicationName" : "@project.artifactId@",
  "errorId" : 101
}
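To make the goal concrete, we want every event to leave Logstash with one extra field carrying a value that is unique per event, for example (the field name testId and the value shown are only placeholders):

{
  ... all of the original fields ...,
  "testId" : "<a unique id generated per event>"
}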
Below is our Logstash configuration. We deploy it with Ansible, so the {{ }} placeholders are Ansible/Jinja2 variables:
input {
  kafka {
    topic_id => "{{kafka_log_consumer_topic_id}}"
    group_id => "{{kafka_consumer_group_id}}"
    zk_connect => "{{zk_consumer_connect_url}}"
  }
}

filter {
  json {
    add_field => {"testId" => "sample"}
  }
}

output {
  if [level] == "ERROR" or [level] == "WARN" or [level] == "FATAL" {
    elasticsearch {
      hosts => "{{elastic_search_hosts}}"
    }
    kafka {
      topic_id => "{{kafka_error_publish_topic}}"
      bootstrap_servers => "{{kafka_error_publish_host}}"
    }
  } else {
    elasticsearch {
      hosts => "{{elastic_search_hosts}}"
    }
  }
}
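One thing we are unsure about: the json filter documentation lists source as a required setting, and add_field is only applied when the filter succeeds. A minimal sketch of the variant we think the docs describe, assuming the raw JSON from Kafka lands in the message field (we have not verified that this solves our problem):

filter {
  json {
    # parse the raw JSON string that the kafka input puts into [message]
    source => "message"
    # add_field is only applied when the parse above succeeds
    add_field => {"testId" => "sample"}
  }
}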
These are the filter variants we tried:
filter {
  json {
    add_field => {"testId" => "sample"}
  }
}

filter {
  mutate {
    add_field => {"testId" => "sample"}
  }
}

filter {
  uuid {
    add_field => {"testId" => "@uuid"}
  }
}
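From the uuid filter documentation we understand that target, rather than add_field, is the option that names the field the generated UUID is written into. A minimal sketch of that usage (the field name testId is our choice, and we have not confirmed this on our setup):

filter {
  uuid {
    # write a newly generated UUID into [testId] for every event
    target => "testId"
    # replace testId if the event already carries one
    overwrite => true
  }
}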