Is it possible to detect duplicate messages in a Kafka topic with Logstash?

We use Logstash to read one table from an Oracle database and send each row to Kafka as a message. The message format in Kafka is:

Topic1: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"2019-07-30"}
        message4: {"name":"name-4", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}

Please note that message1 and message4 are duplicates: they have the same ID.

Now, we want to make sure all messages are unique. How can we filter topic1, keep only unique messages, and send them to topic2?

The end result we want:

Topic2: message1: {"name":"name-1", "id":"fbd89256-12gh-10og-etdgn1234njF", "site":"site-1", "time":"2019-07-30"}
        message2: {"name":"name-2", "id":"fbd89256-12gh-10og-etdgn1234njG", "site":"site-1", "time":"2019-07-30"}
        message3: {"name":"name-3", "id":"fbd89256-12gh-10og-etdgn1234njS", "site":"site-1", "time":"2019-07-30"}

Is this possible with Logstash?
Thanks.
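
For context, an Oracle-to-Kafka pipeline like the one described above might look roughly like the following sketch. The connection string, credentials, driver path, and query are placeholders, not actual settings:

    input {
        jdbc {
            jdbc_connection_string => "jdbc:oracle:thin:@//db-host:1521/ORCL"  # placeholder
            jdbc_user => "user"                                                 # placeholder
            jdbc_password => "password"                                         # placeholder
            jdbc_driver_library => "/path/to/ojdbc8.jar"                        # placeholder
            jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
            statement => "SELECT name, id, site, time FROM my_table"            # placeholder query
            schedule => "* * * * *"                                             # poll the table every minute
        }
    }
    output {
        kafka {
            bootstrap_servers => "localhost:9092"   # placeholder broker address
            topic_id => "topic1"
            codec => json
        }
    }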

It is possible, using an aggregate filter. Note that aggregate only works reliably with a single pipeline worker (pipeline.workers: 1), so that every event passes through the same filter instance:

    filter {
        # Route every event to the same aggregate map by using a constant task_id
        mutate { add_field => { "[@metadata][constant]" => "task" } }
        aggregate {
            task_id => "%{[@metadata][constant]}"
            code => '
                # Remember every id seen so far; cancel events whose id repeats
                map["ids"] ||= []
                id = event.get("id")
                if map["ids"].include? id
                    event.cancel
                else
                    map["ids"] << id
                end
            '
        }
    }
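
For the full topic1-to-topic2 flow, that filter would sit between a kafka input and a kafka output, roughly like this sketch (the broker address and json codec are assumptions; adjust for your cluster):

    input {
        kafka {
            bootstrap_servers => "localhost:9092"   # assumed broker address
            topics => ["topic1"]
            codec => json                           # parse each message into fields, including "id"
        }
    }
    output {
        kafka {
            bootstrap_servers => "localhost:9092"   # assumed broker address
            topic_id => "topic2"
            codec => json
        }
    }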

However, that never purges old entries, so the list of ids grows without bound and Logstash will eventually run out of memory. Also, in current versions you cannot control whether name-1 or name-4 is the copy that survives.
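
If the unbounded growth is a concern, one option is to cap the list and evict the oldest ids, trading exact deduplication for bounded memory. A minimal sketch (the cap of 100000 is an arbitrary assumption; a duplicate arriving after its id has been evicted will pass through):

    filter {
        mutate { add_field => { "[@metadata][constant]" => "task" } }
        aggregate {
            task_id => "%{[@metadata][constant]}"
            code => '
                map["ids"] ||= []
                id = event.get("id")
                if map["ids"].include? id
                    event.cancel
                else
                    map["ids"] << id
                    # Evict the oldest id once the cap is exceeded so memory stays bounded.
                    # 100000 is an arbitrary cap; tune it to your expected duplicate window.
                    map["ids"].shift if map["ids"].length > 100000
                end
            '
        }
    }

Using a Ruby Hash instead of an Array would also make the membership check constant-time rather than a linear scan, at the cost of slightly more involved eviction logic.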
