Hi everyone!
I'm using the ELK stack with Kafka to collect and analyze logs from my K8s environment.
We have different log formats, mainly plaintext and JSON.
How can I process them in Logstash properly?
Right now, the plaintext logs are being split incorrectly: a single plaintext message ends up broken into two or more separate events.
My simplified config is:
input {
  kafka {
    bootstrap_servers => "kafka_ip:port"
    topics => ["sample_topic"]
    group_id => "elk"
    client_id => "logstash-sample"
    auto_offset_reset => "earliest"
    consumer_threads => 4
    codec => json
    decorate_events => true
    add_field => { "kafka_topic" => "sample_topic" }
  }
}
output {
  if [kafka_topic] == "sample_topic" {
    elasticsearch {
      hosts => ["localhost:9200"]
      index => "sample_topic-%{+YYYY.MM.dd}"
    }
  }
}
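In case it helps to show what I've been considering (just a sketch, I haven't confirmed it actually works): switch the input from codec => json to codec => plain, and then only parse messages that look like JSON in a filter, so plaintext messages would pass through untouched, something like this:

filter {
  # only attempt JSON parsing when the message starts with "{";
  # everything else stays as plain text in the "message" field
  if [message] =~ /^\s*\{/ {
    json {
      source => "message"
    }
  }
}

But I'm not sure this addresses the splitting itself, i.e. when one plaintext message arrives as several Kafka records. Is there a better way to handle mixed formats like this?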
Would appreciate any help,
Thanks in advance.