Logstash Kafka input: conditional consuming?

Hello !

I have multiple firewall log sources. All these sources push their logs into a dedicated topic named "firewall".

In Logstash, I would like to have a different pipeline for each of these sources, to apply different processing and use a different index. For example, one pipeline for Fortigate and one pipeline for Juniper.

Let's look at the following Fortigate pipeline (I didn't change the Kafka group id, which is "logstash" by default):

input {
  kafka {
    topics => ["firewall"]
    codec => json
    tags => ["Fortigate"]
  }
}

filter{
}

output {
  if "Fortigate" in [tags] {
    elasticsearch {
      hosts => ["elastic1:9200"]
      index => "firewall"
    }
  }
}

It works. BUT if I use a different conditional in my output, like this:

output {
  if "TEST" in [tags] {
    elasticsearch {
      hosts => ["elastic1:9200"]
    }
  }
}

I can see that the topic is still consumed (the lag is not increasing). In my view it should not be consumed, because the tag does not match the one checked in the output.

So if I create another "Juniper" pipeline, I think the data will already have been consumed by the Fortigate pipeline.

So, does it mean that all data is consumed in any case, whatever conditional is used in my output?

How can I achieve what I need?

Thanks for your help !

Not sure I understood the question, but if an output is conditional

output {
    if "Fortigate" in [tags] {
        elasticsearch {

if the condition does not evaluate to true, the event is not sent to the output; it is discarded.

Yes I agree with this.

My question is: even if I don't use an output, will the logs still be consumed?

If you do not define an output section then I do not think the pipeline will be executed, however, the output section does not have to send anything to an output. It is OK if the conditional is never true.

I ran a test (no output, no filter, just my kafka input), and when looking at consumer metrics on a Kafka machine (kafka-consumer-groups.sh ...), I can see the logs are consumed, since the lag is not increasing. If they were not consumed, that would not be the case (in my understanding of how Kafka works).

So even if I don't have any output, the pipeline is executed.

I think that you misunderstand how Kafka works. If you want two different pipelines to both be able to consume all of the events in a topic, each pipeline must be configured for a separate consumer group using the group_id option.
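For example, each pipeline's kafka input could be configured along these lines (the group_id values here are just illustrative names):

In the Fortigate pipeline:

input {
  kafka {
    topics => ["firewall"]
    codec => json
    group_id => "logstash-fortigate"
  }
}

In the Juniper pipeline:

input {
  kafka {
    topics => ["firewall"]
    codec => json
    group_id => "logstash-juniper"
  }
}

Since each consumer group tracks its own offsets, both pipelines will independently receive every message published to the topic.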

Thank you Rob. It makes sense after reading more in depth Kafka documentation.

So, ok I use a different group id for each pipeline (cisco-pipeline.conf, juniper-pipeline.conf..).

But I'm wondering: what happens to the messages that do not match my conditional?

Are they discarded once consumed/acknowledged? Are they never written anywhere, even temporarily in memory?

You haven't really filtered on anything. At the moment it doesn't look like you are thinking about this problem the right way. I believe what you really are trying to build is this: kafka_logstash_siem.pdf


I don't agree with that: I filtered using the tag:

output {
    if "Fortigate" in [tags] {
        elasticsearch {

If I put an invalid tag, no data is written to Elasticsearch, so to me the filtering is working.

Your project looks great. It will take me some time to have a look at it!

Not really. In your kafka input you assign the tag "Fortigate" to all of the messages consumed from the "firewall" topic. Then in the output you check to see if "Fortigate" is a tag. Of course it is always a tag because you assigned it to every event in the input. So the end result is that you haven't filtered anything.
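If both vendors really do share one topic, a conditional would have to look at the message content instead of a tag assigned in the input. As a rough sketch (assuming Fortigate syslog messages contain a devname= field, which you would need to verify against your own logs):

filter {
  if [message] =~ "devname=" {
    mutate { add_tag => ["Fortigate"] }
  }
}

With content-based tagging like this, the output conditional on "Fortigate" would actually discriminate between sources.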

Yes... you've got a point! :slight_smile: I didn't update my post, but after some tests I removed the tag from this input and added it on the Logstash instance that acts as a collector:

Fortigate -> Logstash collector (where I add the tag on the Fortigate syslog input) -> Kafka -> Logstash (which does the processing and uses, on its kafka input, the tag added by the previous Logstash)
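The collector pipeline looks roughly like this (the port and broker address are placeholders for my real values):

input {
  syslog {
    port => 5514
    tags => ["Fortigate"]
  }
}

output {
  kafka {
    bootstrap_servers => "kafka1:9092"
    topic_id => "firewall"
    codec => json
  }
}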

If the collector is adding the tag, why use a tag at all? Why not just produce the record to a Kafka topic called "fortigate"? The other pipeline consumes from the "fortigate" topic. You then no longer need a filter in the output, because this pipeline gets ONLY fortigate events. You also avoid consuming other logs that aren't fortigate and having to discard them.
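Sketching that idea (topic name and addresses are examples):

The collector produces directly to a per-vendor topic:

output {
  kafka {
    bootstrap_servers => "kafka1:9092"
    topic_id => "fortigate"
    codec => json
  }
}

The processing pipeline then consumes that topic with no conditional needed:

input {
  kafka {
    topics => ["fortigate"]
    codec => json
    group_id => "logstash-fortigate"
  }
}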

Yes, I could do that. I thought that maybe it was better to minimize the number of topics, to facilitate maintenance. Maybe this is not such a good idea after all :thinking: