I use the Zeek framework (formerly Bro IDS) to monitor my lab network, and Filebeat to ship the data to ELK. The Zeek logs are in JSON format and look like this:
{"ts":"2019-12-15T23:08:37.016852Z","uid":"CzKj6o2MOtx4sTuor4","id.orig_h":"192.168.29.87","id.orig_p":35518,"id.resp_h":"91.189.89.198","id.resp_p":123,"proto":"udp","service":"ntp","duration":0.03400611877441406,"orig_bytes":48,"resp_bytes":48,"conn_state":"SF","local_orig":true,"local_resp":false,"missed_bytes":0,"history":"Dd","orig_pkts":1,"orig_ip_bytes":76,"resp_pkts":1,"resp_ip_bytes":76}
The fields id.resp_h and id.orig_h are the important ones for me.
Separately, I download a file of IP reputation data (intelmq, malwaredomainlist, abuse, spamhaus, malc0de), whose lines look like this:
{"feed.accuracy": 100.0, "feed.name": "Feodo Tracker Browse", "feed.provider": "Abuse.ch", "feed.url": "https://feodotracker.abuse.ch/browse", "time.observation": "2019-12-15T23:04:49+00:00", "time.source": "2018-07-23T16:31:21+00:00", "source.ip": "71.71.3.84", "malware.name": "heodo", "status": "Offline", "source.as_name": "TWC-11426-CAROLINAS - Charter Communications Inc, US", "source.geolocation.cc": "US", "classification.type": "c2server", "raw": "PHRyPjx0ZD4yMDE4LTA3LTIzIDE4OjMxOjIxPC90ZD48dGQ+PGEgaHJlZj0iL2Jyb3dzZS9ob3N0LzcxLjcxLjMuODQvIiB0YXJnZXQ9Il9wYXJlbnQiIHRpdGxlPSJHZXQgbW9yZSBpbmZvcm1hdGlvbiBhYm91dCB0aGlzIGJvdG5ldCBDJmFtcDtDIj43MS43MS4zLjg0PC9hPjwvdGQ+PHRkPjxzcGFuIGNsYXNzPSJiYWRnZSBiYWRnZS1pbmZvIj5IZW9kbyA8YSBjbGFzcz0ibWFscGVkaWEiIGhyZWY9Imh0dHBzOi8vbWFscGVkaWEuY2FhZC5ma2llLmZyYXVuaG9mZXIuZGUvZGV0YWlscy93aW4uZW1vdGV0IiB0YXJnZXQ9Il9ibGFuayIgdGl0bGU9Ik1hbHBlZGlhOiBFbW90ZXQgKGFrYSBHZW9kbyBha2EgSGVvZG8pIj48L2E+PC9zcGFuPjwvdGQ+PHRkPjxzcGFuIGNsYXNzPSJiYWRnZSBiYWRnZS1zdWNjZXNzIj5PZmZsaW5lPC9zcGFuPjwvdGQ+PHRkPk5vdCBsaXN0ZWQ8L3RkPjx0ZCBjbGFzcz0idGV4dC10cnVuY2F0ZSI+QVMxMTQyNiBUV0MtMTE0MjYtQ0FST0xJTkFTIC0gQ2hhcnRlciBDb21tdW5pY2F0aW9ucyBJbmM8L3RkPjx0ZD48aW1nIGFsdD0iLSIgc3JjPSIvaW1hZ2VzL2ZsYWdzL3VzLnBuZyIgdGl0bGU9IlVTIi8+IFVTPC90ZD48L3RyPg==", "classification.taxonomy": "malicious code", "source.asn": 11426, "source.network": "71.68.0.0/14", "source.registry": "ARIN", "source.allocated": "2005-05-16T00:00:00+00:00"}
Here the important information for me is the field source.ip.
I would like to do the following four tasks (if possible):
[1] Use the elasticsearch filter in Logstash to search the intelmq index (the index holding the IP reputation data) for the field source.ip.
[2] Compare, in (more or less) real time, source.ip with the fields id.resp_h and id.orig_h from the Zeek log (these fields exist in the new index that I am trying to add to ELK through Logstash).
[3] If id.resp_h or id.orig_h is equal to source.ip, add a tag [warning].
[4] Send the data to Elasticsearch through the output so I can work with it in Kibana.
I am not sure, but I think it would be similar to this:
input {
  beats {
    port => 5044
  }
}
filter {
  if "zeek" in [tags] {
    json {
      source => "message"
    }
    mutate {
      remove_field => ["message"]
    }
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "filebeat-intel-*"
      query => "source.ip:%{message}"
      fields => { "source.ip" => "botip" }
      result_size => 1
      enable_sort => false
    }
    if [botip] {
      mutate {
        add_tag => ["botnet"]
        remove_field => "botip"
      }
    }
  }
}
output {
  if "zeek" in [tags] {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "filebeat-zeek-%{+YYYY.MM.dd}"
    }
  }
}
The configuration is wrong, because when data is added to ELK the filter doesn't add the new tag.
Any ideas on how to resolve my problem?
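One thing I suspect is the query itself: it interpolates %{message}, which holds the whole JSON line rather than an IP, and which the mutate has already removed by the time the lookup runs. Below is a minimal sketch of how the lookup might need to look instead. It has not been verified against my setup and rests on two assumptions: that the json filter leaves id.orig_h and id.resp_h as top-level fields with a literal dot in their names, and that the reputation documents were indexed with a literal source.ip key (if they were indexed as a nested object, the key in fields would presumably have to be "[source][ip]").

```
filter {
  if "zeek" in [tags] {
    # Assumption: "message" has already been parsed by the json filter,
    # so id.orig_h and id.resp_h exist as top-level fields on the event.
    elasticsearch {
      hosts => ["http://localhost:9200"]
      index => "filebeat-intel-*"
      # Match a reputation entry whose source.ip equals either endpoint.
      # Single quotes let the query string contain double-quoted IPs.
      query => 'source.ip:("%{[id.orig_h]}" OR "%{[id.resp_h]}")'
      # Copy the matched IP into a temporary field on the Zeek event.
      fields => { "source.ip" => "botip" }
      result_size => 1
      enable_sort => false
    }
    # botip is only set when the lookup returned a hit.
    if [botip] {
      mutate {
        add_tag => ["botnet"]
        remove_field => ["botip"]
      }
    }
  }
}
```

A single OR query keeps this to one lookup per event. Two separate elasticsearch blocks, one per field, would presumably also work and would show which side of the connection matched.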
Thanks to all.