Deserialization of results in the Output section

@DavidTurner hope you are doing well. Can you please assist me with this one? I am stuck on it.
I have successfully implemented Logstash; below is my logstash.conf, which is working perfectly.

input {
  kafka {
        bootstrap_servers => "http://localhost:9092"
        topics => ["tracking"]
        codec => avro {
            schema_uri => "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord.avsc"
        }
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
}


filter {
    if [eventType] == 'view' {
        prune {
            blacklist_names => ["^purchase___.*"]
        }
        ruby {
            code => "prefix = 'view___'
            event.to_hash.each{|k,v|
            if (!k.start_with?('@')) && (k.include?(prefix))
                event.set(k.split(prefix).last, v)
                event.remove(k)
            end
            }"
        }

    }

    if [eventType] == 'purchase' {
        prune {
            blacklist_names => ["^view___.*"]
        }
        ruby {
            code => "prefix = 'purchase___'
            event.to_hash.each{|k,v|
            if (!k.start_with?('@')) && (k.include?(prefix))
                event.set(k.split(prefix).last, v)
                event.remove(k)
            end
            }"
        }

    }
}

output {
    stdout {
        codec => rubydebug
    }

    if [eventType] == 'view' {
        kafka {
            bootstrap_servers => "http://localhost:9092"
            topic_id => 'view'
        }
    }
    else if [eventType] == 'purchase' {
        kafka {
            bootstrap_servers => "http://localhost:9092"
            topic_id => 'purchase'
        }
    }
    else {
        kafka {
            bootstrap_servers => "http://localhost:9092"
            topic_id => 'unrecognized'
        }

    }
}
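
(Just to illustrate the ruby blocks above: for a purchase event the prune filter removes the view___ fields and the ruby code strips the purchase___ prefix from the remaining keys. A rough Python equivalent of that renaming, with a made-up sample event:)

def strip_prefix(event, prefix):
    # Mirror of the ruby filter above: skip @-fields, rename prefixed keys.
    for key in list(event.keys()):
        if not key.startswith('@') and prefix in key:
            event[key.split(prefix)[-1]] = event.pop(key)
    return event

sample = {'eventType': 'purchase', 'purchase___order_id': 'CC2', 'purchase___tax': '5'}
print(strip_prefix(sample, 'purchase___'))
# {'eventType': 'purchase', 'order_id': 'CC2', 'tax': '5'}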

Here in the input section, as you can see, we applied value_deserializer_class just to deserialize the result. But on the consumer side, in my Python code, I am unable to deserialize it; I have tried various strategies without success.

consumer = KafkaConsumer(bootstrap_servers="localhost:9092", auto_offset_reset="earliest", value_deserializer=lambda m: json.loads(m).decode('utf-8'))

consumer.subscribe(['purchase'])
for message in consumer:
    print(message)

Here I am getting JSONDecodeError: Extra data: line 1 column 5 (char 4).

I removed decode() as well, but this one gives the same error:

consumer = KafkaConsumer(bootstrap_servers="localhost:9092", auto_offset_reset="earliest", value_deserializer=lambda m: json.loads(m))
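
(A small sketch of a way to check what is actually on the topic: consume the raw bytes first, without any value_deserializer; the topic name matches the output config above.)

from kafka import KafkaConsumer

# No value_deserializer here: print the raw bytes so we can see exactly
# what Logstash wrote to the topic before trying json.loads() on it.
consumer = KafkaConsumer(
    'purchase',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
)

for message in consumer:
    print(message.value)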

But what I see in the Logstash console output is exactly the JSON result that I want in the consumer Python application:
{
    "order_id" => "1",
    "session_id" => "q:2332321sqknds",
    "unit_price" => "100",
    "tax" => "10",
    "additional_cost" => "",
    .....
}

Following on from this, I tried one more thing in the output section:

if [eventType] == 'purchase' {
    kafka {
        bootstrap_servers => "http://localhost:9092"
        topic_id => 'purchase'
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
}

But this one gives an error that "value_deserializer_class" is not allowed in the kafka output, even though I am using value_deserializer_class in the input section.

It's quite impolite to ping random people who aren't already involved in your conversation. Please don't do this again. We're all volunteers here; we cannot address every question.

ok sorry

You do not have a value_deserializer_class setting in the kafka output, only in the input.

In the output you have a value_serializer_class.
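
For example, the kafka output also takes a codec, which controls how the event is encoded before it is serialized. A minimal sketch, assuming JSON on the topic is what the consumer expects (topic and brokers as in your config):

output {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => "purchase"
        codec => json
    }
}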

Can you give more context about what you expect to be in the output? What should it look like?

@leandrojmp thanks for your response
Basically I want the object {"eventType": "purchase", "purchase___order_id": "123XC", "purchase___product_price": "99", "view___product_id": ""},

which Logstash gets in the input section; then I applied the Ruby filter to remove some of the fields (if eventType is purchase => remove the view fields;
if eventType is view => remove the purchase fields).

After applying the filter, the object looks something like this:
{"eventType": "purchase", "purchase___order_id": "123XC", "purchase___product_price": "99"}
I want this object to be easily accessible by my consumer application,
but my consumer application is getting something like this:
2022-03-22%{host}%{message}

When running Logstash with this configuration, what I get in the terminal are exactly the results I want.

On view:

{
    "created_at" => 1646241664540,
    "session_id" => "0:l09gy8t0:deZazkdX_mux~VSA06rEr812XXnzlpBV",
    "customer_id" => "22",
    "@version" => "1",
    "product_id" => "34",
    "@timestamp" => 2022-03-02T17:21:04.554Z,
    "eventType" => "view"
}

On purchase:

{
    "created_at" => 1646237126471,
    "tax" => "5",
    "unit_total_cost" => "",
    "quantity" => "",
    "unit_price" => "20",
    "session_id" => "0:l09gy8t0:deZazkdX_mux~VSA06rEr812XXnzlpBV",
    "customer_id" => "282",
    "promo_id" => "",
    "@version" => "1",
    "gross_spend_amount" => "",
    "product_id" => "181",
    "@timestamp" => 2022-03-02T16:05:26.489Z,
    "order_id" => "CC2",
    "eventType" => "purchase",
    "other_cost" => "10",
    "discount" => "",
    "net_spend_amount" => "35"
}

But in the Kafka consumer:

for message in consumer:
    print(message.value)
    # b'2022-03-02T17:36:24.056Z %{host} %{message}'

Behind the scenes, the message contains:

ConsumerRecord(topic='purchase', partition=0, offset=31, timestamp=1646242584162, timestamp_type=0, key=None, value=b'2022-03-02T17:36:24.056Z %{host} %{message}', headers=, checksum=3789492333, serialized_key_size=-1, serialized_value_size=43, serialized_header_size=-1)

Hi, is there anybody here who can help me with this? My build is stuck.
