Unable to push messages to Kafka streams

Hi Team,

I'm trying to send application logs to a Kafka topic (filebeat -> logstash -> kafka topic). For some reason, I'm not able to see the logs in Kafka.

Here is my filebeat.conf

filebeat.spool_size: 2048
filebeat.idle_timeout: 5s
fields:
  envName: gxpbt
output.logstash:
  hosts: ['xyz.logstash.com:6045']

filebeat.inputs:
- type: log
  paths:
   - /var/log/osquery/xyz.results.log
  multiline.type: pattern
  multiline.pattern: '^{'
  multiline.negate: true
  multiline.match:  after
  fields:
    host_ip: "10.0.0.1"
    hostname: "test-compute"
    logtype: osquerylog
  json.message_key: log

Logstash conf file:

input {
  beats {
    port => 6045
  }
}
filter {
  if [fields][logtype] == "osquerylog" {
       grok {
        match => ["[log][file][path]", "%{GREEDYDATA:message}"]
        add_tag => "osquerylog"
   }
  }
}

output {
  if "osquerylog" in [tags] {
    kafka {
    codec => "json"
    bootstrap_servers => "https://xyz.kafka.com:9092"
    ssl_truststore_location => "/etc/certs/kafkatruststore.jks"
    ssl_truststore_password => "1234567"
    ssl_truststore_type => "JKS"
    ssl_keystore_location => "/etc/certs/kafkakeystore.jks"
    ssl_keystore_password => "1234567"
    ssl_keystore_type => "JKS"
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_SSL"
    request_timeout_ms => "5000"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';"
    topic_id => "osquery_log"
    }
  }
}

I'm seeing the following messages in the Logstash logs:

[2022-07-12T20:19:48,773][INFO ][org.apache.kafka.common.security.authenticator.AbstractLogin][main] Successfully logged in.
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.5.1
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: 0efa8fb0f4c73d92
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1657657188843

Please help me understand why I am not able to see events in kafka.

The first step would be to add

output { stdout { codec => rubydebug } }

(or a file output) to make sure that the events are reaching logstash and have the fields that you expect.
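
A minimal sketch of such a temporary debugging output, placed inside the existing output section next to the kafka block so that every event is printed regardless of its tags. The path /tmp/logstash-debug.log is a hypothetical location, not something from the original configuration:

```
output {
  # Temporary: print every event with all of its fields and tags.
  stdout { codec => rubydebug }

  # Alternative when stdout is hard to reach (e.g. Logstash runs as a service).
  # The path below is a hypothetical example.
  file {
    path => "/tmp/logstash-debug.log"
    codec => rubydebug
  }
}
```

If an event shows no "osquerylog" entry under tags, the conditional around the kafka output will skip it.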

Thanks for your reply. I can see the logs being sent now; because of the Kafka stream offset, not all of the events were showing up.

I have the following filebeat.conf file:

fields:
  envName: gxpbt
output.logstash:
  hosts: ['xyz.logstash.com:6045']

filebeat.inputs:
- type: log
  paths:
   - /var/log/osquery/osquery.results.log
  multiline.type: pattern
  multiline.pattern: '^{'
  multiline.negate: true
  multiline.match:  after
  fields:
    host_ip: "10.0.0.1"
    hostname: "test-compute"
    regio: "ashburn"  
  json.message_key: log
- type: log
  paths:
   - /var/log/hostname/forwarded-logs.log
  fields:
    logtype: rsyslog
- type: log
  paths:
    - /u01/data/domains/*_domain/servers/*/logs/access.log
  fields:
    logtype: wlsAccess

Logstash.conf:

input {
  beats {
    port => 6045
  }
}

filter {
  if [fields][region] == "ashburn" {
    grok {
      match => ["message", "%{GREEDYDATA:message}"]
      add_tag => "osquerylog"
    }
  }

  if [fields][logtype] == "wlsAccess" {
    grok {
      match => ["[log][file][path]","%{GREEDYDATA}/%{DATA:servername}/logs/%{DATA:dirname}/%{GREEDYDATA:filename}"]
    }
  }
  if [fields][logtype] == "rsyslog" {
    grok {
      match => ["message", "%{GREEDYDATA:message}"]
      add_tag => "rsyslog"
    }
  }
  else {
    grok {
      match => ["[log][file][path]","%{GREEDYDATA}/%{DATA:servername}/logs/%{GREEDYDATA:filename}"]
    }
  }
}

output {
  if "osquerylog" in [tags] {
    kafka {
    codec => "json"
    bootstrap_servers => "https://xyz.kafka.com:9092"
    ssl_truststore_location => "/etc/certs/kafkatruststore.jks"
    ssl_truststore_password => "1234567"
    ssl_truststore_type => "JKS"
    ssl_keystore_location => "/etc/certs/kafkakeystore.jks"
    ssl_keystore_password => "1234567"
    ssl_keystore_type => "JKS"
    sasl_mechanism => "PLAIN"
    security_protocol => "SASL_SSL"
    request_timeout_ms => "5000"
    sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';"
    topic_id => "osquery_log"
    }
  }
  if "rsyslog" in [tags] {
       syslog {
        appname => "SIEM"
        host => "10.x.x.52"
        port => "514"
        protocol => "tcp"
        codec => line { format => "%{message}" }
    }
  }
  else {
    file {
      codec => line {
        format => "%{[message]}"
      }
      path => "/mnt/shared_fs_03/%{[fields][envName]}/%{[host][name]}/%{[servername]}/%{[filename]}-%{+YYYY-MM-dd}"
    }
    #stdout{}
  }
  stdout{}
}

Could you please confirm whether I can use if and else conditions in filters?

Yes, logstash supports conditionals in the filter section.
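
As a rough sketch of the syntax (not a drop-in replacement for the configuration above), conditionals can be chained with else if so that each event takes exactly one branch. The mutate filter and the tag name "other" are illustrative assumptions:

```
filter {
  if [fields][logtype] == "rsyslog" {
    mutate { add_tag => ["rsyslog"] }
  } else if [fields][logtype] == "wlsAccess" {
    mutate { add_tag => ["wlsAccess"] }
  } else {
    # Reached only when none of the conditions above matched.
    mutate { add_tag => ["other"] }
  }
}
```

A bare else belongs only to the if (or else if) immediately before it, so several separate if blocks followed by a single else do not form one chain.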

@Badger
For example, I have the field region set to "ashburn" in filebeat.conf. Can I use the if condition as above? And can I write if/else conditions as above?

Provided that the names match (region vs. regio) then the answer to both questions is yes.
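
To illustrate the point about names, here is a sketch that assumes the Filebeat input declares the custom field as region (the configuration above spells it regio) and leaves fields_under_root at its default of false, so the value arrives under [fields]. The mutate filter is used only for illustration:

```
filter {
  # Matches only if Filebeat sends fields.region; with the key spelled
  # "regio" this condition is never true and the tag is never added.
  if [fields][region] == "ashburn" {
    mutate { add_tag => ["osquerylog"] }
  }
}
```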