If you assume there is a maximum time it will take for a record to become available via an index query, you could use an aggregate filter to keep an in-memory database of recently seen IP addresses. Note that this in-memory database will be lost across a restart.
If you use this configuration
input { heartbeat { interval => 5 } }
output { stdout { codec => rubydebug { metadata => false } } }
filter {
    mutate { remove_field => [ "event", "host", "log", "@version", "message" ] }
    mutate { add_field => { "ip_address" => "127.9.1.23" } }
    aggregate {
        task_id => "%{ip_address}"
        code => '
            map["count"] ||= 0
            event.set("new_ip", (map["count"] == 0))
            map["count"] += 1
        '
        timeout => 10
    }
}
then it produces the following output
{ "@timestamp" => 2024-07-30T16:58:25.134340742Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:58:30.111953479Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:58:35.111934699Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:58:40,216][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:58:40.111918657Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:58:45.111900486Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:58:50.111800530Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:58:55.111642570Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:00,214][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:00.111416789Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:05.111175727Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:10.110620601Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:15,225][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:15.110374444Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:20.110070042Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:25.109704418Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:30,227][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:30.109625504Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:35.109028699Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:40.108947276Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T12:59:45,217][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T16:59:45.108690135Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T16:59:50.108526577Z, "ip_address" => "127.9.1.23", "new_ip" => true }
{ "@timestamp" => 2024-07-30T16:59:55.108246684Z, "ip_address" => "127.9.1.23", "new_ip" => false }
{ "@timestamp" => 2024-07-30T17:00:00.107907162Z, "ip_address" => "127.9.1.23", "new_ip" => false }
[2024-07-30T13:00:05,220][DEBUG][logstash.filters.aggregate][main] Aggregate remove expired map with task_id=127.9.1.23
{ "@timestamp" => 2024-07-30T17:00:05.107811347Z, "ip_address" => "127.9.1.23", "new_ip" => false }
Timers in Logstash are checked every five seconds to see if the code block associated with the timer should be run. That applies to both the heartbeat input and the timeout of the aggregate. As you can see from the DEBUG lines, the timeout code actually runs every 15 or 20 seconds rather than every 10. These are not precise timers.
That said, the aggregate filter will remember that it has seen the IP address within the timeout window and set [new_ip] appropriately. I suggest you run both the aggregate and the index query, then AND the two results: treat an address as new only when both checks say it is new.
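As a sketch of that combination (the field name [found_in_index] is a placeholder for whatever your index query sets, e.g. via an elasticsearch filter — adjust to your pipeline):

filter {
    aggregate {
        task_id => "%{ip_address}"
        code => '
            map["count"] ||= 0
            # true only for the first event for this ip_address in the window
            event.set("in_memory_new", (map["count"] == 0))
            map["count"] += 1
        '
        timeout => 10
    }
    # [found_in_index] is assumed to have been set earlier by your index query
    if [in_memory_new] and ![found_in_index] {
        mutate { add_field => { "new_ip" => "true" } }
    }
}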
Note that the aggregate filter logs the message saying it is going to delete a map entry before it processes the event that checks that entry. So an event arriving in that window will still see the map entry: after the message is logged, but before the entry is deleted. You can see this in the output above, where the event at the same timestamp as each DEBUG line still has [new_ip] set to false, and only the following event gets true.