Logstash Filter and Plugins Elasticsearch to Search and Add New Tag

I use Zeek Framework (BroIDS) to monitoring my lab network. I use filebeat to send data to ELK. The logs from Zeek are in json format and they are similar to that:


Where the field id.resp_h and id.orig_h are important for me.

On the other hand I download a file with reputation ip (intelmq, malwaredomainlist, abuse, spamhaus, malc0de) and the lines of the file is similar to that:

     {"feed.accuracy": 100.0, "feed.name": "Feodo Tracker Browse", "feed.provider": "Abuse.ch", "feed.url": "https://feodotracker.abuse.ch/browse", "time.observation": "2019-12-15T23:04:49+00:00", "time.source": "2018-07-23T16:31:21+00:00", "source.ip": "", "malware.name": "heodo", "status": "Offline", "source.as_name": "TWC-11426-CAROLINAS - Charter Communications Inc, US", "source.geolocation.cc": "US", "classification.type": "c2server", "raw": "PHRyPjx0ZD4yMDE4LTA3LTIzIDE4OjMxOjIxPC90ZD48dGQ+PGEgaHJlZj0iL2Jyb3dzZS9ob3N0LzcxLjcxLjMuODQvIiB0YXJnZXQ9Il9wYXJlbnQiIHRpdGxlPSJHZXQgbW9yZSBpbmZvcm1hdGlvbiBhYm91dCB0aGlzIGJvdG5ldCBDJmFtcDtDIj43MS43MS4zLjg0PC9hPjwvdGQ+PHRkPjxzcGFuIGNsYXNzPSJiYWRnZSBiYWRnZS1pbmZvIj5IZW9kbyA8YSBjbGFzcz0ibWFscGVkaWEiIGhyZWY9Imh0dHBzOi8vbWFscGVkaWEuY2FhZC5ma2llLmZyYXVuaG9mZXIuZGUvZGV0YWlscy93aW4uZW1vdGV0IiB0YXJnZXQ9Il9ibGFuayIgdGl0bGU9Ik1hbHBlZGlhOiBFbW90ZXQgKGFrYSBHZW9kbyBha2EgSGVvZG8pIj48L2E+PC9zcGFuPjwvdGQ+PHRkPjxzcGFuIGNsYXNzPSJiYWRnZSBiYWRnZS1zdWNjZXNzIj5PZmZsaW5lPC9zcGFuPjwvdGQ+PHRkPk5vdCBsaXN0ZWQ8L3RkPjx0ZCBjbGFzcz0idGV4dC10cnVuY2F0ZSI+QVMxMTQyNiBUV0MtMTE0MjYtQ0FST0xJTkFTIC0gQ2hhcnRlciBDb21tdW5pY2F0aW9ucyBJbmM8L3RkPjx0ZD48aW1nIGFsdD0iLSIgc3JjPSIvaW1hZ2VzL2ZsYWdzL3VzLnBuZyIgdGl0bGU9IlVTIi8+IFVTPC90ZD48L3RyPg==", "classification.taxonomy": "malicious code", "source.asn": 11426, "source.network": "", "source.registry": "ARIN", "source.allocated": "2005-05-16T00:00:00+00:00"}

Where the important informacion for me is the field source.ip.

I would like to do the four task (if it is possible):
[1] Use the filter elasticsearch in Logstahs to search in the index intelmq the field source.ip. (Is the index with reputation IP)

[2] Compare in realtime (more or less) the source.ip whith the other field from the other log id.resp_h and id.orig_h (these fields exist in the new index that I try to add to ELK through Logstash)

[3] If id.resp_h or id.orig_h are iqual than source.ip add a tag [warning]

[4] Send data to elasticsearch with ther output to manage with Kibana

I am not sure but I thing that it would be similar than:

    input {
      beats {
        port => 5044
    } }
    filter {
        if "zeek" in [tags] {
            json {
                source => "message"
            mutate {
            remove_field => ["message"]
            elasticsearch {
            hosts => ["http://localhost:9200"]
            index => "filebeat-intel-*"
            query => "source.ip:%{message}"    
            fields => { "source.ip" => "botip" }
            result_size => 1
            enable_sort => false
        if [botip] {
          mutate {
            add_tag => ["botnet"]
            remove_field => "botip"

    output {
      if "zeek" in [tags] {
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "filebeat-zeek-%{+YYYY.MM.dd}"
    } }

The configuration is wrong because when data is added to ELK the filter don't add the new tag.

Some ideas to resolve my problem?
Thanks to all.

