Unable to push messages to a Kafka topic

Hi Team,

I'm trying to ship application logs to Kafka (Filebeat -> Logstash -> Kafka topic), but for some reason I'm not able to see the logs in Kafka.

Here is my filebeat.conf:

filebeat.spool_size: 2048
filebeat.idle_timeout: 5s
fields:
  envName: gxpbt
output.logstash:
  hosts: ['xyz.logstash.com:6045']

filebeat.inputs:
- type: log
  paths:
   - /var/log/osquery/xyz.results.log
  multiline.type: pattern
  multiline.pattern: '^{'
  multiline.negate: true
  multiline.match: after
  fields:
    host_ip: "10.0.0.1"
    hostname: "test-compute"
    logtype: osquerylog
  json.message_key: log

Logstash conf file:

input {
  beats {
    port => 6045
  }
}
filter {
  if [fields][logtype] == "osquerylog" {
    grok {
      match => ["[log][file][path]", "%{GREEDYDATA:message}"]
      add_tag => ["osquerylog"]
    }
  }
}

output {
  if "osquerylog" in [tags] {
    kafka {
      codec => "json"
      bootstrap_servers => "https://xyz.kafka.com:9092"
      ssl_truststore_location => "/etc/certs/kafkatruststore.jks"
      ssl_truststore_password => "1234567"
      ssl_truststore_type => "JKS"
      ssl_keystore_location => "/etc/certs/kafkakeystore.jks"
      ssl_keystore_password => "1234567"
      ssl_keystore_type => "JKS"
      sasl_mechanism => "PLAIN"
      security_protocol => "SASL_SSL"
      request_timeout_ms => "5000"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';"
      topic_id => "osquery_log"
    }
  }
}

I'm seeing the following log messages in the Logstash logs:

[2022-07-12T20:19:48,773][INFO ][org.apache.kafka.common.security.authenticator.AbstractLogin][main] Successfully logged in.
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.5.1
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: 0efa8fb0f4c73d92
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1657657188843

Please help me understand why I'm not able to see events in Kafka.

The first step would be to add

output { stdout { codec => rubydebug } }

(or a file output) to make sure that the events are reaching Logstash and have the fields that you expect.
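
A minimal sketch of the file-output variant (the path here is just an example; any writable location works):

output {
  file {
    path => "/tmp/logstash-debug-%{+YYYY-MM-dd}.log"  # example path for inspection
    codec => rubydebug                                # dumps the full event, including all fields
  }
}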

Thanks for your reply. I can see the logs being sent now; because of the Kafka consumer offset, not all events were showing up when I checked the topic.
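
(For anyone else hitting this: one way to rule out consumer offsets is to re-read the topic from the beginning with a throwaway pipeline. The group_id below is a placeholder; a fresh group id has no committed offsets.)

input {
  kafka {
    bootstrap_servers => "xyz.kafka.com:9092"
    topics => ["osquery_log"]
    group_id => "debug-offset-check"  # placeholder consumer group
    auto_offset_reset => "earliest"   # start from the beginning of the topic
    # plus the same SASL_SSL / truststore settings used in the kafka output above
  }
}
output { stdout { codec => rubydebug } }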

I have the following filebeat.conf file:

fields:
  envName: gxpbt
output.logstash:
  hosts: ['xyz.logstash.com:6045']

filebeat.inputs:
- type: log
  paths:
   - /var/log/osquery/osquery.results.log
  multiline.type: pattern
  multiline.pattern: '^{'
  multiline.negate: true
  multiline.match: after
  fields:
    host_ip: "10.0.0.1"
    hostname: "test-compute"
    regio: "ashburn"
  json.message_key: log
- type: log
  paths:
   - /var/log/hostname/forwarded-logs.log
  fields:
    logtype: rsyslog
- type: log
  paths:
    - /u01/data/domains/*_domain/servers/*/logs/access.log
  fields:
    logtype: wlsAccess

Logstash.conf:

input {
  beats {
    port => 6045
  }
}

filter {
  if [fields][region] == "ashburn" {
    grok {
      match => ["message", "%{GREEDYDATA:message}"]
      add_tag => ["osquerylog"]
    }
  }

  if [fields][logtype] == "wlsAccess" {
    grok {
      match => ["[log][file][path]", "%{GREEDYDATA}/%{DATA:servername}/logs/%{DATA:dirname}/%{GREEDYDATA:filename}"]
    }
  }

  if [fields][logtype] == "rsyslog" {
    grok {
      match => ["message", "%{GREEDYDATA:message}"]
      add_tag => ["rsyslog"]
    }
  } else {
    grok {
      match => ["[log][file][path]", "%{GREEDYDATA}/%{DATA:servername}/logs/%{GREEDYDATA:filename}"]
    }
  }
}

output {
  if "osquerylog" in [tags] {
    kafka {
      codec => "json"
      bootstrap_servers => "https://xyz.kafka.com:9092"
      ssl_truststore_location => "/etc/certs/kafkatruststore.jks"
      ssl_truststore_password => "1234567"
      ssl_truststore_type => "JKS"
      ssl_keystore_location => "/etc/certs/kafkakeystore.jks"
      ssl_keystore_password => "1234567"
      ssl_keystore_type => "JKS"
      sasl_mechanism => "PLAIN"
      security_protocol => "SASL_SSL"
      request_timeout_ms => "5000"
      sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';"
      topic_id => "osquery_log"
    }
  }

  if "rsyslog" in [tags] {
    syslog {
      appname => "SIEM"
      host => "10.x.x.52"
      port => "514"
      protocol => "tcp"
      codec => line { format => "%{message}" }
    }
  } else {
    file {
      codec => line {
        format => "%{[message]}"
      }
      path => "/mnt/shared_fs_03/%{[fields][envName]}/%{[host][name]}/%{[servername]}/%{[filename]}-%{+YYYY-MM-dd}"
    }
  }

  stdout {}
}

Could you please confirm whether I can use if and else conditionals together with filters like this?

Yes, Logstash supports conditionals in the filter section.
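
A minimal sketch, using field names from the configs above:

filter {
  if [fields][region] == "ashburn" {
    mutate { add_tag => ["osquerylog"] }      # tag events from the ashburn region
  } else if [fields][logtype] == "rsyslog" {
    mutate { add_tag => ["rsyslog"] }         # tag forwarded syslog events
  } else {
    mutate { add_tag => ["unmatched"] }       # everything else falls through here
  }
}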

@Badger
For example, I have the field region set to "ashburn" in filebeat.conf. Can I use the if condition as above, and can I write if/else conditions as above?

Provided that the names match (region vs. regio), the answer to both questions is yes.
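
In other words, the filebeat.conf above sets regio, so a Logstash test on [fields][region] would never match. Renaming the field would line the two up, e.g.:

fields:
  region: "ashburn"  # was "regio"; must match [fields][region] in the Logstash conditional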
