Enrichment with recent logs

Hello,

I have a Logstash server receiving security events from BRO and also from a firewall. Upon ingest I've created a new field that merges source-ip, source-port, dest-ip and dest-port into a single field called src_dst. This field is present in both my BRO events and my firewall events. It looks like this:

src_dst: 192.168.100.3_49778_23.12.57.18_443
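(For reference, a field like this can be built at ingest with a mutate filter. A minimal sketch; the source/destination field names below are assumptions about the event layout, not taken from my actual pipeline:)

```
filter {
  mutate {
    # Assumed field names: [src_ip], [src_port], [dst_ip], [dst_port]
    add_field => {
      "src_dst" => "%{[src_ip]}_%{[src_port]}_%{[dst_ip]}_%{[dst_port]}"
    }
  }
}
```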

The BRO log enters the system prior to the firewall log, and the timestamp is usually half a second or so ahead of the first firewall entry.

I'm attempting to have Logstash look up the src_dst value from the firewall log and extract other fields (initially a single field called ja3) from the BRO logs. This is what I'm using for my elasticsearch filter:

```
elasticsearch {
  hosts => ["X.X.X.X:9200"]
  index => "logstash-bro-%{+YYYY.MM.DD}"
  query_template => "/etc/logstash/data/template.json"
  fields => { "ja3_hash" => "ja3" }
}
ruby {
  code => 'event.set("ja3_hash", event.get("ja3"))'
}
```

Here is my template.json file:

```json
{
  "query": {
    "term": {
      "src_dst": {
        "value": "%{[src_dst]}"
      }
    }
  },
  "_source": ["ja3_hash"]
}
```

When this is run, I'm only getting a "-" value in the ja3_hash field on the firewall event.

This is the first time I've attempted to use the elasticsearch filter for enrichment, so I'm possibly doing something incorrect here.
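One thing I've been doing to debug: if I understand the elasticsearch filter correctly, it tags events when the lookup fails, so checking for that tag should distinguish "query returned no hit" from "hit returned but field missing". A sketch; the tag name here is what I believe the filter's default is:

```
elasticsearch {
  hosts => ["X.X.X.X:9200"]
  index => "logstash-bro-%{+YYYY.MM.DD}"
  query_template => "/etc/logstash/data/template.json"
  fields => { "ja3_hash" => "ja3" }
  # Assumed default tag on lookup failure
  tag_on_failure => ["_elasticsearch_lookup_failure"]
}
if "_elasticsearch_lookup_failure" in [tags] {
  # Mark misses so they can be counted/inspected separately
  mutate { add_tag => ["bro_lookup_miss"] }
}
```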

F

Just validated my search from the command line as well. I'm getting results back from the CLI/curl, but always a null entry in my log events.

```
curl -XPOST 'localhost:9200/_search?pretty' -H 'Content-Type: application/json' -d '{
  "size": 1,
  "query": {
    "term": {
      "src_dst": {
        "value": "192.168.100.201_59481_192.168.100.240_9080"
      }
    }
  },
  "_source": ["version", "ja3_hash", "ja3s_hash", "cipher"]
}'
```
```json
{
  "took" : 6,
  "timed_out" : false,
  "_shards" : {
    "total" : 42,
    "successful" : 42,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 1,
    "max_score" : 3.2834144,
    "hits" : [
      {
        "_index" : "logstash-bro-2019.03.81",
        "_type" : "doc",
        "_id" : "za4so2kBv9DUFyxXaAmG",
        "_score" : 3.2834144,
        "_source" : {
          "cipher" : "TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA384",
          "ja3s_hash" : "9099266b09da09a1d9e1839ae9ad5682",
          "ja3_hash" : "decfb48a53789ebe081b88aabb58ee34",
          "version" : "TLSv12"
        }
      }
    ]
  }
}
```

OK, I've now hardcoded my search to not use a variable passed from my filter, and this works. It looks like this filter can't look up data from recent events? Are there any restrictions here that we need to be aware of?

For an event to be searchable in Elasticsearch, it must first have passed all the way through the Logstash pipeline and been successfully written to Elasticsearch. As Logstash batches up events, the full batch needs to be successfully processed. Once it is in Elasticsearch, it has to wait for a refresh to occur to be made searchable, which by default is initiated once per second. How long this process takes will depend on how much load the cluster is under, the latency of the bulk request, and how long the refresh operation takes once initiated.
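To illustrate the refresh part: the interval is controlled per index by the `index.refresh_interval` setting (1s by default). It can be lowered, at the cost of more indexing overhead, though note this only shortens one of the stages described above. A sketch, assuming your BRO indices match `logstash-bro-*`:

```
curl -XPUT 'localhost:9200/logstash-bro-*/_settings' -H 'Content-Type: application/json' -d '{
  "index": { "refresh_interval": "500ms" }
}'
```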

This can likely take at least a few seconds, so if your events arrive close together in time this type of solution might not work. You may want to have a look at this blog post, which discusses enrichment and talks about a prototype memcached plugin that could perhaps be used to achieve much lower latencies and be suitable here, although I have not tried it out.

Yep, I'm running my tests with subsequent packets and it seems to be taking tens of seconds to become searchable... the filter syntax is correct, though. I'll look into this memcached plugin :slight_smile:
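For anyone following along, my understanding of the memcached filter's interface is roughly the following: the BRO pipeline would set the key and the firewall pipeline would get it. The option names below are my reading of the plugin docs, not tested:

```
# BRO pipeline: cache the ja3 hash under the connection tuple
memcached {
  hosts => ["localhost:11211"]
  # set => { "source_field" => "key" } (assumed option shape)
  set => { "[ja3_hash]" => "%{src_dst}" }
  ttl => 60
}

# Firewall pipeline: look it up by the same tuple
memcached {
  hosts => ["localhost:11211"]
  # get => { "key" => "destination_field" } (assumed option shape)
  get => { "%{src_dst}" => "[ja3_hash]" }
}
```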

Cheers
F

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.