Logstash Elasticsearch filter plugin: search an index and add a new tag

I use the Zeek framework (formerly Bro IDS) to monitor my lab network, and I use Filebeat to send the data to ELK. The Zeek logs are in JSON format and look like this:

{"ts":"2019-12-15T23:08:37.016852Z","uid":"CzKj6o2MOtx4sTuor4","id.orig_h":"192.168.29.87","id.orig_p":35518,"id.resp_h":"91.189.89.198","id.resp_p":123,"proto":"udp","service":"ntp","duration":0.03400611877441406,"orig_bytes":48,"resp_bytes":48,"conn_state":"SF","local_orig":true,"local_resp":false,"missed_bytes":0,"history":"Dd","orig_pkts":1,"orig_ip_bytes":76,"resp_pkts":1,"resp_ip_bytes":76}

The fields id.resp_h and id.orig_h are the important ones for me.

I also download a file of IP reputation data (intelmq, malwaredomainlist, abuse, spamhaus, malc0de). Its lines look like this:

     {"feed.accuracy": 100.0, "feed.name": "Feodo Tracker Browse", "feed.provider": "Abuse.ch", "feed.url": "https://feodotracker.abuse.ch/browse", "time.observation": "2019-12-15T23:04:49+00:00", "time.source": "2018-07-23T16:31:21+00:00", "source.ip": "71.71.3.84", "malware.name": "heodo", "status": "Offline", "source.as_name": "TWC-11426-CAROLINAS - Charter Communications Inc, US", "source.geolocation.cc": "US", "classification.type": "c2server", "raw": "PHRyPjx0ZD4yMDE4LTA3LTIzIDE4OjMxOjIxPC90ZD48dGQ+PGEgaHJlZj0iL2Jyb3dzZS9ob3N0LzcxLjcxLjMuODQvIiB0YXJnZXQ9Il9wYXJlbnQiIHRpdGxlPSJHZXQgbW9yZSBpbmZvcm1hdGlvbiBhYm91dCB0aGlzIGJvdG5ldCBDJmFtcDtDIj43MS43MS4zLjg0PC9hPjwvdGQ+PHRkPjxzcGFuIGNsYXNzPSJiYWRnZSBiYWRnZS1pbmZvIj5IZW9kbyA8YSBjbGFzcz0ibWFscGVkaWEiIGhyZWY9Imh0dHBzOi8vbWFscGVkaWEuY2FhZC5ma2llLmZyYXVuaG9mZXIuZGUvZGV0YWlscy93aW4uZW1vdGV0IiB0YXJnZXQ9Il9ibGFuayIgdGl0bGU9Ik1hbHBlZGlhOiBFbW90ZXQgKGFrYSBHZW9kbyBha2EgSGVvZG8pIj48L2E+PC9zcGFuPjwvdGQ+PHRkPjxzcGFuIGNsYXNzPSJiYWRnZSBiYWRnZS1zdWNjZXNzIj5PZmZsaW5lPC9zcGFuPjwvdGQ+PHRkPk5vdCBsaXN0ZWQ8L3RkPjx0ZCBjbGFzcz0idGV4dC10cnVuY2F0ZSI+QVMxMTQyNiBUV0MtMTE0MjYtQ0FST0xJTkFTIC0gQ2hhcnRlciBDb21tdW5pY2F0aW9ucyBJbmM8L3RkPjx0ZD48aW1nIGFsdD0iLSIgc3JjPSIvaW1hZ2VzL2ZsYWdzL3VzLnBuZyIgdGl0bGU9IlVTIi8+IFVTPC90ZD48L3RyPg==", "classification.taxonomy": "malicious code", "source.asn": 11426, "source.network": "71.68.0.0/14", "source.registry": "ARIN", "source.allocated": "2005-05-16T00:00:00+00:00"}

The important information for me here is the field source.ip.

I would like to do the following four tasks (if possible):
[1] Use the elasticsearch filter in Logstash to search the field source.ip in the index intelmq (the index that holds the IP reputation data).

[2] Compare, in near real time, source.ip with the fields id.resp_h and id.orig_h from the Zeek log (these fields exist in the new index that I am trying to add to ELK through Logstash).

[3] If id.resp_h or id.orig_h is equal to source.ip, add a tag [warning].

[4] Send the data to Elasticsearch with the output plugin so that I can manage it with Kibana.

I am not sure, but I think the configuration would look something like this:

    input {
      beats {
        port => 5044
      }
    }
    filter {
      if "zeek" in [tags] {
        json {
          source => "message"
        }
        mutate {
          remove_field => ["message"]
        }
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "filebeat-intel-*"
          query => "source.ip:%{message}"
          fields => { "source.ip" => "botip" }
          result_size => 1
          enable_sort => false
        }
        if [botip] {
          mutate {
            add_tag => ["botnet"]
            remove_field => "botip"
          }
        }
      }
    }
    output {
      if "zeek" in [tags] {
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "filebeat-zeek-%{+YYYY.MM.dd}"
        }
      }
    }

The configuration is wrong: when the data is added to ELK, the filter does not add the new tag.

Any ideas on how to solve this problem?
Thanks to all.
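
One possible cause, offered as a guess rather than a confirmed diagnosis: the query interpolates %{message}, but message holds the whole JSON line and is removed before the lookup runs, so it never contains a single IP address. Below is a minimal sketch of the filter section that queries on the two Zeek address fields instead. It assumes that the json filter leaves id.orig_h and id.resp_h as top-level fields with dots in their names, and that source.ip is searchable in the filebeat-intel-* index. Neither assumption has been verified against this setup. The tag name follows task [3].

    filter {
      if "zeek" in [tags] {
        json {
          source => "message"
        }
        mutate {
          remove_field => ["message"]
        }
        # Look up either endpoint of the connection in the reputation index.
        # The IPs are quoted so that IPv6 colons are not parsed as query syntax.
        elasticsearch {
          hosts => ["http://localhost:9200"]
          index => "filebeat-intel-*"
          query => 'source.ip:"%{[id.orig_h]}" OR source.ip:"%{[id.resp_h]}"'
          # Assumption: the feed documents keep "source.ip" as a flat key.
          # If it was indexed as a nested object, this may need to be "[source][ip]".
          fields => { "source.ip" => "botip" }
          result_size => 1
          enable_sort => false
        }
        # botip is only set when the lookup returned a hit.
        if [botip] {
          mutate {
            add_tag => ["warning"]
            remove_field => ["botip"]
          }
        }
      }
    }

If the tag still does not appear, looking for the _elasticsearch_lookup_failure tag on the events (the filter's default tag_on_failure value) should show whether the query itself is failing.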
