Question about logstash and kafka

Hi, I would like to configure Logstash so that it sends each message it collects to a different Kafka topic, depending on the content of the message. For example, if the message contains the string XXX, I want to send it to the Kafka topic XXX; if the message collected by Logstash contains the string YYY, I want to send it to the topic YYY.
Is it possible to configure Logstash to do that? If so, can you share an example of the config files?
Thanks in advance.

Try this output

kafka {
    bootstrap_servers => "kafka:9092"
    topic_id => "Topic name"
}

Hi Bouraoui, thanks, this was my initial setting. I need something more complicated, like:

input {
    tcp {
        port => 5400
        codec => json
    }
}
output {
    if [message] =~ "XXX" {
        kafka {
            codec => json {}
            topic_id => "XXX"
        }
    } else if [message] =~ "YYY" {
        kafka {
            codec => json {}
            topic_id => "YYY"
        }
    } else {
        kafka {
            codec => json {}
            topic_id => "ZZZ"
        }
    }
}

It is likely more efficient to do something like

filter {
    if [message] =~ "XXX" {
        mutate { add_field => { "[@metadata][topic]" => "XXX" } }
    } else if [message] =~ "YYY" {
        mutate { add_field => { "[@metadata][topic]" => "YYY" } }
    } else {
        mutate { add_field => { "[@metadata][topic]" => "ZZZ" } }
    }
}
output {
    kafka {
        codec => json {}
        topic_id => "%{[@metadata][topic]}"
    }
}

OK, thanks a lot, Badger.

Try adding a field to the event in the filter, like this, once you have extracted the topic name:
add_field => {
    "[@metadata][topic]" => "XXX"
}

Then test it in the output with:
if [@metadata][topic] == "XXX" {
    # output for topic XXX goes here
}

I tried that with the file output but not with Kafka; I think it works the same way.

Which variable contains the complete event received by Logstash?
I ask because it always matches the third condition.

This is an example, in JSON format, of what arrives in Kafka:

{"SessID":16843159,"APID":16843159,"Pgm":"JVMLDM80","Reads":1,"Type":"Directory","ACEELOGU":false,"JobNm":"TOMCAT","JSauth":false,"TokPRIV":false,"ACEEADSP":false,"TokRSPEC":false,"TokUDUS":false,"@version":"1","DirBlks":6,"Group":"OMVSGRP","Name":"####################","HostName":"XXXX","StepNm":"JAVAJVM","JobID":"S0012440","AGPID":16843159,"OUid":900005,"Open":"2020-07-09T17:15:54.298","ACEEAUDT":false,"@timestamp":"2020-07-09T17:15:54.442Z","DevNo":"00000a","Close":"2020-07-09T17:15:54.298","Inode":185,"Cat":"FS","ACEEROA":false,"Time":"2020-07-09T17:15:54.299","ASessID":16843159,"RType":92,"OGroup":1,"SubT":"File close","Token":7407136,"TokTRST":false,"ACEE":true,"PID":16843159,"FName":"/SND1/u/tomcat/conf/Catalina/snd1.bmc.com","SAF":1,"SAFD":"XXXX","PGroup":16843159,"ACEEPRIV":false,"host":"192.168.0.1","ACEESPEC":false,"TokSUS":false,"SessType":"Started Procedure","Start":"2020-07-05T23:07:23.230","ACEEFLG1":"Defined","Severity":"Info","port":12679,"TokFlg1":"Pre 1.9","WorkTypeD":"Started task","PrivStatD":"Normal user","BlksRead":13,"SID":"SND1","ACEEOPER":false,"TokFlg3":"Default SECLABEL, Default Group","UserID":"TOMCAT"}

The suggested config does not find a match in the message above, although I expected it to.
Thanks

Are you using a json codec on the input? If you are then you will not have a field called [message]

Yes, I have a json codec on the input:
input {
    tcp {
        port => 5400
        codec => json
    }
}

Which field can I use instead of message?

If you want to check whether a particular field contains XXX/YYY/etc. then test that field. If you want to check whether any field contains it, then change the codec to plain, test [message] to set [@metadata][topic] and then after that use

 json { source => "message" remove_field => [ "message" ] }
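
Putting the pieces from this thread together, a complete pipeline for the "any field contains the string" case might look like the sketch below. It is untested and rests on a few assumptions: the port is the one from the earlier post, the bootstrap_servers value is the placeholder from the first reply, and XXX/YYY/ZZZ stand in for the real strings and topic names. The input uses a plain codec so that [message] still holds the raw JSON text when the conditionals run, and the json filter parses it afterwards.

```
input {
    tcp {
        port => 5400
        # plain instead of json, so the raw text stays in [message]
        codec => plain
    }
}
filter {
    # Choose the topic while [message] is still the unparsed JSON string
    if [message] =~ "XXX" {
        mutate { add_field => { "[@metadata][topic]" => "XXX" } }
    } else if [message] =~ "YYY" {
        mutate { add_field => { "[@metadata][topic]" => "YYY" } }
    } else {
        mutate { add_field => { "[@metadata][topic]" => "ZZZ" } }
    }
    # Now parse the JSON into fields and drop the raw copy
    json { source => "message" remove_field => [ "message" ] }
}
output {
    kafka {
        bootstrap_servers => "kafka:9092"   # placeholder address
        codec => json {}
        # sprintf reference: resolved per event to the value set in the filter
        topic_id => "%{[@metadata][topic]}"
    }
}
```

Fields under [@metadata] are not included in the output, so the topic name does not end up in the message written to Kafka. If only one field needs to be checked, the json codec can stay on the input and the conditionals can test that field directly, for example [HostName] from the sample event instead of [message].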
