Hi Team,
I'm trying to send application logs (filebeat -> logstash-> kafka topic) to kafka stream. For some reason I'm not able to see logs in kafka.
Here is my filebeat.conf
filebeat.spool_size: 2048
filebeat.idle_timeout: 5s
fields:
envName: gxpbt
output.logstash:
hosts: ['xyz.logstash.com:6045']
filebeat.inputs:
- type: log
paths:
- /var/log/osquery/xyz.results.log
multiline.type: pattern
multiline.pattern: '^{'
multiline.negate: true
multiline.match: after
fields:
host_ip: "10.0.0.1"
hostname: "test-compute"
logtype: osquerylog
json.message_key: log
Logstash conf file:
input {
beats {
port => 6045
}
}
filter {
if [fields][logtype] == "osquerylog" {
grok {
match => ["[log][file][path]", "%{GREEDYDATA:message}"]
add_tag => "osquerylog"
}
}
}
output {
if "osquerylog" in [tags] {
kafka {
codec => "json"
bootstrap_servers => "https://xyz.kafka.com:9092"
ssl_truststore_location => "/etc/certs/kafkatruststore.jks"
ssl_truststore_password => "1234567"
ssl_truststore_type => "JKS"
ssl_keystore_location => "/etc/certs/kafkakeystore.jks"
ssl_keystore_password => "1234567"
ssl_keystore_type => "JKS"
sasl_mechanism => "PLAIN"
security_protocol => "SASL_SSL"
request_timeout_ms => "5000"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';"
topic_id => "osquery_log"
}
}
}
I'm seeing following log messages in logstash logs,
[2022-07-12T20:19:48,773][INFO ][org.apache.kafka.common.security.authenticator.AbstractLogin][main] Successfully logged in.
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka version: 2.5.1
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka commitId: 0efa8fb0f4c73d92
[2022-07-12T20:19:48,846][INFO ][org.apache.kafka.common.utils.AppInfoParser][main] Kafka startTimeMs: 1657657188843
Please help me understand why I am not able to see events in kafka.
Badger
July 12, 2022, 8:44pm
2
The first step would be to add
output { stdout { codec => rubydebug } }
(or a file output) to make sure that the events are reaching logstash and have the fields that you expect.
Thanks for your reply. I can see logs being sent, because of the kafka stream offset all events were not reaching.
I'have following filebeat.conf file,
fields:
envName: gxpbt
output.logstash:
hosts: ['xyz.logstash.com:6045']
filebeat.inputs:
- type: log
paths:
- /var/log/osquery/osquery.results.log
multiline.type: pattern
multiline.pattern: '^{'
multiline.negate: true
multiline.match: after
fields:
host_ip: "10.0.0.1"
hostname: "test-compute"
regio: "ashburn"
json.message_key: log'
- type: log
paths:
- /var/log/hostname/forwarded-logs.log
fields:
logtype: rsyslog
- type: log
paths:
- /u01/data/domains/*_domain/servers/*/logs/access.log
fields:
logtype: wlsAccess
Logstash.conf:
input {
beats {
port => 6045
}
}
if [fields][region] == "ashburn" {
grok {
match => ["message", "%{GREEDYDATA:message}"]
add_tag => "osquerylog"
}
}
if [fields][logtype] == "wlsAccess" {
grok {
match => ["[log][file][path]","%{GREEDYDATA}/%{DATA:servername}/logs/%{DATA:dirname}/%{GREEDYDATA:filename}"]
}
}
if [fields][logtype] == "rsyslog" {
grok {
match => ["message", "%{GREEDYDATA:message}"]
add_tag => "rsyslog"
}
}
else {
grok {
match => ["[log][file][path]","%{GREEDYDATA}/%{DATA:servername}/logs/%{GREEDYDATA:filename}"]
}
}
output {
if "osquerylog" in [tags] {
kafka {
codec => "json"
bootstrap_servers => "https://xyz.kafka.com:9092"
ssl_truststore_location => "/etc/certs/kafkatruststore.jks"
ssl_truststore_password => "1234567"
ssl_truststore_type => "JKS"
ssl_keystore_location => "/etc/certs/kafkakeystore.jks"
ssl_keystore_password => "1234567"
ssl_keystore_type => "JKS"
sasl_mechanism => "PLAIN"
security_protocol => "SASL_SSL"
request_timeout_ms => "5000"
sasl_jaas_config => "org.apache.kafka.common.security.plain.PlainLoginModule required username='test' password='test123';"
topic_id => "osquery_log"
}
}
if "rsyslog" in [tags] {
syslog {
appname => "SIEM"
host => "10.x.x.52"
port => "514"
protocol => "tcp"
codec => line { format => "%{message}" }
}
}
stdout{}
else{
file {
codec => line {
format => "%{[message]}"
}
path => "/mnt/shared_fs_03/%{[fields][envName]}/%{[host][name]}/%{[servername]}/%{[filename]}-%{+YYYY-MM-dd}"
}
#stdout{}
}
}
Could you please confirm if I can use IF and else conditions and filters?
Badger
July 15, 2022, 4:23pm
4
Yes, logstash supports conditionals in the filter section.
@Badger
For example I have field "region is ashburn" under filebeat.conf. can I use the if condition as above. Can I write if else conditions as above?
Badger
July 15, 2022, 4:39pm
6
Provided that the names match (region vs. regio) then the answer to both questions is yes.
system
(system)
Closed
August 12, 2022, 4:39pm
7
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.