Hi, I would like to configure Logstash with Kafka so that each message Logstash collects is sent to a different Kafka topic depending on its content. For example, if the message contains the string XXX, I want to send it to the topic XXX; if it contains the string YYY, I want to send it to the topic YYY.
Is it possible to configure Logstash to do that? If so, can you share an example of the config files?
Thanks in advance.
Try this output:
kafka {
  bootstrap_servers => "kafka:9092"
  topic_id => "Topic name"
}
Hi Bouraoui, thanks. That was my initial setting. I need something more complicated, like:
input {
  tcp {
    port => 5400
    codec => json
  }
}
output {
  if [message] =~ "XXX" {
    kafka {
      codec => json{}
      topic_id => "XXX"
    }
  }
  else if [message] =~ "YYY" {
    kafka {
      codec => json{}
      topic_id => "YYY"
    }
  }
  else {
    kafka {
      codec => json{}
      topic_id => "ZZZ"
    }
  }
}
It is likely more efficient to do something like
filter {
  if [message] =~ "XXX" {
    mutate { add_field => { "[@metadata][topic]" => "XXX" } }
  } else if [message] =~ "YYY" {
    mutate { add_field => { "[@metadata][topic]" => "YYY" } }
  } else {
    mutate { add_field => { "[@metadata][topic]" => "ZZZ" } }
  }
}
output {
  kafka {
    codec => json {}
    # sprintf reference, so the topic is taken from the field set in the filter
    topic_id => "%{[@metadata][topic]}"
  }
}
OK, thanks a lot, Badger.
Try adding a field to the event in the filter, like this, after extracting the topic name:
mutate {
  add_field => {
    "[@metadata][topic]" => "XXX"
  }
}
Then test it in the output with:
if [@metadata][topic] == "XXX" {
}
I tried that with the file type but not with Kafka; I think it works the same way.
Which field contains the complete event received by Logstash?
I ask because it always matches the third condition.
This is an example, in JSON format, of what arrives in Kafka:
{"SessID":16843159,"APID":16843159,"Pgm":"JVMLDM80","Reads":1,"Type":"Directory","ACEELOGU":false,"JobNm":"TOMCAT","JSauth":false,"TokPRIV":false,"ACEEADSP":false,"TokRSPEC":false,"TokUDUS":false,"@version":"1","DirBlks":6,"Group":"OMVSGRP","Name":"####################","HostName":"XXXX","StepNm":"JAVAJVM","JobID":"S0012440","AGPID":16843159,"OUid":900005,"Open":"2020-07-09T17:15:54.298","ACEEAUDT":false,"@timestamp":"2020-07-09T17:15:54.442Z","DevNo":"00000a","Close":"2020-07-09T17:15:54.298","Inode":185,"Cat":"FS","ACEEROA":false,"Time":"2020-07-09T17:15:54.299","ASessID":16843159,"RType":92,"OGroup":1,"SubT":"File close","Token":7407136,"TokTRST":false,"ACEE":true,"PID":16843159,"FName":"/SND1/u/tomcat/conf/Catalina/snd1.bmc.com","SAF":1,"SAFD":"XXXX","PGroup":16843159,"ACEEPRIV":false,"host":"192.168.0.1","ACEESPEC":false,"TokSUS":false,"SessType":"Started Procedure","Start":"2020-07-05T23:07:23.230","ACEEFLG1":"Defined","Severity":"Info","port":12679,"TokFlg1":"Pre 1.9","WorkTypeD":"Started task","PrivStatD":"Normal user","BlksRead":13,"SID":"SND1","ACEEOPER":false,"TokFlg3":"Default SECLABEL, Default Group","UserID":"TOMCAT"}
The suggested config does not find a match in the message above, as I would expect it to.
Thanks
Are you using a json codec on the input? If you are, then you will not have a field called [message].
Yes, I have a json codec on the input.
input {
  tcp {
    port => 5400
    codec => json
  }
}
Which field can I use instead of message?
If you want to check whether a particular field contains XXX/YYY/etc. then test that field. If you want to check whether any field contains it, then change the codec to plain, test [message] to set [@metadata][topic] and then after that use
json { source => "message" remove_field => [ "message" ] }
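Putting the pieces from this thread together, a complete pipeline might look roughly like the sketch below. It is untested against the setup described here and rests on a few assumptions: the broker address "kafka:9092" is just the placeholder used earlier in the thread, XXX/YYY/ZZZ stand in for the real strings and topic names, and the sender is assumed to deliver one JSON document per line.

```
input {
  tcp {
    port  => 5400
    codec => plain   # keep the raw text in [message] instead of parsing it on input
  }
}

filter {
  # Choose the topic while the whole event is still available as one string.
  if [message] =~ "XXX" {
    mutate { add_field => { "[@metadata][topic]" => "XXX" } }
  } else if [message] =~ "YYY" {
    mutate { add_field => { "[@metadata][topic]" => "YYY" } }
  } else {
    mutate { add_field => { "[@metadata][topic]" => "ZZZ" } }
  }

  # Now parse the JSON into fields and drop the raw copy.
  json { source => "message" remove_field => [ "message" ] }
}

output {
  kafka {
    bootstrap_servers => "kafka:9092"          # assumed broker address
    codec             => json
    topic_id          => "%{[@metadata][topic]}"
  }
}
```

Fields under [@metadata] are not serialized by the output codec, so the topic name does not end up in the document written to Kafka. If only one field matters, the codec can stay as json and the conditionals can test that field directly, for example if [HostName] =~ "XXX" (HostName is taken from the sample event above).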