Filebeat lit des lignes 2 fois

Bonjour,

J'accuse filebeat mais je ne suis sûr de rien. J'utilise ELK pour analyser des logs et des fichiers csv.
Pendant plusieurs semaines tout fonctionnait comme il faut mais hier en voulant ingérer des fichiers, j'ai vu que certaines lignes étaient enregistrées deux à trois dans Elastic ce qui fausse l'analyse de données. Si toute les lignes étaient enregistrées deux fois il me suffirait de diviser les resultats par deux mais comme ce sont des lignes aléatoires mes données ne sont plus exploitable.
Que puis je faire ?
Je vous remercie d'avance et vous mets mes configs ci dessous:
Filebeat:

filebeat.inputs:
- type: log
  paths:
    - PATH
  tags: ["Apache","Logs"]

- type: log
  paths:
    - PATH
  tags: ["Nginx","Logs"]

- type: log
  paths:
    - PATH
  tags: ["CA","Logs"]

- type: log
  paths:
    - PATH
  tags: ["Syslog","Logs"]

- type: log
  paths:
    - PATH
  tags: ["Iis","Logs"]
 
- type: log
  paths:
    - PATH
  tags: ["Pnor"]

- type: log
  paths:
    - PATH
  tags: ["Pnsf"]

- type: log
  paths:
    - PATH
  tags: ["Pnbo"]

- type: log
  paths:
    - PATH
  tags: ["Pnfr"]

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml

  reload.enabled: false

output.logstash:
  hosts: ["localhost:5044"]

setup.kibana:
  host: "localhost:5601"

processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

Lorsque je lance logstash:

C:\logstash-7.5.0>bin\logstash -f test2.conf
Thread.exclusive is deprecated, use Thread::Mutex
Sending Logstash logs to C:/logstash-7.5.0/logs which is now configured via log4j2.properties
[2020-06-10T08:46:39,253][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-06-10T08:46:40,663][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.5.0"}
[2020-06-10T08:46:55,264][INFO ][org.reflections.Reflections] Reflections took 637 ms to scan 1 urls, producing 20 keys and 40 values
[2020-06-10T08:47:05,501][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://127.0.0.1:9200/]}}
[2020-06-10T08:47:06,490][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://127.0.0.1:9200/"}
[2020-06-10T08:47:07,539][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
[2020-06-10T08:47:07,539][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-06-10T08:47:07,617][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//127.0.0.1:9200"]}
[2020-06-10T08:47:07,742][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
[2020-06-10T08:47:07,883][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-06-10T08:47:08,520][INFO ][logstash.filters.geoip   ][main] Using geoip database {:path=>"C:/logstash-7.5.0/vendor/bundle/jruby/2.5.0/gems/logstash-filter-geoip-6.0.3-java/vendor/GeoLite2-City.mmdb"}
[2020-06-10T08:47:09,148][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-06-10T08:47:09,163][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>500, "pipeline.sources"=>["C:/logstash-7.5.0/test2.conf"], :thread=>"#<Thread:0x25aa41e6 run>"}
[2020-06-10T08:47:18,412][INFO ][logstash.inputs.beats    ][main] Beats inputs: Starting input listener {:address=>"0.0.0.0:5044"}
[2020-06-10T08:47:18,427][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-06-10T08:47:18,695][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-06-10T08:47:19,290][INFO ][org.logstash.beats.Server][main] Starting server on port: 5044
[2020-06-10T08:47:20,267][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}

Si vous avez besoin d'autre chose n'hésitez pas

Update:
Je cherche aussi de mon côté d'où peut venir le problème;
J'ai modifié la sortie de logstash en stdout{} et il semble qu'il n'y est pas de dédoublement.
Je pense donc que le problème se situe dans la liaison entre elastic et logstash.
Apparemment le problème se limite au csv et les logs ne sont pas impactés.

Bonjour @IneedHelp,

Peux-tu partager un exemple de documents en double? Si ca viens du csv c'est que des lignes sont en double dans tes fichiers. Y'a des solutions pour deboublonner les lignes:

1 Like

Pour être certain que cela n'arrive pas encore, tu devrais utiliser l'une des colonnes de ton fichier CSV comme _id de ton document. Ainsi, si pour quelque raison que ce soit, ton fichier est lu à nouveau, tu ne feras que écraser les valeurs existantes.

1 Like

Il y a une chose qui me pose problème avec cette notion d'id de document c'est que j'enregistre mes données avec le format d'index suivant:
squid-numerodudossier-type-format
cela permets par exemple de grouper tout les fichiers logs apache d'un même dossier.
Maintenant si j'ai plusieurs fichiers et que je définis l'id comme le numéro de ligne est que je ne risque pas de supprimer mes données après chaque ajout de fichiers?

Tu ne fais pas des index journaliers ?

Non mais si c'est nécessaire je peux le mettre en place.
Je suis en train de regarder du côté du fingerprint filter plugin pour mettre en place un hash.

Ce n'est pas nécessaire. Mais comme il s'agit de logs si j'ai bien compris, peut-être que le problème de la purge va se poser à un moment...

Dans mon idée, la purge se fait par dossier. En général, à l'ouverture d'un dossier tout les logs sont ajoutés en même temps et après il n'y a plus d'ajout donc je ne trouvais pas utile de mettre en place un système journalier. Cependant dans l'idée, cela permettra de pouvoir mieux visualiser l'activité journalière donc je vais le mettre en place.

Donc si la purge se fait par "dossier", il est inutile de faire des index journaliers puisque tu supprimeras toutes les données d'un seul coup.
Je voulais dire en fait qu'il ne faut pas utiliser la fonction Delete By Query pour faire de la purge de logs.

Pas vraiment. Un date_histogram peut très bien faire ça sur un seul index.

Finalement j'ai mis en place un script python qui va ajouter une nouvelle colonne dans mes fichiers et mettre en place un hash en fonction du nom de fichier. Ainsi pour chaque ligne j'ajoute une colonne contenant : hash-numero de ligne
Merci pour la réponse ^^