Duplicate entries in kibana from logstash

Hello there,
Today I just realized that all my cisco vpn entries are duplicated. I double checked the code and could not find any reason. Can someone help me?

Follow the pics:
session
session_1_1
session_1_2

Really appreciate for the attention.

All entries or just some?
Are you specifying the document_id in your Elasticsearch output?
Are you using rollover indices?

If you properly configure the use of document_id, duplicate stored events will be eliminated. Check out this article for more information: https://www.elastic.co/blog/efficient-duplicate-prevention-for-event-based-data-in-elasticsearch

If you are using rollover indices, you will still see event duplication even when using the methods specified in the link. This is because document ids are only unique to a given index.

@wwalker all entries.
Regarding about the document_id, never used before. So it may be the problem.

Any change you could have a look on my code and point me out what needs to be done?

input {
 udp { 
 port => 5514
 type => "cisco-fw"
 }
}

filter {
 ####### Cisco FW ####
 if [type] == "cisco-fw" {
 grok {
 named_captures_only => true
 match => ["message", "%{CISCO_TAGGED_SYSLOG} %{GREEDYDATA:cisco_message}",
       "inside:%{HOSTNAME:inside_host}/%{NUMBER:inside_port} to outside:%{HOSTNAME:outside_host}/%{NUMBER:outside_port}",
	   "discarded from %{HOSTNAME:inside_host}/%{NUMBER:inside_port} to outside:%{HOSTNAME:outside_host}/%{NUMBER:outside_port}"]
 }
 # Parse the syslog severity and facility
 syslog_pri { }


 grok {
 patterns_dir => "/etc/logstash/conf.d/patterns/"
 match => [
 "cisco_message", "%{CISCOFW106001}",
 "cisco_message", "%{ESGCISCOFW113019}",
 "cisco_message", "%{CISCOFW106006_106007_106010}",
 "cisco_message", "%{CISCOFW106014}",
 "cisco_message", "%{CISCOFW106015}",
 "cisco_message", "%{CISCOFW106021}",
 "cisco_message", "%{CISCOFW106023}",
 "cisco_message", "%{CISCOFW106100}",
 "cisco_message", "%{CISCOFW110002}",
 "cisco_message", "%{CISCOFW302010}",
 "cisco_message", "%{CISCOFW302013_302014_302015_302016}",
 "cisco_message", "%{CISCOFW302020_302021}",
 "cisco_message", "%{CISCOFW305011}",
 "cisco_message", "%{CISCOFW313001_313004_313008}",
 "cisco_message", "%{CISCOFW313005}",
 "cisco_message", "%{CISCOFW402117}",
 "cisco_message", "%{CISCOFW402119}",
 "cisco_message", "%{CISCOFW419001}",
 "cisco_message", "%{CISCOFW419002}",
 "cisco_message", "%{CISCOFW500004}",
 "cisco_message", "%{CISCOFW602303_602304}",
 "cisco_message", "%{CISCOFW710001_710002_710003_710005_710006}",
 "cisco_message", "%{CISCOFW713172}",
 "cisco_message", "%{CISCOFW733100}"
 ]
 }
 
 #If CiscoTag is ASA-4-722051
 if [type] == "cisco-fw" and [ciscotag] == "ASA-4-722051" {
 grok {
 match => ["cisco_message", "%{DATA:Group}\s<%{DATA:Policy}> User\s<%{DATA:[event_data][TargetUserName]}> IP\s<%{IPV4:src_ip}> IPv4 Address <%{IPV4:assigned_ip}%{GREEDYDATA:extra_field}"]
 }
}

  #If CiscoTag is ASA-6-722055
 if [type] == "cisco-fw" and [ciscotag] == "ASA-6-722055" {
 grok {
 match => ["cisco_message", "%{DATA:Group}\s<%{DATA:Policy}> User\s<%{DATA:[event_data][TargetUserName]}> IP\s<%{IPV4:src_ip}> Client Type:%{GREEDYDATA:VPN_Client}"]
 }
}

 #If CiscoTag is ASA-6-722055 - Authentication Rejected
 if [type] == "cisco-fw" and [ciscotag] == "ASA-6-113005" {
 grok {
 match => ["cisco_message", "%{DATA:Group}\s%{GREEDYDATA:Policy} : reason =\s%{DATA:Reason} : server =\s%{IPV4:ServerIP} : user =\s%{DATA:[event_data][TargetUserName]} : user IP =\s%{IPV4:src_ip}"]
 }
}
 
 

if [type] == "cisco-fw" {
 geoip {
 add_tag => [ "GeoIP" ]
 database => "/etc/logstash/GeoLite2-City.mmdb" ### Change me to location of GeoLiteCity.dat file
 source => "src_ip"
 }

 if [geoip][city_name] == "" { mutate { remove_field => "[geoip][city_name]" } }
 if [geoip][continent_code] == "" { mutate { remove_field => "[geoip][continent_code]" } }
 if [geoip][country_code2] == "" { mutate { remove_field => "[geoip][country_code2]" } }
 if [geoip][country_code3] == "" { mutate { remove_field => "[geoip][country_code3]" } }
 if [geoip][country_name] == "" { mutate { remove_field => "[geoip][country_name]" } }
 if [geoip][latitude] == "" { mutate { remove_field => "[geoip][latitude]" } }
 if [geoip][longitude] == "" { mutate { remove_field => "[geoip][longitude]" } }
 if [geoip][postal_code] == "" { mutate { remove_field => "[geoip][postal_code]" } }
 if [geoip][region_name] == "" { mutate { remove_field => "[geoip][region_name]" } }
 if [geoip][time_zone] == "" { mutate { remove_field => "[geoip][time_zone]" } }
 }


 
 date {
 match => ["timestamp",
 "MMM dd HH:mm:ss",
 "MMM d HH:mm:ss",
 "MMM dd yyyy HH:mm:ss",
 "MMM d yyyy HH:mm:ss"
 ]
 }
 }
 ###### End of Cisco FW #######
}

