I have a logstash setup in which the data comes from beats then it will output to elasticsearch and kafka.
I used avro codec to both output plugin.
Upon checking, the output in elasticsearch is just like a json c0dec and there's a tag "beats_input_codec_plain_applied". From the tag, does it mean that it did not convert to avro format?
In kafka, the messages that i received are unreadable. I'm not sure if what are these messages.
`JjIwMTgtMDgtMTQgMjM6NTk6NTkQMjAxNy4wMTIQMS4xMDcwOTkIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjEQMTE3Mi45MzEQMy4zMDg3NDUIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjIQMTA0MC41NzESMC45ODQ1NDc1CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjMQMTAwMi4wMjMSMC45MDQ2ODI2CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjQQMTIwNC45NDEOMi40NzIyMQgyMTEyDkRBIFRlYW0MWW9kYSAyBlNWVAIwCDkwMDYGU1ZUCkFXU1BI
JjIwMTgtMDgtMTUgMTE6NTM6MjYQMTE1Mi43MDkSMC45ODAyNTUzCDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjgQMTEwOS42OTEQMS4wNTUxOTgIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjcQMTE4Mi4yMzcQMi4xNzg4NjMIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjUQMTEzMS4xODQQMS4wNTc3NzQIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjkMMTI0NC41EDcuMTg1ODM3CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MzEQMTMwMy4zNjkQNi42MjAzMDkIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MzAQMTM0MS44MDIQMTIuMDYxMDYIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MzQQMTA5NC44ODgSMC43MDk5MDIzCDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MzIQMTEwOC4yNDQQMS4yNDA3MTgIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA=  `
Here is the sample of my config:
input{
 beats{
  port => 5044
 }
}
filter {
 if [module_name] == "Module1"{
 csv {
  separator => ","
  columns => ["logtime", "name", "address", "age"]
 }
 }
 
 if [module_name] == "Module2"{
 csv {
  separator => ","
  columns => ["logtime", "data1", "data2", "data3" ]
 }
 }
 
 mutate{
		add_field => {
			"version" => "logs v.20.1"
	}
 }
 
 fingerprint{
	target => "generated_id"
	method => "UUID"
 }
 
}
output {
	if [module_name] == "Module1"{
		elasticsearch {
			hosts => ["192.168.200.64:9200"]
			index => "logstash-%{+YYYY.MM.dd}"
			codec => avro{
			schema_uri => "../avro_schemas/module1_avro.avsc"
			}
		}
		
		kafka{
			topic_id => "logstash_output"
			bootstrap_servers => "192.168.200.64:9092"
			acks => all
			codec => avro{
			schema_uri => "../avro_schemas/module1_avro.avsc"
			}
		}
	}
	
	if [module_name] == "Module2"{
		elasticsearch {
			hosts => ["192.168.200.64:9200"]
			index => "logstash-%{+YYYY.MM.dd}"
			codec => avro{
			schema_uri => "../avro_schemas/module2.avsc"
			}
		}
		
		kafka{
			topic_id => "logstash_output"
			bootstrap_servers => "192.168.200.64:9092"
			acks => all
			codec => avro{
			schema_uri => "../avro_schemas/module2.avsc"
			}
		}
	}
	
}