I have a Logstash setup in which data comes in from Beats and is then output to both Elasticsearch and Kafka. I use the avro codec on both output plugins.
Upon checking, the documents in Elasticsearch look as if a json codec had been applied, and they carry the tag "beats_input_codec_plain_applied". Does that tag mean the events were not converted to Avro format?
In Kafka, the messages I receive are unreadable, and I'm not sure what they are:
```
JjIwMTgtMDgtMTQgMjM6NTk6NTkQMjAxNy4wMTIQMS4xMDcwOTkIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjEQMTE3Mi45MzEQMy4zMDg3NDUIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjIQMTA0MC41NzESMC45ODQ1NDc1CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjMQMTAwMi4wMjMSMC45MDQ2ODI2CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjQQMTIwNC45NDEOMi40NzIyMQgyMTEyDkRBIFRlYW0MWW9kYSAyBlNWVAIwCDkwMDYGU1ZUCkFXU1BI
JjIwMTgtMDgtMTUgMTE6NTM6MjYQMTE1Mi43MDkSMC45ODAyNTUzCDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjgQMTEwOS42OTEQMS4wNTUxOTgIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjcQMTE4Mi4yMzcQMi4xNzg4NjMIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjUQMTEzMS4xODQQMS4wNTc3NzQIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjkMMTI0NC41EDcuMTg1ODM3CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MzEQMTMwMy4zNjkQNi42MjAzMDkIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MzAQMTM0MS44MDIQMTIuMDYxMDYIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MzQQMTA5NC44ODgSMC43MDk5MDIzCDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MzIQMTEwOC4yNDQQMS4yNDA3MTgIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA=
```
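For what it's worth, base64-decoding one of these strings yields mostly readable field values separated by length-prefix bytes, which looks like Avro binary encoding rather than random garbage. A minimal check with plain Python (the message is copied verbatim from above):

```python
import base64

# First message from the Kafka topic, copied verbatim from above.
msg = "JjIwMTgtMDgtMTQgMjM6NTk6NTkQMjAxNy4wMTIQMS4xMDcwOTkIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA=="

raw = base64.b64decode(msg)
print(raw)
# b'&2018-08-14 23:59:59\x102017.012\x101.107099\x082112\x0eDA Team\x0cYoda 2\x06SVT\x020\x089006\x06SVT\nAWSPH'
```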
Here is a sample of my config:
```
input {
  beats {
    port => 5044
  }
}

filter {
  if [module_name] == "Module1" {
    csv {
      separator => ","
      columns => ["logtime", "name", "address", "age"]
    }
  }
  if [module_name] == "Module2" {
    csv {
      separator => ","
      columns => ["logtime", "data1", "data2", "data3"]
    }
  }
  mutate {
    add_field => {
      "version" => "logs v.20.1"
    }
  }
  fingerprint {
    target => "generated_id"
    method => "UUID"
  }
}

output {
  if [module_name] == "Module1" {
    elasticsearch {
      hosts => ["192.168.200.64:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
      codec => avro {
        schema_uri => "../avro_schemas/module1_avro.avsc"
      }
    }
    kafka {
      topic_id => "logstash_output"
      bootstrap_servers => "192.168.200.64:9092"
      acks => "all"
      codec => avro {
        schema_uri => "../avro_schemas/module1_avro.avsc"
      }
    }
  }
  if [module_name] == "Module2" {
    elasticsearch {
      hosts => ["192.168.200.64:9200"]
      index => "logstash-%{+YYYY.MM.dd}"
      codec => avro {
        schema_uri => "../avro_schemas/module2.avsc"
      }
    }
    kafka {
      topic_id => "logstash_output"
      bootstrap_servers => "192.168.200.64:9092"
      acks => "all"
      codec => avro {
        schema_uri => "../avro_schemas/module2.avsc"
      }
    }
  }
}
```
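In case it helps, this is roughly how I try to read the topic back. It's a sketch assuming the kafka-python and avro packages are installed, that the base64 layer sits on top of Avro binary as observed above, and that module2.avsc matches the fields produced by the csv filter:

```python
import base64
import io

import avro.schema
from avro.io import BinaryDecoder, DatumReader
from kafka import KafkaConsumer  # kafka-python package (assumed)

# Parse the same schema the Logstash outputs reference (path is an assumption).
schema = avro.schema.parse(open("../avro_schemas/module2.avsc").read())
reader = DatumReader(schema)

consumer = KafkaConsumer(
    "logstash_output",
    bootstrap_servers="192.168.200.64:9092",
    auto_offset_reset="earliest",
)

for message in consumer:
    # The payload arrives base64-encoded; strip that layer, then read the Avro binary.
    raw = base64.b64decode(message.value)
    record = reader.read(BinaryDecoder(io.BytesIO(raw)))
    print(record)
```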