Logstash ingests and exports files to Elasticsearch twice

Hello!

I have a Logstash config that gathers lines from CSVs and then sends them to Elasticsearch. However, for some unknown reason, the lines are duplicated in Elasticsearch, taking twice the storage space and making my statistics wrong.

Any idea what the cause could be?

Thanks

You need to share your configuration. Use the preformatted text button (</>) in the forum and paste your configuration.

Whoops, sorry!

input {
  file {
    path => "/srv/edm/ftp/bcp/*.csv"
    start_position => "beginning"
    ignore_older => 17280000
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "ANSI_X3.4-1968"
    }
  }
}

filter {
     csv {
        separator => ";"
        columns => ["sender", "receiver", "flow_type", "end_traitement", "start_traitement", "size_end_f", "size_start_f", "format_end", "format_start", "platform_end", "platform_start", "transport_end", "transport_start", "prod"]
        skip_header => true
     }
    ruby {
        code => 'event.set("date_start_formated", event.get("start_traitement").ljust(21, "0"))'
    }
    ruby {
        code => 'event.set("date_end_formated", event.get("end_traitement").ljust(21, "0"))'
    }
    ruby {
        code => 'event.set("date_start_formated_cut", ((event.get("date_start_formated"))[0,21]))'
    }
    ruby {
        code => 'event.set("date_end_formated_cut", ((event.get("date_end_formated"))[0,21]))'
    }
   ruby {
        code => 'event.set("date_start", (event.get("date_start_formated_cut").gsub(",",".")))'
    }
    ruby {
        code => 'event.set("date_end", (event.get("date_end_formated_cut").gsub(",",".")))'
    }
    ruby {
    code => 'event.set("size_start", event.get("size_start_f").to_i)'
    }

    ruby {
    code => 'event.set("size_end", event.get("size_end_f").to_i)'
    }
    date {
    match => ["date_start", "dd/MM/yy HH:mm:ss.SSS"]
    target => "start_traitement_true"
    }
    date {
    match => ["date_end", "dd/MM/yy HH:mm:ss.SSS"]
    target => "end_traitement_true"
    }
    ruby {
    code => 'event.set("date_epoch_end", event.get("end_traitement_true").to_i)'
    }
    ruby {
    code => 'event.set("date_epoch_start", event.get("start_traitement_true").to_i)'
    }
    ruby {
    code => 'event.set("time_between", ((event.get("date_epoch_end"))-(event.get("date_epoch_start"))).abs)'
    }
    ruby {
    code => 'event.set("fixedProd", (event.get("prod")).tr("\r", ""))'
    }

   ruby {
    code => 'event.set("cat", "BCP")'
    }

    mutate {
      convert => {
         "sender" => "string"
         "receiver" => "string"
         "flow_type" => "string"
         "size_start" => "integer"
         "size_end" =>  "integer"
         "format_start" => "string"
         "format_end" => "string"
         "platform_start" => "string"
         "platform_end" => "string"
         "date_epoch_end" => "float"
         "date_epoch_start" => "float"
         "transport_start" => "string"
         "transport_end" => "string"
         "prod" => "string"
         }
    }

    prune{
      blacklist_names => ["message","date_end_formated_cut","date_start_formated_cut","date_start_formated","date_end_formated","date_start","date_end"]
    }

}

output {
  stdout{}
  elasticsearch {
    hosts => "http://XXXXXXX:9200"
    index => "index_bcp"
    user => "XXXXXXX"
    password => "XXXXXXX"
    ssl_certificate_verification => false
  }
}

Have you run the script just once?

It is launched using systemctl, and yes, just once.

Any reason to set sincedb_path to /dev/null? This makes Logstash read the file again if your service is restarted for some reason.

There is nothing in your configuration that could duplicate the documents unless your logstash is being restarted.

Do you have automatic reload enabled? If you have automatic reload enabled and you change the pipeline configuration file, this will trigger a pipeline reload that could reread the files.
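
For reference, that setting lives in logstash.yml (it can also be passed on the command line as --config.reload.automatic). A minimal sketch of what enabling it looks like; the 3s interval shown is simply the default:

# /etc/logstash/logstash.yml
config.reload.automatic: true
config.reload.interval: 3s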

What does your pipelines.yml look like? Do you have multiple pipelines? Are you pointing to a directory with multiple configuration files?

If not /dev/null, could I point it to any folder?

Also, I don't think it is due to a restart, since on the duplicate documents the @timestamp is exactly the same.

I just verified: I have just one pipeline, pointing to a folder with 2 different config files, and automatic reload is not enabled either.

The sincedb_path needs to point to a file where Logstash keeps track of the position already read in each source file; you can remove this option from your config and let Logstash set the sincedb path itself.

Using /dev/null means that Logstash does not keep track of the position already read in the file, so it can read the whole file again on a restart.
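
For illustration, here is a minimal sketch of the same file input with a persistent sincedb file (the path /var/lib/logstash/sincedb_bcp is just an example location, not something from your setup; any file the logstash user can write to works):

input {
  file {
    path => "/srv/edm/ftp/bcp/*.csv"
    start_position => "beginning"
    # Persist the read position here so a restart resumes where it left off
    # instead of re-ingesting the whole file.
    sincedb_path => "/var/lib/logstash/sincedb_bcp"
  }
}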

If the @timestamp is the same, then you are right, it may not be related to a restart, as a restart would change the @timestamp when Logstash rereads the file.

You didn't share your pipelines.yml. Are you using multiple pipelines, or just one main pipeline pointing to /etc/logstash/conf.d/*.conf? Please share your pipelines.yml.

The only other way I can think of that could create duplicate documents is if you are pointing to a folder with multiple configuration files and have another Elasticsearch output indexing into the same index name.
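
For reference, the default pipelines.yml that ships with the Logstash packages is roughly this (a sketch of the stock file, not yours); with it, every *.conf file under conf.d is concatenated into one single pipeline:

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/*.conf"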

[screenshot of pipelines.yml: a single main pipeline with path.config pointing to /etc/logstash/conf.d/*.conf]

Ok, and do you have any other *.conf files in the /etc/logstash/conf.d directory?

If there are other files, please share them as well.

These 2 configs:

input {
  file {
    path => "/srv/edm/ftp/not_reversed/*.csv"
    start_position => "beginning"
    ignore_older => 17280000
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "ANSI_X3.4-1968"
    }
  }
}

filter {
     csv {
        separator => ";"
        columns => ["sender", "receiver", "flow_type", "start_traitement", "end_traitement", "size_start_f", "size_end_f", "format_start", "format_end", "platform_start", "platform_end", "transport_start", "transport_end", "prod"]
        skip_header => true
     }
    ruby {
        code => 'event.set("date_start_formated", event.get("start_traitement").ljust(21, "0"))'
    }
    ruby {
        code => 'event.set("date_end_formated", event.get("end_traitement").ljust(21, "0"))'
    }
    ruby {
        code => 'event.set("date_start_formated_cut", ((event.get("date_start_formated"))[0,21]))'
    }
    ruby {
        code => 'event.set("date_end_formated_cut", ((event.get("date_end_formated"))[0,21]))'
    }
   ruby {
        code => 'event.set("date_start", (event.get("date_start_formated_cut").gsub(",",".")))'
    }
    ruby {
        code => 'event.set("date_end", (event.get("date_end_formated_cut").gsub(",",".")))'
    }
    ruby {
    code => 'event.set("size_start", event.get("size_start_f").to_i)'
    }

    ruby {
    code => 'event.set("size_end", event.get("size_end_f").to_i)'
    }

    date {
    match => ["date_start", "dd/MM/yy HH:mm:ss.SSS"]
    target => "start_traitement_true"
    }
    date {
    match => ["date_end", "dd/MM/yy HH:mm:ss.SSS"]
    target => "end_traitement_true"
    }
    ruby {
    code => 'event.set("date_epoch_end", event.get("end_traitement_true").to_i)'
    }
    ruby {
    code => 'event.set("date_epoch_start", event.get("start_traitement_true").to_i)'
    }
    ruby {
    code => 'event.set("time_between", ((event.get("date_epoch_end"))-(event.get("date_epoch_start"))).abs)'
    }
    ruby {
    code => 'event.set("fixedProd", (event.get("prod")).tr("\r", ""))'
    }

   ruby {
    code => 'event.set("cat", "BCP")'
    }

    mutate {
      convert => {
         "sender" => "string"
         "receiver" => "string"
         "flow_type" => "string"
         "size_start" => "integer"
         "size_end" =>  "integer"
         "format_start" => "string"
         "format_end" => "string"
         "platform_start" => "string"
         "platform_end" => "string"
         "date_epoch_end" => "float"
         "date_epoch_start" => "float"
         "transport_start" => "string"
         "transport_end" => "string"
         "prod" => "string"
         }
    }

    prune{
      blacklist_names => ["message","date_end_formated_cut","date_start_formated_cut","date_start_formated","date_end_formated","date_start","date_end"]
    }

}

output {
  stdout{}
  elasticsearch {
    hosts => "http://XXXXXXX:9200"
    index => "index_bcp"
    user => "XXXXX"
    password => "XXXXXXX"
    ssl_certificate_verification => false
  }
}

AND

input {
  file {
    path => "/srv/edm/ftp/bcp/*.csv"
    start_position => "beginning"
    ignore_older => 17280000
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "ANSI_X3.4-1968"
    }
  }
}

filter {
     csv {
        separator => ";"
        columns => ["sender", "receiver", "flow_type", "end_traitement", "start_traitement", "size_end_f", "size_start_f", "format_end", "format_start", "platform_end", "platform_start", "transport_end", "transport_start", "prod"]
        skip_header => true
     }
    ruby {
        code => 'event.set("date_start_formated", event.get("start_traitement").ljust(21, "0"))'
    }
    ruby {
        code => 'event.set("date_end_formated", event.get("end_traitement").ljust(21, "0"))'
    }
    ruby {
        code => 'event.set("date_start_formated_cut", ((event.get("date_start_formated"))[0,21]))'
    }
    ruby {
        code => 'event.set("date_end_formated_cut", ((event.get("date_end_formated"))[0,21]))'
    }
   ruby {
        code => 'event.set("date_start", (event.get("date_start_formated_cut").gsub(",",".")))'
    }
    ruby {
        code => 'event.set("date_end", (event.get("date_end_formated_cut").gsub(",",".")))'
    }
    ruby {
    code => 'event.set("size_start", event.get("size_start_f").to_i)'
    }

    ruby {
    code => 'event.set("size_end", event.get("size_end_f").to_i)'
    }
    date {
    match => ["date_start", "dd/MM/yy HH:mm:ss.SSS"]
    target => "start_traitement_true"
    }
    date {
    match => ["date_end", "dd/MM/yy HH:mm:ss.SSS"]
    target => "end_traitement_true"
    }
    ruby {
    code => 'event.set("date_epoch_end", event.get("end_traitement_true").to_i)'
    }
    ruby {
    code => 'event.set("date_epoch_start", event.get("start_traitement_true").to_i)'
    }
    ruby {
    code => 'event.set("time_between", ((event.get("date_epoch_end"))-(event.get("date_epoch_start"))).abs)'
    }
    ruby {
    code => 'event.set("fixedProd", (event.get("prod")).tr("\r", ""))'
    }

   ruby {
    code => 'event.set("cat", "BCP")'
    }

    mutate {
      convert => {
         "sender" => "string"
         "receiver" => "string"
         "flow_type" => "string"
         "size_start" => "integer"
         "size_end" =>  "integer"
         "format_start" => "string"
         "format_end" => "string"
         "platform_start" => "string"
         "platform_end" => "string"
         "date_epoch_end" => "float"
         "date_epoch_start" => "float"
         "transport_start" => "string"
         "transport_end" => "string"
         "prod" => "string"
         }
    }

    prune{
      blacklist_names => ["message","date_end_formated_cut","date_start_formated_cut","date_start_formated","date_end_formated","date_start","date_end"]
    }

}

output {
  stdout{}
  elasticsearch {
    hosts => "http://XXXXXX:9200"
    index => "index_bcp"
    user => "XXXXXXX"
    password => "XXXXXXX"
    ssl_certificate_verification => false
  }
}

The hosts, user, and password are the same in both files.

The 2nd one is the one I already gave.

That's the issue.

Your pipelines.yml has only one pipeline pointing to /etc/logstash/conf.d/*.conf, so when Logstash starts, all the files in this directory are merged into a single pipeline and every event passes through every filter and output.

In reality what you have is something like this:

input {
  file {
    path => "/srv/edm/ftp/not_reversed/*.csv"
    start_position => "beginning"
    ignore_older => 17280000
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "ANSI_X3.4-1968"
    }
  }
  file {
    path => "/srv/edm/ftp/bcp/*.csv"
    start_position => "beginning"
    ignore_older => 17280000
    sincedb_path => "/dev/null"
    codec => plain {
      charset => "ANSI_X3.4-1968"
    }
  }
}
filter {
    # all your filters
}
output {
  stdout{}
  elasticsearch {
    hosts => "http://XXXXXXX:9200"
    index => "index_bcp"
    user => "XXXXX"
    password => "XXXXXXX"
    ssl_certificate_verification => false
  }
  stdout{}
  elasticsearch {
    hosts => "http://XXXXXX:9200"
    index => "index_bcp"
    user => "XXXXXXX"
    password => "XXXXXXX"
    ssl_certificate_verification => false
  }
}

Since both configurations use the same index, you effectively have the same output twice, and every event is indexed twice; this is what gets your documents duplicated.

You should isolate your pipelines by changing your pipelines.yml to something like this:

- pipeline.id: "pipeline-one"
  path.config: "/etc/logstash/conf.d/file-one.conf"

- pipeline.id: "pipeline-two"
  path.config: "/etc/logstash/conf.d/file-two.conf

This will isolate your pipelines and you will have just one output per pipeline.


That was the solution! Thank you very much <3


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.