output {
if [type] == "cisco-fw" and ([ciscotag] == "ASA-4-722051" or [ciscotag] == "ASA-6-722055" or [ciscotag] == "ASA-6-113005" or [ciscotag] == "ASA-6-722023" or [ciscotag] == "ASA-4-113019") {
elasticsearch {
hosts => ["http://localhost:9200"]
    user => "${ES_USER}"
    password => "${ES_PWD}"
    index => "logstash-ciscovpn-%{+yyyy.MM.dd}"

  }
 }

else if [type] == "heythere" {
 elasticsearch {
hosts => ["http://localhost:9200"]
    user => "${ES_USER}"
    password => "${ES_PWD}"
    index => "logstash-cisco-%{+xxxx.ww}"

  }
}

} 

Thanks for the attention.

Try running logstash with --config.debug --log.level debug --config.test_and_exit and review the debug logs to see if the configuration is what you expect, and that you are not loading a second copy of the configuration by mistake.

@Badger here´s the output:

[2020-05-13T18:32:43,483][DEBUG][logstash.runner          ] --------------- Logstash Settings -------------------
[2020-05-13T18:32:43,532][DEBUG][logstash.config.source.multilocal] Reading pipeline configurations from YAML {:location=>"/etc/logstash/pipelines.yml"}
[2020-05-13T18:32:43,580][DEBUG][logstash.config.source.multilocal] Reading pipeline configurations from YAML {:location=>"/etc/logstash/pipelines.yml"}
[2020-05-13T18:32:43,651][DEBUG][logstash.config.source.local.configpathloader] Skipping the following files while reading config since they don't match the specified glob pattern {:files=>["/etc/logstash/conf.d/patterns"]}
[2020-05-13T18:32:43,673][DEBUG][logstash.config.source.local.configpathloader] Reading config file {:config_file=>"/etc/logstash/conf.d/ciscovpn.conf"}
[2020-05-13T18:32:43,697][DEBUG][logstash.config.pipelineconfig] -------- Logstash Config ---------
[2020-05-13T18:32:43,699][DEBUG][logstash.config.pipelineconfig] Config from source {:source=>LogStash::Config::Source::MultiLocal, :pipeline_id=>:main}
[2020-05-13T18:32:43,702][DEBUG][logstash.config.pipelineconfig] Config string {:protocol=>"file", :id=>"/etc/logstash/conf.d/ciscovpn.conf"}
[2020-05-13T18:32:43,704][DEBUG][logstash.config.pipelineconfig] 

[2020-05-13T18:32:43,708][DEBUG][logstash.config.pipelineconfig] Merged config
[2020-05-13T18:32:43,712][DEBUG][logstash.config.pipelineconfig] 

OK, and what does the merged config look like?

@Badger if you can, have a look on the debug output here:

Logstash Debug Log

OK, go and read the merged config. You have two sets of inputs, one of which is probably logging a "port in use" message". Also two sets of filters, then two outputs that write to the same destination. That should result in two copies of each event.

