Deserializing Logstash kafka output results in a Python consumer

I have successfully set up Logstash. Below is my logstash.conf, which works perfectly:

input {
    kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["tracking"]
        codec => avro {
            schema_uri => "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord.avsc"
        }
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
}

filter {
    if [eventType] == 'view' {
        prune {
            blacklist_names => ["^purchase___.*"]
        }
        # Rename view___<name> to <name>. The loop and event.remove(k) are reconstructed,
        # assumed from the un-prefixed field names in the Logstash output shown below.
        ruby {
            code => "prefix = 'view___'
            event.to_hash.each { |k, v|
                if (!k.start_with?('@')) && (k.include?(prefix))
                    event.set(k.split(prefix).last, v)
                    event.remove(k)
                end
            }"
        }
    }

    if [eventType] == 'purchase' {
        prune {
            blacklist_names => ["^view___.*"]
        }
        # Same renaming for purchase___<name> (reconstructed the same way).
        ruby {
            code => "prefix = 'purchase___'
            event.to_hash.each { |k, v|
                if (!k.start_with?('@')) && (k.include?(prefix))
                    event.set(k.split(prefix).last, v)
                    event.remove(k)
                end
            }"
        }
    }
}

output {
    stdout {
        codec => rubydebug
    }

    if [eventType] == 'view' {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'view'
        }
    } else if [eventType] == 'purchase' {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'purchase'
        }
    } else {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'unrecognized'
        }
    }
}
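To make the intent of the filter section explicit, here is the per-event logic restated in plain Python. This is only an illustrative sketch, not Logstash code: the function name apply_filters is hypothetical, and it assumes the Ruby snippet also removes the prefixed original key (as the un-prefixed field names in the Logstash output further down suggest).

```python
from typing import Any, Dict

# Prefixes used by the two event types in the config above.
PREFIXES = {"view": "view___", "purchase": "purchase___"}


def apply_filters(event: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical Python restatement of the prune + ruby filter section."""
    event_type = event.get("eventType")
    if event_type not in PREFIXES:
        return dict(event)  # unrecognized events pass through unchanged

    own_prefix = PREFIXES[event_type]
    other_prefixes = [p for t, p in PREFIXES.items() if t != event_type]

    result: Dict[str, Any] = {}
    for key, value in event.items():
        # prune: drop fields that belong to the other event type
        if any(key.startswith(p) for p in other_prefixes):
            continue
        # ruby: keep only the part after the prefix, skipping @-metadata fields
        if not key.startswith("@") and own_prefix in key:
            result[key.split(own_prefix)[-1]] = value
        else:
            result[key] = value
    return result


example = {
    "eventType": "purchase",
    "purchase___order_id": "123XC",
    "purchase___product_price": "99",
    "view___product_id": "",
}
print(apply_filters(example))
# {'eventType': 'purchase', 'order_id': '123XC', 'product_price': '99'}
```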


In the input section I set value_deserializer_class so that the incoming records are deserialized. On the consumer side, however, my Python code cannot deserialize the messages Logstash writes to the output topics. I have tried various strategies, without success:

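import json
from kafka import KafkaConsumer

# decode the raw bytes to str first, then parse the JSON
consumer = KafkaConsumer("purchase", bootstrap_servers="localhost:9092", auto_offset_reset="earliest", value_deserializer=lambda m: json.loads(m.decode("utf-8")))

for message in consumer:
    print(message.value)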

Here I get: JSONDecodeError: Extra data: line 1 column 5 (char 4)
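The error position is consistent with the value the consumer actually receives (see the ConsumerRecord at the end of this post): the json module parses the leading 2022 as a complete JSON number and then finds the rest of the string left over. A minimal reproduction with only the standard library; explain_decode_error is a hypothetical helper name:

```python
import json

# Value bytes copied from the ConsumerRecord shown at the end of this post.
raw = b"2022-03-02T17:36:24.056Z %{host} %{message}"


def explain_decode_error(payload: bytes) -> str:
    """Try to parse payload as JSON and return the error text, if any."""
    try:
        json.loads(payload.decode("utf-8"))
    except json.JSONDecodeError as exc:
        return str(exc)
    return "parsed OK"


print(explain_decode_error(raw))  # Extra data: line 1 column 5 (char 4)
```

So decoding before or after json.loads makes no difference: the payload itself is not JSON.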

I removed decode() as well, but that gives the same error:

consumer = KafkaConsumer("purchase", bootstrap_servers="localhost:9092", auto_offset_reset="earliest", value_deserializer=lambda m: json.loads(m))
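While investigating, a value_deserializer that never raises can help show what is actually on the topic. This is a sketch under my own assumptions, not part of the original setup: the hypothetical safe_json_deserializer returns the parsed object when the value is valid JSON and otherwise the decoded text, so the consumer loop keeps running. It could be passed as value_deserializer=safe_json_deserializer to kafka-python's KafkaConsumer.

```python
import json
from typing import Any, Optional


def safe_json_deserializer(raw: Optional[bytes]) -> Any:
    """Hypothetical value_deserializer: parse JSON if possible, else return the text."""
    if raw is None:  # empty (tombstone) values arrive as None
        return None
    text = raw.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # Not JSON (for example plain-text Logstash output): keep the text for inspection.
        return text
```

With this in place, a non-JSON payload shows up as a plain str in message.value instead of stopping the loop with JSONDecodeError.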

Yet the Logstash console (the stdout output with rubydebug) shows exactly the result I want to receive in the consumer Python application:
"order_id" => "1",
"session_id" => "q:2332321sqknds",
"unit_price" => "100",
"tax" => "10",
"additional_cost" => ""



So I tried one more thing: in the output section I added value_deserializer_class as well:

if [eventType] == 'purchase' {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => 'purchase'
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
}

But Logstash rejects this, saying that "value_deserializer_class" is not allowed in kafka, even though I use value_deserializer_class in the input section.

The kafka output has no value_deserializer_class setting; it exists only in the input.

In the output you have a value_serializer_class.
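The general rule behind this is that the producer's serializer and the consumer's deserializer have to agree on the byte format. In kafka-python terms that is a KafkaProducer value_serializer paired with the matching KafkaConsumer value_deserializer; here the producer is Logstash's kafka output, so whatever bytes it writes are what the Python side has to parse. A minimal sketch of a matching JSON pair (the function names are illustrative, and the sample fields are taken from the purchase output shown further down):

```python
import json
from typing import Any, Dict


def serialize_event(event: Dict[str, Any]) -> bytes:
    """Producer side: dict -> UTF-8 JSON bytes."""
    return json.dumps(event).encode("utf-8")


def deserialize_event(raw: bytes) -> Dict[str, Any]:
    """Consumer side: the exact inverse of serialize_event."""
    return json.loads(raw.decode("utf-8"))


purchase = {"eventType": "purchase", "order_id": "CC2", "tax": "5", "unit_price": "20"}
assert deserialize_event(serialize_event(purchase)) == purchase
```

If only the consumer side is under your control, the deserializer has to match whatever format the Logstash output actually produces.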

Can you give more context about what you expect to be in the output? What should it look like?

@leandrojmp thanks for your response.
Basically, Logstash receives an object like {"eventType": "purchase", "purchase___order_id": "123XC", "purchase___product_price": "99", "view___product_id": ""} in the input section, and I then apply filters to remove some of the fields: if eventType is purchase, the view fields are removed, and if eventType is view, the purchase fields are removed.

After applying the filters, the object looks something like this:
{"eventType": "purchase", "purchase___order_id": "123XC", "purchase___product_price": "99"}
I want this object to be easily accessible in my consumer application, but that is not what the consumer receives.

In the terminal where Logstash is running, I get exactly the results I want:

On view:

"created_at" => 1646241664540,
"session_id" => "0:l09gy8t0:deZazkdX_mux~VSA06rEr812XXnzlpBV",
"customer_id" => "22",
"@version" => "1",
"product_id" => "34",
"@timestamp" => 2022-03-02T17:21:04.554Z,
"eventType" => "view"

On purchase:

"created_at" => 1646237126471,
"tax" => "5",
"unit_total_cost" => "",
"quantity" => "",
"unit_price" => "20",
"session_id" => "0:l09gy8t0:deZazkdX_mux~VSA06rEr812XXnzlpBV",
"customer_id" => "282",
"promo_id" => "",
"@version" => "1",
"gross_spend_amount" => "",
"product_id" => "181",
"@timestamp" => 2022-03-02T16:05:26.489Z,
"order_id" => "CC2",
"eventType" => "purchase",
"other_cost" => "10",
"discount" => "",
"net_spend_amount" => "35"

But in the Kafka consumer:

for message in consumer:
    print(message.value)  # b'2022-03-02T17:36:24.056Z %{host} %{message}'

Behind the scenes, the message contains:

ConsumerRecord(topic='purchase', partition=0, offset=31, timestamp=1646242584162, timestamp_type=0, key=None, value=b'2022-03-02T17:36:24.056Z %{host} %{message}', headers=[], checksum=3789492333, serialized_key_size=-1, serialized_value_size=43, serialized_header_size=-1)
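As far as I know, the shape of this value, an ISO timestamp followed by the literal text %{host} %{message}, is what Logstash produces when an event is encoded with the plain codec and no format: the codec falls back to the event's string form "<@timestamp> <host> <message>", and missing host or message fields are rendered as those literal placeholders. The kafka output uses the plain codec by default, and these events have neither field, which would explain the payload. The function below only imitates that behaviour for illustration (it is not Logstash code, and the event is an illustrative one), next to what a JSON encoding of the same event looks like, as an output configured with e.g. codec => json would presumably write.

```python
import json
from typing import Any, Dict


def plain_codec_fallback(event: Dict[str, Any]) -> str:
    """Imitates how a Logstash event is rendered as text when no format is given.

    Missing "host"/"message" fields appear as literal placeholders.
    """
    host = event.get("host", "%{host}")
    message = event.get("message", "%{message}")
    return f"{event['@timestamp']} {host} {message}"


# Illustrative event: no "host" or "message" field, like the events above.
event = {"@timestamp": "2022-03-02T17:36:24.056Z", "@version": "1", "eventType": "purchase"}

print(plain_codec_fallback(event))  # 2022-03-02T17:36:24.056Z %{host} %{message}
print(json.dumps(event))  # JSON keeps every field and parses cleanly on the consumer side
```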

