[Logstash] [Filebeat] Using codec Plain and JSON for the same input

Hi,

Currently we have this architecture: Filebeat --> Kafka --> Logstash

Logstash is using these input configs:

file1.conf

input {
  kafka {
    codec => json
    bootstrap_servers => "....."
    topics_pattern => "prd*"
    auto_offset_reset => "earliest"
    decorate_events => true
  }
}
filter {
...
}

file2.conf

input {
  kafka {
    codec => plain
    bootstrap_servers => "....."
    topics_pattern => "prd*"
    auto_offset_reset => "earliest"
    decorate_events => true
  }
}
filter {
...
}

So for file1 we use the JSON codec and for file2 we use the plain codec.

Now we are going to remove Kafka, so we will have this: Filebeat --> Logstash


input {
  beats {
    port => 5044
    codec => plain
    ssl => true
    ssl_certificate_authorities => ["/etc/ca.crt"]
    ssl_certificate => "/etc/logstash.crt"
    ssl_key => "/etc/logstash.key"
    ssl_verify_mode => "force_peer"
  }
}
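For reference, the Filebeat side has to match this input: since the beats input uses `ssl_verify_mode => "force_peer"`, Filebeat must present a client certificate. A minimal sketch (the host and certificate paths are assumptions):

```yaml
# filebeat.yml (sketch; host and certificate paths are assumptions)
output.logstash:
  hosts: ["logstash.example.com:5044"]
  ssl.certificate_authorities: ["/etc/ca.crt"]
  # Client certificate and key, required because the Logstash beats
  # input is configured with ssl_verify_mode => "force_peer"
  ssl.certificate: "/etc/filebeat.crt"
  ssl.key: "/etc/filebeat.key"
```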

The problem is that previously we had 2 inputs: one for JSON and one for plain. I can't see how to do that with a single beats input?

Thank you

Any idea ?

hi @hdryx Welcome to the community, apologies for taking so long to get back to you.

Are the plain and json coming from 2 different sources / filebeat?

If so, just use 2 ports / 2 inputs like below, and of course set the logstash output in each filebeat to the correct port.

file1.conf

input {
  beats {
    port => 5044
    codec => json
  }
}
file2.conf

input {
  beats {
    port => 5045
    codec => plain
  }
}

No, they are coming from the same beat, so I have to expose only 1 port.

With the old configuration did the same events go through both kafka inputs? If not, what determined whether they were treated as json or plain?


Yes, Logstash gets all logs from Kafka

@hdryx Thanks but that is not what @Badger is asking

We are asking what determines whether a file is plain vs json: how did you identify which is which?

Can you share your current filebeat configs for the 2 different types?

Sorry, I don't have access to the filebeat config files.

I think that each input configuration defines how the data should be interpreted (see the examples in file1 and file2).

It is not possible to use two codecs in one input; you can use only one. So you will need to use the plain codec in the beats input and a json filter to parse your JSON messages.

You will need to be able to determine which log is a plain text message and which log is a json message, which is what was already asked.

This should be done in Filebeat, so you need to share your filebeat configuration.

The kafka inputs that you shared do not help; they are basically the same, pointing to the same topic pattern, and it is not possible to know whether they point to the same kafka cluster or not.

You will probably need to add a tag or a new field in each filebeat input so you can use this information in Logstash and apply the json filter.

Something like this:

if "json" in [tags] {
    json {
        source => "message"
    }
}
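The Filebeat side of that could be sketched as follows (the input type, log paths, and the "plain" tag are assumptions; only the tags matter to the Logstash conditional):

```yaml
# filebeat.yml (sketch; input paths are assumptions)
filebeat.inputs:
  # JSON application logs: tagged so Logstash applies the json filter
  - type: log
    paths:
      - /var/log/app/*.json
    tags: ["json"]
  # Plain-text logs: tagged "plain" so they skip the json filter
  - type: log
    paths:
      - /var/log/app/*.log
    tags: ["plain"]
```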