@DavidTurner, I hope you are doing well. Could you please assist me with this one? I am stuck.
I successfully implemented Logstash; below is my logstash.conf, which is working perfectly:
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["tracking"]
    codec => avro {
      schema_uri => "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord.avsc"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
filter {
  if [eventType] == 'view' {
    prune {
      blacklist_names => ["^purchase___.*"]
    }
    ruby {
      code => "prefix = 'view___'
        event.to_hash.each { |k, v|
          if (!k.start_with?('@')) && (k.include?(prefix))
            event.set(k.split(prefix).last, v)
            event.remove(k)
          end
        }"
    }
  }
  if [eventType] == 'purchase' {
    prune {
      blacklist_names => ["^view___.*"]
    }
    ruby {
      code => "prefix = 'purchase___'
        event.to_hash.each { |k, v|
          if (!k.start_with?('@')) && (k.include?(prefix))
            event.set(k.split(prefix).last, v)
            event.remove(k)
          end
        }"
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  if [eventType] == 'view' {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => 'view'
    }
  } else if [eventType] == 'purchase' {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => 'purchase'
    }
  } else {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => 'unrecognized'
    }
  }
}
As you can see, in the input section we applied "value_deserializer_class" to deserialize the incoming messages. On the consumer side, however, in my Python code I am unable to deserialize the result; I have tried various strategies, but none of them work.
consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m).decode('utf-8'),
)
consumer.subscribe(['purchase'])
for message in consumer:
    print(message)
Here I am getting: JSONDecodeError: Extra data: line 1 column 5 (char 4)
I removed decode() as well, but this also gives the same error:
consumer = KafkaConsumer(
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda m: json.loads(m),
)
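As a side note, I realize that even if the payload were valid JSON, my first lambda had the two steps in the wrong order: json.loads() already returns a Python object, so decode('utf-8') must be applied to the raw bytes first. A minimal standalone sketch of what I believe the deserializer should look like (using sample bytes, no broker involved):

```python
import json

# Correct order: raw bytes -> str -> Python dict.
# My original lambda called .decode() on the result of json.loads(),
# which is already a dict, not bytes.
deserialize = lambda m: json.loads(m.decode('utf-8'))

# Simulated raw Kafka message value (hypothetical sample payload)
raw = b'{"order_id": "1", "unit_price": "100"}'
print(deserialize(raw))  # {'order_id': '1', 'unit_price': '100'}
```

This alone does not fix my error, though, since the messages arriving on the topic do not appear to be JSON in the first place.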
But in the Logstash console (the rubydebug stdout) I can see exactly the JSON result that I want to receive in the consumer Python application:
{
    "order_id" => "1",
    "session_id" => "q:2332321sqknds",
    "unit_price" => "100",
    "tax" => "10",
    "additional_cost" => "",
    .....
}
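One thing I am now suspecting (an assumption on my part, not verified yet): the kafka output plugin defaults to the plain codec, which writes the event as a formatted string rather than JSON, and that would explain why json.loads() fails with "Extra data". If that is the cause, setting the codec on the output to json might be enough, something like:

```
output {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => 'purchase'
    codec => json   # serialize the whole event as a JSON string
  }
}
```

I have not confirmed this against my setup yet.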
So I tried one more thing and applied this in the output section:
if [eventType] == 'purchase' {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => 'purchase'
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}
But this gives an error that "value_deserializer_class" is not allowed in the kafka output, even though I am using "value_deserializer_class" in the input section.
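From what I can tell in the plugin docs (again, an assumption on my part), this may be expected: deserialization only happens on the consumer side, so "value_deserializer_class" is an input-plugin option, while the output plugin produces messages and instead has a "value_serializer" option (defaulting to the Kafka StringSerializer). If I understand that correctly, the output block would look something like this:

```
if [eventType] == 'purchase' {
  kafka {
    bootstrap_servers => "localhost:9092"
    topic_id => 'purchase'
    codec => json
    # outputs *serialize* rather than deserialize; this is the default value
    value_serializer => "org.apache.kafka.common.serialization.StringSerializer"
  }
}
```

Is this the right way to think about it, or am I missing something?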