Duplicate logs in Logstash

I collect VPN logs through Logstash and index them in Elasticsearch, but I'm having the following problem:

For each unique VPN connection (identified by TunnelID), there should be exactly one tunnel-up event and one tunnel-down event. However, the firewall sometimes duplicates the logs, so a single connection can end up with more than two log entries, as in the example below:

@timestamp	ConnectionDuration	Fortinet Action	TunnelID
Nov 19, 2023 @ 06:56:35.552	993	tunnel-down	759607548
Nov 19, 2023 @ 06:56:35.552	993	tunnel-down	759607548
Nov 19, 2023 @ 06:56:35.551	993	tunnel-down	759607548
Nov 19, 2023 @ 06:40:02.412	-	tunnel-up	759607548
Nov 19, 2023 @ 06:40:01.417	-	tunnel-up	759607548
Nov 19, 2023 @ 06:06:00.507	24410	tunnel-down	759607532
Nov 19, 2023 @ 06:06:00.507	24410	tunnel-down	759607532
Nov 18, 2023 @ 23:19:13.928	0	tunnel-up	759607532
Nov 18, 2023 @ 23:19:09.898	0	tunnel-up	759607532
Nov 18, 2023 @ 23:19:08.841	13268	tunnel-down	759607512
Nov 18, 2023 @ 23:19:08.841	13268	tunnel-down	759607512
Nov 18, 2023 @ 19:38:06.661	0	tunnel-up	759607512
Nov 18, 2023 @ 19:37:59.677	0	tunnel-up	759607512

Can I use Logstash to avoid these "duplicates" so that they reach Elasticsearch as in the example below (for each unique TunnelID, only one tunnel-up event and one tunnel-down event)?

@timestamp	ConnectionDuration	Fortinet Action	TunnelID
Nov 19, 2023 @ 06:56:35.551	 993	tunnel-down	759607548
Nov 19, 2023 @ 06:40:01.417	-	tunnel-up	759607548
Nov 19, 2023 @ 06:06:00.507	24410	tunnel-down	759607532
Nov 18, 2023 @ 23:19:09.898	0	tunnel-up	759607532
Nov 18, 2023 @ 23:19:08.841	13268	tunnel-down	759607512
Nov 18, 2023 @ 19:37:59.677	0	tunnel-up	759607512

You could use a fingerprint filter to generate a document_id, so duplicated log entries will overwrite each other instead of being indexed twice. Combine the date, tunnel ID, and action to create the fingerprint.
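The key part is on the output side: if the fingerprint is used as the Elasticsearch document_id, a duplicated event updates the same document instead of creating a new one. A minimal sketch of that output block (the host and index name here are placeholders):

output {
  elasticsearch {
    hosts => ["https://localhost:9200"]            # placeholder host
    index => "vpn-logs"                            # placeholder index name
    document_id => "%{[@metadata][fingerprint]}"   # hash produced by a fingerprint filter
    action => "update"
    doc_as_upsert => true                          # create the document if it does not exist yet
  }
}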

Hi, I tried this pipeline:

input {
  file {
    path => "/etc/logstash/conf.d/vpn.json"
    sincedb_path => "/dev/null"
    start_position => "beginning"
  }
}

filter {
  json {
    source => "message"
  }

  if [Fortinet Action] == "tunnel-up" or [Fortinet Action] == "tunnel-down" {
    fingerprint {
      source => ["TunnelID", "Fortinet Action"]
      target => "[@metadata][fingerprint]"
      method => "SHA256"
    }

    aggregate {
      task_id => "%{[@metadata][fingerprint]}"
      code => "
        map['@timestamp'] ||= event.get('@timestamp')
        map['ConnectionDuration'] ||= event.get('ConnectionDuration')
        map['Fortinet Action'] ||= event.get('Fortinet Action')
        map['TunnelID'] ||= event.get('TunnelID')
      "
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "@timestamp"
      timeout => 300
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://192.168.xxx.xxx:9200"]
    index => "qradar-fortinetfw-1"
    action => "update"
    doc_as_upsert => true
    document_id => '%{[@metadata][fingerprint]}'
  }
}

It worked, but it is not indexing all the logs; only the logs for the first connection (TunnelID) are indexed, and the others seem to be ignored. What could be causing this?

What was expected:

@timestamp	ConnectionDuration	Fortinet Action	TunnelID
Nov 19, 2023 @ 06:56:35.551	 993	tunnel-down	759607548
Nov 19, 2023 @ 06:40:01.417	-	tunnel-up	759607548
Nov 19, 2023 @ 06:06:00.507	24410	tunnel-down	759607532
Nov 18, 2023 @ 23:19:09.898	0	tunnel-up	759607532
Nov 18, 2023 @ 23:19:08.841	13268	tunnel-down	759607512
Nov 18, 2023 @ 19:37:59.677	0	tunnel-up	759607512

What was indexed:

@timestamp	ConnectionDuration	Fortinet Action	TunnelID
Nov 18, 2023 @ 23:19:08.841	13268	tunnel-down	759607512
Nov 18, 2023 @ 19:37:59.677	0	tunnel-up	759607512

The source in your fingerprint filter is an array; if you want both fields to be used as the source for the fingerprint, you also need to set concatenate_sources to true.
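Something like this, keeping the rest of your filter as it is (a sketch; only the fingerprint block changes):

fingerprint {
  source => ["TunnelID", "Fortinet Action"]
  concatenate_sources => true            # hash both fields together; without this only one of them ends up in the hash
  target => "[@metadata][fingerprint]"
  method => "SHA256"
}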

Hi Leandro, thanks for the reply; I'm Brazilian too.

Your tip worked, but I'm getting these error logs for some events:

[2023-11-23T08:21:01,020][ERROR][logstash.javapipeline    ][qradar-fortinetfw-vpn] Pipeline worker error, the pipeline will be stopped {:pipeline_id=>"qradar-fortinetfw-vpn", :error=>"(TypeError) wrong argument type String (expected LogStash::Timestamp)", :exception=>Java::OrgJrubyExceptions::TypeError, :backtrace=>["RUBY.create_timeout_event(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-filter-aggregate-2.10.0/lib/logstash/filters/aggregate.rb:278)", "RUBY.remove_expired_maps(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-filter-aggregate-2.10.0/lib/logstash/filters/aggregate.rb:390)", "org.jruby.RubyHash.delete_if(org/jruby/RubyHash.java:2012)", "RUBY.remove_expired_maps(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-filter-aggregate-2.10.0/lib/logstash/filters/aggregate.rb:385)", "org.jruby.ext.thread.Mutex.synchronize(org/jruby/ext/thread/Mutex.java:171)", "RUBY.remove_expired_maps(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-filter-aggregate-2.10.0/lib/logstash/filters/aggregate.rb:381)", "RUBY.flush(/usr/share/logstash/vendor/bundle/jruby/3.1.0/gems/logstash-filter-aggregate-2.10.0/lib/logstash/filters/aggregate.rb:327)", "org.logstash.config.ir.compiler.AbstractFilterDelegatorExt.flush(org/logstash/config/ir/compiler/AbstractFilterDelegatorExt.java:152)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:304)"], :thread=>"#<Thread:0x27bf5be3 /usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:134 sleep>"}

I tried to use these settings:

aggregate {
  task_id => "%{[@metadata][fingerprint]}"
  code => "
    map['@timestamp'] ||= LogStash::Timestamp.new(event.get('@timestamp')).to_time
    map['ConnectionDuration'] ||= event.get('ConnectionDuration')
    map['Fortinet Action'] ||= event.get('Fortinet Action')
    map['TunnelID'] ||= event.get('TunnelID')
  "
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "@timestamp"
  timeout => 60
}

aggregate {
  task_id => "%{[@metadata][fingerprint]}"
  code => "
    map['@timestamp'] ||= LogStash::Timestamp.new(event.get('@timestamp')).to_iso8601
    map['ConnectionDuration'] ||= event.get('ConnectionDuration')
    map['Fortinet Action'] ||= event.get('Fortinet Action')
    map['TunnelID'] ||= event.get('TunnelID')
  "
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "@timestamp"
  timeout => 60
}

But neither worked. Any ideas?

It seems to be some issue in the aggregate filter related to the @timestamp field, but I'm not sure what exactly, because I do not use the aggregate filter.

In fact, I'm not sure why you are using the aggregate filter at all; it seems that just having the fingerprint field would be enough to avoid duplicates.

I, like Leandro, am puzzled about why you are using aggregate, but to answer the question: when aggregate creates the event (push_map_as_event_on_timeout => true) it builds the [@timestamp] field from map['@timestamp'], and that value needs to be a LogStash::Timestamp, not a string. So changing this to

map['@timestamp'] ||= event.get('@timestamp')

will remove the error.
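Inside the aggregate code block that would look roughly like this (a sketch; only the @timestamp line differs from the variants you tried):

code => "
  map['@timestamp'] ||= event.get('@timestamp')   # already a LogStash::Timestamp object
  map['ConnectionDuration'] ||= event.get('ConnectionDuration')
  map['Fortinet Action'] ||= event.get('Fortinet Action')
  map['TunnelID'] ||= event.get('TunnelID')
"

Converting the value with .to_iso8601 turns it into a plain String (and .to_time into a Ruby Time), which, per the explanation above, is not what the timeout event expects for [@timestamp].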

It was my mistake; I was reusing a pipeline that had some similar data. The tips from you gentlemen helped me and the problem was solved. Thank you very much!
