Elasticsearch producing Duplicate entries in Kibana


I have a Windows FTP server that produces three different logs, all written to the same folder on the C: drive. I am trying to separate the logs so that I can create custom filters. I have set up a test with two of the logs, and entries are showing up twice in Kibana. Everything else seems to be working, so I am not sure why I am getting duplicate entries.

Audit Log

25 Aug 2017 00:00:08	Default Site	192.xxx.xxx.xxx	user 	LogIn	******	SSH	22

Diagnostic Log

2017-09-15 06:31:15,790 INFO SFTPConnection [Session.4502958:Default Site:user?] Client sent SSH_MSG_KEXINIT

filebeat.yml on Windows FTP server

- input_type: log
  paths:
    - C:\Test\audit.log
  document_type: audit
- input_type: log
  paths:
    - C:\Test\diagnostic.log
  document_type: diagnostic

# The Logstash hosts
hosts: ["10.xxx.xxx.xxx:5043", "10.xxx.xxx.xxx:5044"]

Two Logstash .conf files in /etc/logstash/conf.d/


input {
    beats {
        port => "5043"
    }
}

filter {
    if [type] == "audit" {
        grok {
            patterns_dir => ["/etc/logstash/patterns"]
            match => { "message" => "%{FTP_STAMP:ftpstamp}%{DATA:LOGLEVEL}%{IPV4:client}%{SPACE}%{USERNAME:userid}%{SPACE}%{GREEDYDATA:message}" }
        }
        date {
            match => ["ftpstamp" , "dd MMM yyyy HH:mm:ss"]
            target => ["@timestamp"]
        }
    }
}

output {
    elasticsearch {
        hosts => ["10.xxx.xxx.xxx:9200"]
        manage_template => false
        index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
        document_type => "%{[@metadata][type]}"
    }
}


input {
    beats {
        port => "5044"
    }
}

filter {
    if [type] == "diagnostic" {
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{GREEDYDATA:message}" }
        }
        date {
            match => ["timestamp" , "yyyy-MM-dd HH:mm:ss,SSS"]
            target => ["@timestamp"]
        }
    }
}

output {
    elasticsearch {
        hosts => ["10.xxx.xxx.xxx:9200"]
        manage_template => false
        index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
        document_type => "%{[@metadata][type]}"
    }
}

Tail of Logstash log during start up

[2017-09-15T14:06:04,423][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://10.xxx.xxx.xxx:9200/]}}
[2017-09-15T14:06:04,432][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://10.xxx.xxx.xxx:9200/, :path=>"/"}
[2017-09-15T14:06:04,597][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://10.xxx.xxx.xxx:9200/"}
[2017-09-15T14:06:04,601][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//10.xxx.xxx.xxx:9200"]}
[2017-09-15T14:06:04,609][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://10.xxx.xxx.xxx:9200/]}}
[2017-09-15T14:06:04,610][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://10.xxx.xxx.xxx:9200/, :path=>"/"}
[2017-09-15T14:06:04,616][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://10.xxx.xxx.xxx:9200/"}
[2017-09-15T14:06:04,618][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//10.xxx.xxx.xxx:9200"]}
[2017-09-15T14:06:04,859][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-09-15T14:06:05,857][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>""}
[2017-09-15T14:06:05,954][INFO ][logstash.inputs.beats    ] Beats inputs: Starting input listener {:address=>""}
[2017-09-15T14:06:05,957][INFO ][logstash.pipeline        ] Pipeline main started
[2017-09-15T14:06:06,173][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

JSON document for one of the duplicates

  "_index": "filebeat-2017.08.24",
  "_type": "audit",
  "_id": "AV6DaUIRTNMqag3Z2IkO",
  "_version": 1,
  "_score": null,
  "_source": {
    "offset": 153,
    "input_type": "log",
    "source": "C:\\Test\\audit.log",
    "message": [
      "25 Aug 2017 00:00:05\tDefault Site\t10.xxx.xxx.xxx\tuser\tLogIn\t******\tSSH\t22\t",
    "type": "audit",
    "userid": "user",
    "tags": [
    "@timestamp": "2017-08-24T14:00:05.000Z",
    "ftpstamp": "25 Aug 2017 00:00:05",
    "@version": "1",
    "beat": {
      "hostname": "FTP1",
      "name": "FTP1",
      "version": "5.5.2"
    "host": "FTP1",
    "client": "10.xxx.xxx.xxx",
    "fields": {
      "test": "test",
      "hosts": [
    "LOGLEVEL": "\tDefault Site\t"
  "fields": {
    "@timestamp": [
  "sort": [

Any help with this one would be much appreciated



You need conditionals around your outputs as well. At the moment, all data goes to both of the defined outputs.
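A likely reason, offered as a sketch rather than a confirmed diagnosis: Logstash 5.x concatenates every file in /etc/logstash/conf.d/ into one pipeline (the startup log above shows a single pipeline, "main", with two Beats listeners and two Elasticsearch outputs), so an event arriving on either port passes through both output blocks. Assuming the `type` field set by Filebeat's `document_type` is what distinguishes the two logs, the outputs could be guarded roughly like this:

```logstash
output {
    # Only audit events reach this output.
    if [type] == "audit" {
        elasticsearch {
            hosts => ["10.xxx.xxx.xxx:9200"]
            manage_template => false
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
        }
    # Only diagnostic events reach this output.
    } else if [type] == "diagnostic" {
        elasticsearch {
            hosts => ["10.xxx.xxx.xxx:9200"]
            manage_template => false
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
        }
    }
}
```

Since both outputs in this thread point at the same host and index pattern, keeping one unconditional elasticsearch block in just one of the files should have the same effect. The conditional form is mainly useful if the two log types later need different indices.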

Is this correct?

output {
       if [type] == "audit" {
          elasticsearch {
            hosts => ["10.xxx.xxx.xxx:9200"]
            manage_template => false
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"

If not would you be able to give an example?

Does anyone have the correct setup for conditionals on the output? Everything is OK apart from Elasticsearch producing duplicates.

As far as I can see that looks fine. Have you tested it?

It's not working for me.

Just by adding the conditional to both .conf files, I now get a syntax error during pipeline creation and I cannot work out what it is. The error seems to be at the start of diagnostic.conf.

    [2017-09-18T10:19:10,575][ERROR][logstash.agent           ] Cannot create pipeline {:reason=>"Expected one of #, => at line 31, column 10 (byte 718) after output {\n    if [type] == \"audit\" {\n        elasticsearch {\n          hosts => [\"\"]\n          manage_template => false\n          index => \"%{[@metadata][beat]}-%{+YYYY.MM.dd}\"\n          document_type => \"%{[@metadata][type]}\"\n      }\n    }\n\ninput{\n    beats"}

It seems like you are missing matching closing braces after the output. Is that a cut and paste error or actually wrong in the file?

That is just a cut and paste; I did not give you the full statement. I pasted the full configuration in my earlier message above, but that version does not show the condition on the output. Here is the full diagnostic.conf with the condition inserted.

        port => "5044"
filter {
    if [type] == "diagnostic" {
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{GREEDYDATA:message}" }
        date {
           match => ["timestamp" , "yyyy-MM-dd HH:mm:ss,SSS"]
           target => ["@timestamp"]
output {
    if [type] == "diagnostic" {
      elasticsearch {
        hosts => ["10.xxx.xxx.xxx:9200"]
        manage_template => false
        index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
        document_type => "%{[@metadata][type]}"

It looks like you still are missing a closing brace for the output block.
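A possible explanation for why the error appeared to be at the start of diagnostic.conf: the files in conf.d are read as one concatenated configuration, so an `output {` left open at the end of one file swallows the `input {` at the top of the next. That matches the `after output { ... input{\n    beats` text in the error message above. Below is a minimal sketch of a fully closed diagnostic.conf, assuming the settings posted earlier in the thread. The comments on the closing braces are added here for illustration and are not from the poster's file.

```logstash
input {
    beats {
        port => "5044"
    } # closes beats
} # closes input

filter {
    if [type] == "diagnostic" {
        grok {
            match => { "message" => "%{TIMESTAMP_ISO8601:timestamp}%{GREEDYDATA:message}" }
        } # closes grok
        date {
            match => ["timestamp" , "yyyy-MM-dd HH:mm:ss,SSS"]
            target => ["@timestamp"]
        } # closes date
    } # closes if
} # closes filter

output {
    if [type] == "diagnostic" {
        elasticsearch {
            hosts => ["10.xxx.xxx.xxx:9200"]
            manage_template => false
            index => "%{[@metadata][beat]}-%{+YYYY.MM.dd}"
            document_type => "%{[@metadata][type]}"
        } # closes elasticsearch
    } # closes if
} # closes output; without this, the next file's input block is parsed inside output
```

Adding the conditional introduces one extra nesting level, so the output section needs three closing braces instead of two. Logstash's `--config.test_and_exit` (`-t`) option, run against the conf.d directory, checks the syntax without starting the pipeline and would probably catch an unbalanced brace before a restart.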

Thanks Christian, that was the solution. I really appreciate your help.
