Incorrect avro codec output

I have a Logstash setup in which data comes in from Beats and is output to both Elasticsearch and Kafka.

I used the avro codec on both output plugins.

When I check, the output in Elasticsearch looks just like what the json codec produces, and there is a tag "beats_input_codec_plain_applied". Does that tag mean the event was not converted to Avro format?

In Kafka, the messages that I receive are unreadable. I'm not sure what these messages are.

`JjIwMTgtMDgtMTQgMjM6NTk6NTkQMjAxNy4wMTIQMS4xMDcwOTkIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjEQMTE3Mi45MzEQMy4zMDg3NDUIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjIQMTA0MC41NzESMC45ODQ1NDc1CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjMQMTAwMi4wMjMSMC45MDQ2ODI2CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjQQMTIwNC45NDEOMi40NzIyMQgyMTEyDkRBIFRlYW0MWW9kYSAyBlNWVAIwCDkwMDYGU1ZUCkFXU1BI
JjIwMTgtMDgtMTUgMTE6NTM6MjYQMTE1Mi43MDkSMC45ODAyNTUzCDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MjgQMTEwOS42OTEQMS4wNTUxOTgIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjcQMTE4Mi4yMzcQMi4xNzg4NjMIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjUQMTEzMS4xODQQMS4wNTc3NzQIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MjkMMTI0NC41EDcuMTg1ODM3CDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MzEQMTMwMy4zNjkQNi42MjAzMDkIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MzAQMTM0MS44MDIQMTIuMDYxMDYIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA==
JjIwMTgtMDgtMTUgMTE6NTM6MzQQMTA5NC44ODgSMC43MDk5MDIzCDIxMTIOREEgVGVhbQxZb2RhIDIGU1ZUAjAIOTAwNgZTVlQKQVdTUEg=
JjIwMTgtMDgtMTUgMTE6NTM6MzIQMTEwOC4yNDQQMS4yNDA3MTgIMjExMg5EQSBUZWFtDFlvZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA=  `

Here is a sample of my config:

input{
 beats{
  port => 5044
 }
}


filter {
 if [module_name] == "Module1" {
  csv {
   separator => ","
   columns => ["logtime", "name", "address", "age"]
  }
 }

 if [module_name] == "Module2" {
  csv {
   separator => ","
   columns => ["logtime", "data1", "data2", "data3"]
  }
 }

 mutate {
  add_field => {
   "version" => "logs v.20.1"
  }
 }

 fingerprint {
  target => "generated_id"
  method => "UUID"
 }
}

output {
	if [module_name] == "Module1" {
		elasticsearch {
			hosts => ["192.168.200.64:9200"]
			index => "logstash-%{+YYYY.MM.dd}"
			codec => avro {
				schema_uri => "../avro_schemas/module1_avro.avsc"
			}
		}

		kafka {
			topic_id => "logstash_output"
			bootstrap_servers => "192.168.200.64:9092"
			acks => all
			codec => avro {
				schema_uri => "../avro_schemas/module1_avro.avsc"
			}
		}
	}

	if [module_name] == "Module2" {
		elasticsearch {
			hosts => ["192.168.200.64:9200"]
			index => "logstash-%{+YYYY.MM.dd}"
			codec => avro {
				schema_uri => "../avro_schemas/module2.avsc"
			}
		}

		kafka {
			topic_id => "logstash_output"
			bootstrap_servers => "192.168.200.64:9092"
			acks => all
			codec => avro {
				schema_uri => "../avro_schemas/module2.avsc"
			}
		}
	}
}

Why would you want to store the Avro representation of an event in ES?

Have you tried processing one of the Kafka messages with an Avro decoder?

Sorry, I'm not familiar with the output in Kafka.

I got these results when I printed the topic.

Is this the expected result for data sent from Logstash to Kafka in Avro format?

8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NDk6MzkQNzU5LjcwNzkUMC4wNTM5NzkxMggxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTU6MTAQNzQ5Ljg4MzcSMC4wNDUyMjQ5CDE4MDAOREEgVGVhbRBHcmFmLUV2bwZTSVQCNAg4MDM0BlNJVApBV1NQSA==
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTA6MDkQNzYzLjkyODIUMC4wNDQyOTYxNQgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTU6NDEONzU0Ljg5NBQwLjA1MDk4MDA1CDE4MDAOREEgVGVhbRBHcmFmLUV2bwZTSVQCNAg4MDM0BlNJVApBV1NQSA==
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTA6NDAQNzU5LjI5OTMUMC4wMzMwNzAwMwgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTE6MTEQNzY5LjM5MDMUMC4wNDM4OTk3OQgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTY6MTEQNzU0Ljk3NzkUMC4wNDM3OTU2NQgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTE6NDIQNzg3Ljk0OTMUMC4wNDY2NzE5NAgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTI6MTIQNzY5LjQ0OTEUMC4wNTAwODE3MggxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTY6NDEQNzQyLjQ2OTISMC4wMzA4MjY2CDE4MDAOREEgVGVhbRBHcmFmLUV2bwZTSVQCNAg4MDM0BlNJVApBV1NQSA==
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTI6NDIQNzM1LjU0MzUUMC4wMjMxNzM0NAgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTM6MTMQNzg4LjkxNzgSMC4xMDU0MjMxCDE4MDAOREEgVGVhbRBHcmFmLUV2bwZTSVQCNAg4MDM0BlNJVApBV1NQSA==
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTc6MTEONzQ2LjIzORQwLjA0NDIxNjkxCDE4MDAOREEgVGVhbRBHcmFmLUV2bwZTSVQCNAg4MDM0BlNJVApBV1NQSA==
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTM6NDMONzgwLjE0NBQwLjAyOTUzNDQ5CDE4MDAOREEgVGVhbRBHcmFmLUV2bwZTSVQCNAg4MDM0BlNJVApBV1NQSA==
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTc6NDIQNzc3LjI3MzMUMC4wNzE4NTgwNAgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTQ6MTMQNzY2Ljc2MTQUMC4wNDQzOTY0MwgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTg6MTMQNzMxLjM4NDkUMC4wNjE3NTU0NggxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTg6NDMQNzMzLjQ4NzQUMC4wMjg1MDIxNggxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTQ6NDQQNzcxLjYyNTcUMC4wNDgyOTAyNQgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTk6MTMQNzQxLjA3NDMUMC4wNDY3MzU4NwgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTcgMDA6MTk6NDMQNzUwLjc4MjUUMC4wNDcxNDIyMQgxODAwDkRBIFRlYW0QR3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg=
8/17/18 12:36:40 AM PHT , null , JjIwMTgtMDgtMTYgMjM6NTU6MTQQNzYxLjU0NjUSMC4wNjUxMjA1CDE4MDAOREEgVGVhbRBHcmFmLUV2bwZTSVQCNAg4MDM0BlNJVApBV1NQSA==

I used KSQL to decode the Avro, but KSQL can't read the data.
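
One possible explanation, offered as an assumption rather than a diagnosis: KSQL's Avro support is built around the Confluent Schema Registry wire format, in which each message value starts with a zero "magic" byte followed by a 4-byte big-endian schema ID. The values printed above are plain text starting with `J`, so they would not match that layout. The Python sketch below only checks for that header; the function name `confluent_schema_id` is made up for illustration.

```python
import struct

def confluent_schema_id(payload: bytes):
    """Return the schema ID if payload starts with the Confluent
    wire-format header (0x00 + 4-byte big-endian ID), else None."""
    if len(payload) < 5 or payload[0] != 0:
        return None
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id

# One message value exactly as it was printed from the topic.
raw_value = (
    b"JjIwMTgtMDgtMTYgMjM6NDk6MzkQNzU5LjcwNzkUMC4wNTM5NzkxMggxODAwDkRBIFRlYW0Q"
    b"R3JhZi1Fdm8GU0lUAjQIODAzNAZTSVQKQVdTUEg="
)

# Hypothetical payload in Confluent wire format, for comparison only.
framed_value = b"\x00\x00\x00\x00\x2a" + b"\x02a"

print(confluent_schema_id(raw_value))     # no header found
print(confluent_schema_id(framed_value))  # header found, schema ID extracted
```

If the header is missing, a consumer that expects Schema Registry framing has no schema ID to look up, which would be consistent with KSQL failing to read the topic.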

Do you have any idea how to decode the messages that Logstash writes to Kafka?
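
A minimal sketch of decoding one of these messages by hand, in Python with only the standard library. It rests on two assumptions: that the avro codec base64-encodes the Avro binary payload (which the trailing `=` padding in the messages suggests), and that every field in the schema is a plain Avro `string`. The schema files are not shown in this thread, so the second assumption is a guess based on the bytes; a real consumer should use an Avro library together with the actual `.avsc` file. The helper names `read_long` and `decode_string_record` are made up for illustration.

```python
import base64

def read_long(buf: bytes, pos: int):
    """Read one Avro long (zigzag varint) starting at pos.
    Returns (value, position after the long)."""
    shift = 0
    raw = 0
    while True:
        byte = buf[pos]
        pos += 1
        raw |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            break
        shift += 7
    # Undo zigzag encoding.
    return (raw >> 1) ^ -(raw & 1), pos

def decode_string_record(message_b64: str):
    """Base64-decode a message, then read it as a sequence of Avro
    strings (length as a long, followed by that many UTF-8 bytes).
    ASSUMPTION: the record consists only of string fields."""
    buf = base64.b64decode(message_b64)
    pos = 0
    fields = []
    while pos < len(buf):
        length, pos = read_long(buf, pos)
        fields.append(buf[pos:pos + length].decode("utf-8"))
        pos += length
    return fields

# First message from the original post.
sample = (
    "JjIwMTgtMDgtMTQgMjM6NTk6NTkQMjAxNy4wMTIQMS4xMDcwOTkIMjExMg5EQSBUZWFtDFlv"
    "ZGEgMgZTVlQCMAg5MDA2BlNWVApBV1NQSA=="
)

fields = decode_string_record(sample)
print(fields)
```

For the first message in the original post this yields eleven strings, starting with the timestamp `2018-08-14 23:59:59` and ending with `AWSPH`, so the payload does look like Avro binary wrapped in base64 rather than corrupted data.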
