Duplicate events even after introducing fingerprint

Hello All ,

We have a weird issue in ELK. We have introduced the fingerprint filter in Logstash, but we can still see duplicates in Kibana. Kindly help us understand how we can fix this.
input {

	# Receive events shipped by Beats agents (Filebeat/Metricbeat) on port 5044.
	# Every event is tagged "srvr_logs" so the filter section can recognise it later
	# (see the grok-failure retry conditions in the filter block).
	beats {
		port => 5044
		tags => ["srvr_logs"]
	}

}


filter  
{

# Only log events are parsed here; metricbeat events ([fields][type] == "metric")
# bypass the whole filter and are routed to their own index in the output section.
if "metric" not in [fields][type]
{
	# Drop the Beats "host" object; "Server" is derived from the file path instead.
	mutate {
			remove_field => [ "host" ]
		   }

		# --- SOAPRouter log files ---
		if "SOAPRouter" in [source]
		 {
		# Extract the server name from the 5th segment of the Windows file path in [source].
		# NOTE(review): assumes a fixed path depth like E:\x\y\z\<Server>\... — verify against shippers.
		ruby {
		code => "
            event.set('Server', event.get('source').split('\\')[4])

        "
		}

		mutate {
			add_field => { 
				"Component" => "SOAPRouter"
            }
		}
# Parse the raw line with the custom SOAPROUTER pattern from patterns.txt.
grok {
patterns_dir => [ "E:\ELK\logstash\patterns\patterns.txt" ]
match => ["message", "%{SOAPROUTER}"]
}
}

# --- RequestRouter log files ---
else if "RequestRouter" in [source]
{

# Path segments: [4] = server name, [5] = interface file name (e.g. RequestRouter_Foo.log).
ruby {
        code => "
            event.set('Server', event.get('source').split('\\')[4])
            event.set('InterfaceName', event.get('source').split('\\')[5])

        "
    }
mutate {

add_field => { 

		"Component" => "RequestRouter"
               }
# Strip the "RequestRouter_" filename prefix, leaving only the interface name.
gsub => ["InterfaceName", "RequestRouter_", ""]            
}
# Drop the file extension (everything after the first dot).
ruby {
        code => "
             event.set('InterfaceName', event.get('InterfaceName').split('.')[0])

        "
    }
    
# Two alternative line formats; grok tries them in order.
grok {
patterns_dir => [ "E:\ELK\logstash\patterns\patterns.txt" ]
match => ["message", "%{REQUESTROUTER1}",
"message", "%{REQUESTROUTER2}"]
}
     
# Clean up the extracted EventType (remove digits and empty brackets) and
# drop the scratch fields NR1/NR2 captured by the grok patterns.
mutate
{
gsub => ["EventType", "[0-9]", ""]
gsub => ["EventType", "\[\]", ""]
remove_field => [ "NR1" ]
remove_field => [ "NR2" ]
}
# Keep only the first word of EventType.
ruby {
        code => "
             event.set('EventType', event.get('EventType').split(' ')[0])

        "
    }

}
# --- Generic server log files (anything that is not eappweb/RequestRouter/SOAPRouter) ---
else if "eappweb" not in [source] and "RequestRouter" not in [source] and "SOAPRouter" not in [source]
{
# Path segments: [4] = server name, [5] = component directory/file name.
ruby {
        code => "
            event.set('Server', event.get('source').split('\\')[4])
            event.set('Component', event.get('source').split('\\')[5])
        "
    }
# Strip trailing "_<digits>..." (e.g. rotation suffix) from the component name.
mutate 
{
gsub => ["Component", "_[0-9].*", ""]
}

# Siebel-style lines (containing "SBL") use a different pattern than the default.
if  "SBL" in [message]
{
grok {
patterns_dir => [ "E:\ELK\logstash\patterns\patterns.txt" ]
match => ["message", "%{SERVERLOG2}"]
}
}

else 
{
grok {
patterns_dir => [ "E:\ELK\logstash\patterns\patterns.txt" ]
match => ["message", "%{SERVERLOG1}"]
}
}
# Fallback: if the first grok failed, retry with SERVERLOG1.
# NOTE(review): when SERVERLOG1 already failed above this retries the identical
# pattern and will fail again — probably intended to try a different pattern; confirm.
if "srvr_logs" in [tags] and "_grokparsefailure" in [tags]
{
grok {
patterns_dir => [ "E:\ELK\logstash\patterns\patterns.txt" ]
match => ["message", "%{SERVERLOG1}"]
}
}
}
# --- eappweb log files ---
else if "eappweb" in [source] 
{
 
# For eappweb the server name comes from the Beats hostname, not the file path.
mutate {

 rename => {
                        "[beat][hostname]" => "Server"
                }
add_field => { 

		"Component" => "eappweb"
               }
}

grok {
patterns_dir => [ "E:\ELK\logstash\patterns\patterns.txt" ]
match => ["message", "%{SERVERLOG1}"]
}

# Same retry caveat as above: this repeats the identical SERVERLOG1 pattern.
if "srvr_logs" in [tags] and "_grokparsefailure" in [tags]
{
grok {
patterns_dir => [ "E:\ELK\logstash\patterns\patterns.txt" ]
match => ["message", "%{SERVERLOG1}"]
}
}
}

# Router logs carry millisecond timestamps; everything else is second precision.
# The parsed date replaces the EventDate string in place.
if "RequestRouter" in [source] or "SOAPRouter" in [source]

{
date {
    			target => "EventDate"
 			match => [ "EventDate", "yyyy-MM-dd HH:mm:ss,SSS" ]
     }
}
else 

{
date {
    			target => "EventDate"
 			match => [ "EventDate", "yyyy-MM-dd HH:mm:ss" ]
     }
}



	# Deterministic per-event id: keyed MD5 over message + source (concatenated).
	# NOTE: this only COMPUTES the id — deduplication happens only if the
	# elasticsearch output uses it as document_id.
	fingerprint {
		source => ["message","source"]
		target => "[fingerprint]"
		key => "Compass"
		method => "MD5"
		concatenate_sources => true
	}

}
}