If you pointed path.config at a directory (possibly using -f) then logstash will concatenate all the files in the directory into a single configuration. If, for example, the directory contains a "logstash.conf" and a "logstash.conf.backup" then logstash will concatenate them both, run all the filters and send identical events through both outputs.

@Badger That´s the thing, there´s only one config. Have a look:

logstash

Also i´ve done the whole config from scratch and still duplicanting the values!!! OMG!! :frowning:

Follow the config as well:

input {
 udp { 
 port => 5514
 type => "cisco-fw"
 }
}

filter {
 ####### Cisco FW ####
 if [type] == "cisco-fw" {
 grok {
 named_captures_only => true
 match => ["message", "%{CISCO_TAGGED_SYSLOG} %{GREEDYDATA:cisco_message}"]
 }
 # Parse the syslog severity and facility
 syslog_pri { }


 grok {
 patterns_dir => "/etc/logstash/conf.d/patterns/"
 match => [
 "cisco_message", "%{CISCOFW106001}",
 "cisco_message", "%{ESGCISCOFW113019}",
 "cisco_message", "%{CISCOFW106006_106007_106010}",
 "cisco_message", "%{CISCOFW106014}",
 "cisco_message", "%{CISCOFW106015}",
 "cisco_message", "%{CISCOFW106021}",
 "cisco_message", "%{CISCOFW106023}",
 "cisco_message", "%{CISCOFW106100}",
 "cisco_message", "%{CISCOFW110002}",
 "cisco_message", "%{CISCOFW302010}",
 "cisco_message", "%{CISCOFW302013_302014_302015_302016}",
 "cisco_message", "%{CISCOFW302020_302021}",
 "cisco_message", "%{CISCOFW305011}",
 "cisco_message", "%{CISCOFW313001_313004_313008}",
 "cisco_message", "%{CISCOFW313005}",
 "cisco_message", "%{CISCOFW402117}",
 "cisco_message", "%{CISCOFW402119}",
 "cisco_message", "%{CISCOFW419001}",
 "cisco_message", "%{CISCOFW419002}",
 "cisco_message", "%{CISCOFW500004}",
 "cisco_message", "%{CISCOFW602303_602304}",
 "cisco_message", "%{CISCOFW710001_710002_710003_710005_710006}",
 "cisco_message", "%{CISCOFW713172}",
 "cisco_message", "%{CISCOFW733100}"
 ]
 }
 
if [type] == "cisco-fw" {
 geoip {
 add_tag => [ "GeoIP" ]
 database => "/etc/logstash/GeoLite2-City.mmdb" ### Change me to location of GeoLiteCity.dat file
 source => "src_ip"
 }

 if [geoip][city_name] == "" { mutate { remove_field => "[geoip][city_name]" } }
 if [geoip][continent_code] == "" { mutate { remove_field => "[geoip][continent_code]" } }
 if [geoip][country_code2] == "" { mutate { remove_field => "[geoip][country_code2]" } }
 if [geoip][country_code3] == "" { mutate { remove_field => "[geoip][country_code3]" } }
 if [geoip][country_name] == "" { mutate { remove_field => "[geoip][country_name]" } }
 if [geoip][latitude] == "" { mutate { remove_field => "[geoip][latitude]" } }
 if [geoip][longitude] == "" { mutate { remove_field => "[geoip][longitude]" } }
 if [geoip][postal_code] == "" { mutate { remove_field => "[geoip][postal_code]" } }
 if [geoip][region_name] == "" { mutate { remove_field => "[geoip][region_name]" } }
 if [geoip][time_zone] == "" { mutate { remove_field => "[geoip][time_zone]" } }
 }


 # Parse the date
 date {
 match => ["timestamp",
 "MMM dd HH:mm:ss",
 "MMM d HH:mm:ss",
 "MMM dd yyyy HH:mm:ss",
 "MMM d yyyy HH:mm:ss"
 ]
 }
 }
 ###### End of Cisco FW #######
}

output {
    elasticsearch {
    hosts => ["http://localhost:9200"]
    user => "${ES_USER}"
    password => "${ES_PWD}"
    index => "logstash-ciscovpn-%{+yyyy.MM.dd}"

  }
 }

Final Logstash Debug

Just find out what went wrong!!!! My input was receiving from 0.0.0.0 on port 5514 and I had 2 firewalls sending the same syslog. That´s why the duplicated entries.

Thanks @Badger and @wwalker for the attention. This way I had no choice then look everywhere what´s causing the problem.

Really appreciate.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.