Multiple events all end up with the same result from an http filter query

Hi, I'm having trouble with checking for a new IP for a user login with an HTTP filter query against my OpenSearch node. My filter code looks like:

  if [event_type] == "user_logged_in" {
    http {
      url => "https://opensearch-node1:9200/logging_event_business/_search"
      verb => "POST"
      ssl_verification_mode => "none"
      user => "${OPENSEARCH_USER}"
      password => "${OPENSEARCH_PASSWORD}"
      body => '{"query":{"bool":{"must":[{"term":{"context.ip_address":"%{[context][ip_address]}"}},{"term":{"event_type":"user_logged_in"}}]}},"size":0}'
      body_format => "json"
      target_body => "opensearch_response"
    }

    ruby {
      code => "
        if event.get('[opensearch_response][hits][total][value]').to_i < 1
          event.set('new_ip', 'True')
        else
          event.set('new_ip', 'False')
        end
        event.remove('opensearch_response')
      "
    }
  }

So, when I send the following array of JSON events:

test_events = [
  {
    "uuid": "ec94b",
    "created": "2024-07-30T11:43:43.868976Z",
    "event_type": "user_logged_in",
    "message": "User test successfully logged in.",
    "context": {
      "user_uuid": "testuuid4",
      "ip_address": "111.111.111.111",
      "user_full_name": "Test"
    }
  },
  {
    "uuid": "ec94b",
    "created": "2024-07-30T11:43:44.868976Z",
    "event_type": "user_logged_in",
    "message": "User test successfully logged in.",
    "context": {
      "user_uuid": "testuuid4",
      "ip_address": "111.111.111.111",
      "user_full_name": "Test"
    }
  }
]

I expect to get the field new_ip: True for one event and False for the other, but I get True for both. The query itself is correct and works as expected. I suspect the query returns 0 hits both times, but why would that be? As I understand it, events are processed one after another, so by the time the second one comes in, the first should have been processed already and its IP should be in OpenSearch.
I send my events to the logstash endpoint:

http {
    port => 5044
    codec => "json"
  }


Events are not processed by logstash one at a time. Events are sent through the logstash pipeline in batches. The size of a batch is controlled by the pipeline.batch.size option. As I understand it, a batch is completely processed by one filter before it moves on to the next one.

Even if you set the batch size to 1 (which would be very inefficient) there will still be delays in getting the event indexed before it can be returned by a query.
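The effect of batching can be sketched in plain Ruby (a simplified model, not Logstash code: the array stands in for the OpenSearch index, and nothing becomes searchable until the whole batch has passed through the filter stage):

```ruby
# Sketch: why both events get new_ip == true when they arrive in one batch.
index = []  # IPs already searchable in "OpenSearch"

batch = [
  { "ip_address" => "111.111.111.111" },
  { "ip_address" => "111.111.111.111" },
]

# Filter stage: every event in the batch is queried first...
batch.each do |event|
  hits = index.count(event["ip_address"])
  event["new_ip"] = (hits < 1)
end

# ...and only then does the output stage index them.
batch.each { |event| index << event["ip_address"] }

batch.map { |e| e["new_ip"] }  # => [true, true]
```

Both lookups run against an index that does not yet contain either event, so both come back "new".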

Hmm. Any suggestions for the approach then?

If you assume there is a maximum time it will take for a record to become available via an index query, you could use an aggregate filter to keep an in-memory database of recently seen IP addresses. Note that this database will be lost across a restart.

If you use this configuration

input { heartbeat { interval => 5 } }
output { stdout { codec => rubydebug { metadata => false } } }
filter {
    mutate { remove_field => [ "event", "host", "log", "@version", "message" ] }

    mutate { add_field => { "ip_address" => "127.9.1.23" } }
    aggregate {
        task_id => "%{ip_address}"
        code => '
            map["count"] ||= 0
            event.set("new_ip", (map["count"] == 0))
            map["count"] += 1
        '
        timeout => 10
    }
}

then it produces the following output

{ "@timestamp" => 2024-07-30T16:58:25.134340742Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:58:30.111953479Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:58:35.111934699Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:58:40,216][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:58:40.111918657Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:58:45.111900486Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:58:50.111800530Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:58:55.111642570Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:00,214][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:00.111416789Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:05.111175727Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:10.110620601Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:15,225][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:15.110374444Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:20.110070042Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:25.109704418Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:30,227][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:30.109625504Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:35.109028699Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:40.108947276Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:45,217][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:45.108690135Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:50.108526577Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:55.108246684Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T17:00:00.107907162Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T13:00:05,220][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T17:00:05.107811347Z, "ip_address" => "127.9.1.23", "new_ip" => false }

Timers in logstash are checked every five seconds to see if the code block associated with the timer should be run. That applies to both the heartbeat input and the timeout of the aggregate. As you can see, the timeout code seems to run every 15 or 20 seconds. These are not precise timers.

That said, the aggregate filter will remember that it has seen the IP address and set [new_ip] appropriately. I suggest you do both the aggregate and the query, and AND the two results.
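A combined version might look roughly like this (an untested sketch; the field names unseen_in_memory and unseen_in_index are hypothetical, and the http lookup is the one from your original filter):

```
filter {
    # In-memory check: remembers IPs seen in roughly the last 10 minutes,
    # covering the window before an event becomes searchable in OpenSearch.
    aggregate {
        task_id => "%{[context][ip_address]}"
        code => '
            map["count"] ||= 0
            event.set("unseen_in_memory", map["count"] == 0)
            map["count"] += 1
        '
        timeout => 600
    }

    # Index check: your existing http + ruby filters, but setting a
    # boolean field such as [unseen_in_index] instead of [new_ip].

    # AND the two results: the IP is only new if neither check has seen it.
    ruby {
        code => '
            event.set("new_ip", event.get("unseen_in_memory") && event.get("unseen_in_index"))
        '
    }
}
```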

Note that the aggregate filter logs the message saying it is going to delete a map entry before it processes the event that checks that map entry, so an event can still see the map entry after the message is logged but before the entry is actually deleted.
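The map bookkeeping the aggregate filter does can be sketched in plain Ruby (simplified: the real plugin keys maps by task_id and expires them on a background timer, here modelled as a check against a passed-in clock):

```ruby
# Simplified sketch of the aggregate filter's in-memory map logic.
# Maps are keyed by task_id (here, the IP address); an entry expires
# TIMEOUT seconds after it is created.
TIMEOUT = 10

def mark_ip(maps, ip, now)
  # Expire the entry if it is older than the timeout (a coarse check,
  # standing in for the plugin's periodic sweep).
  entry = maps[ip]
  maps.delete(ip) if entry && now - entry["created"] > TIMEOUT

  map = (maps[ip] ||= { "count" => 0, "created" => now })
  new_ip = map["count"] == 0
  map["count"] += 1
  new_ip
end

maps = {}  # task_id => { "count" => n, "created" => t }
mark_ip(maps, "127.9.1.23", 0)   # => true  (first sighting)
mark_ip(maps, "127.9.1.23", 5)   # => false (already in the map)
mark_ip(maps, "127.9.1.23", 20)  # => true  (entry expired)
```

This is why [new_ip] flips back to true periodically in the output above: each expiry discards the map, and the next sighting of the IP starts a fresh one.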

Thanks for the answer, will consider this, though losing the table on a restart is not appealing. Will try some other options first.

Hey, me again.
There is a setting in logstash, pipeline.ordered, which seems to be meant to maintain the order of events and process them one after another.
I set it to true in my logstash's docker container logstash.yml file, but it does not seem to affect anything. Why?

It will preserve the order of events within a batch as they move through the pipeline, but events are still processed in batches.

logstash will sometimes re-order events within the pipeline if this is not set. It does not always happen, and if it is not happening then this setting will not have any effect.
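For reference, per the Logstash docs ordering only takes effect with a single worker: pipeline.ordered: auto (the default) enables it when pipeline.workers is 1, and pipeline.ordered: true refuses to start with more than one worker. A minimal logstash.yml sketch:

```
# logstash.yml
pipeline.workers: 1      # ordering is only possible with a single worker
pipeline.ordered: true   # enforce in-order processing; errors if workers > 1
```

Even then, events are still processed in batches, so this alone does not make the http lookup see earlier events from the same batch.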

Damn.
I have multiple such filters and not sure what to do. The suggested aggregation for ip may work, but not for other queries I have. Is the approach of enriching events based on results of the http filter not a valid one? Or is it because I'm looking to gather results in a very small timeframe?

Correct. There is nothing wrong in principle with enriching events with data from some sort of DB (including elasticsearch), but the data has to be there in order to fetch it.