output {
	# PROD server logs: index into the daily prodsrvrlog index.
	if "PROD" in [fields][env] and "metric" not in [fields][type]{

        elasticsearch {
            hosts => ["https:XXXXXXXXXXX:8200"]
			index => "prodsrvrlog-%{+YYYY.MM.dd}"
			# FIX: use the keyed-MD5 fingerprint computed in the filter as the
			# document id. Without this, Elasticsearch auto-generates ids and
			# re-shipped events are indexed as duplicates; with it, an event with
			# an existing fingerprint overwrites the previous document instead.
			document_id => "%{fingerprint}"
			user => "${es_usr}"
			password => "${es_pwd}"
			ssl => true
            ssl_certificate_verification => false
            cacert => "E:\ELK\ODForESearch\config\chain.pem"
            ilm_enabled => false
		}
		}
		
   # Metricbeat events: unparsed, routed to their own daily index.
   # (No document_id here — metric events have no fingerprint field.)
   else if "metric" in [fields][type]{

        elasticsearch {
            hosts => ["https:XXXXXXXXXX8200"]
			index => "metricbeat-%{+YYYY.MM.dd}"
			user => "${es_usr}"
			password => "${es_pwd}"
			ssl => true
            ssl_certificate_verification => false
            cacert => "E:\ELK\ODForESearch\config\chain.pem"
            ilm_enabled => false
		}
		
	}
}

Hello Saravana,

I think you are missing a last step for it to work as you expect: The fingerprint filter does not remove duplicates - it just generates a unique ID which you can use to recognise duplicates.

To remove duplicates you have to configure your elasticsearch output to explicitly use the fingerprint value as the document id. This way, the document will be created if a document with this fingerprint does not exist; otherwise, the existing document will be overwritten.

Best regards
Wolfram

Hello @Wolfram_Haussig ,

I was under the assumption that the fingerprint filter alone could remove duplicates. Thank you so much for letting me know about this.

I have added a piece of code to the output section . Can you please check if this is sufficient ?

# Corrected prod output: document_id is now derived from the fingerprint field,
# so events with identical message+source overwrite each other instead of
# being indexed twice.
elasticsearch {
    hosts => ["https:XXXXXXXXXXX:8200"]
		index => "prodsrvrlog-%{+YYYY.MM.dd}"
		user => "${es_usr}"
		password => "${es_pwd}"
		ssl => true
    ssl_certificate_verification => false
    cacert => "E:\ELK\ODForESearch\config\chain.pem"
    ilm_enabled => false
		document_id => "%{fingerprint}"
	}

Hello @Saravana37,

I think this should be sufficient. I am just not sure if it works with document_id => "%{fingerprint}" or if it has to be document_id => "%{[fingerprint]}".

Best regards
Wolfram

Thank you @Wolfram_Haussig ,

You are right .

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.