Logstash ingestion de fichiers textes et csv entières ne fonctionne pas

Bonjour tout le monde

Je voudrais ingérer des fichiers textes et CSV en tant que fichier distincts via Logstash dans Elasticsearch. J'ai donc trouvé le paramètre "mode" avec la valeur "read", sauf que ça ne fonctionne pas, c'est à dire que Logstash n'ingère rien et n'indique aucune erreur.
L'index n'est apparemment pas créé
J'ai décoché la lecture seule pour le dossier et son contenu que je veux scanner
J'ai ajouté à la config Logstash les paramètres start_position et sincedb_path puisque j'avais trouvé ces solutions sur d'autres topics de Discuss, ça ne fonctionne pas non plus

Voici ma configuration pour Logstash :

input {
    file {
        path => "C:\logs*.txt"
        type => "logs"
        start_position => "beginning"
        sincedb_path => "C:\logs\null"
    }
}
output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "logtest"
    }
}

Pour ce qui est des versions, j'utilise Elasticsearch 7.5.1, logstash 7.5.1 et kibana 7.5.1

Merci pour votre aide

Bonjour,

Pourrais -tu nous donner un peu plus de détails notamment les logs du service Logstash. Cela nous aidera à détecter le problèmes.

A savoir que sur windows, la valeur de sincedb_path devrait etre plutot (si je dis pas des betises) : sincedb_path => "NUL"

1 Like

Bonjour,

Merci du retour.

J'ai changé la valeur de sincedb_path à NUL puis j'ai enlevé sincedb_path et start_position, même échec.
Quand je démarre Logstash, il y a premièrement Java qui détecte une "illegal reflective access operation" qui n'est pas inscripte dans le log de Logstash.

Voici le log des deux derniers lancements uniquement sachant que l'arrêt de Logstash a été causé par mon action :

[2020-02-12T15:09:16,719][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-02-12T15:09:17,244][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.5.1"}
[2020-02-12T15:09:19,375][INFO ][org.reflections.Reflections] Reflections took 58 ms to scan 1 urls, producing 20 keys and 40 values 
[2020-02-12T15:09:20,754][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2020-02-12T15:09:21,664][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2020-02-12T15:09:21,794][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
[2020-02-12T15:09:21,804][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-02-12T15:09:21,994][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2020-02-12T15:09:22,044][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
[2020-02-12T15:09:22,084][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-02-12T15:09:22,094][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["C:/logstash-file-read.conf"], :thread=>"#<Thread:0x5a1e123c run>"}
[2020-02-12T15:09:22,174][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-02-12T15:09:23,304][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-02-12T15:09:23,374][INFO ][filewatch.observingtail  ][main] START, creating Discoverer, Watch with file and sincedb collections
[2020-02-12T15:09:23,379][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-02-12T15:09:23,984][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-02-12T15:11:40,463][WARN ][logstash.runner          ] SIGINT received. Shutting down.
[2020-02-12T15:11:40,528][INFO ][filewatch.observingtail  ] QUIT - closing all files and shutting down.
[2020-02-12T15:11:41,403][INFO ][logstash.javapipeline    ] Pipeline terminated {"pipeline.id"=>"main"}
[2020-02-12T15:11:41,853][INFO ][logstash.runner          ] Logstash shut down.
[2020-02-12T15:12:14,202][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2020-02-12T15:12:14,332][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"7.5.1"}
[2020-02-12T15:12:16,122][INFO ][org.reflections.Reflections] Reflections took 39 ms to scan 1 urls, producing 20 keys and 40 values 
[2020-02-12T15:12:17,303][INFO ][logstash.outputs.elasticsearch][main] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2020-02-12T15:12:17,496][WARN ][logstash.outputs.elasticsearch][main] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2020-02-12T15:12:17,603][INFO ][logstash.outputs.elasticsearch][main] ES Output version determined {:es_version=>7}
[2020-02-12T15:12:17,613][WARN ][logstash.outputs.elasticsearch][main] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2020-02-12T15:12:18,003][INFO ][logstash.outputs.elasticsearch][main] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2020-02-12T15:12:18,042][INFO ][logstash.outputs.elasticsearch][main] Using default mapping template
[2020-02-12T15:12:18,072][WARN ][org.logstash.instrument.metrics.gauge.LazyDelegatingGauge][main] A gauge metric of an unknown type (org.jruby.specialized.RubyArrayOneObject) has been create for key: cluster_uuids. This may result in invalid serialization.  It is recommended to log an issue to the responsible developer/development team.
[2020-02-12T15:12:18,082][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>8, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["C:/logstash-file-read.conf"], :thread=>"#<Thread:0x7931cabc run>"}
[2020-02-12T15:12:18,177][INFO ][logstash.outputs.elasticsearch][main] Attempting to install template {:manage_template=>{"index_patterns"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s", "number_of_shards"=>1}, "mappings"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}
[2020-02-12T15:12:18,672][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2020-02-12T15:12:18,725][INFO ][filewatch.observingtail  ][main] START, creating Discoverer, Watch with file and sincedb collections
[2020-02-12T15:12:18,743][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2020-02-12T15:12:19,422][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2020-02-12T15:15:28,934][WARN ][logstash.runner          ] SIGINT received. Shutting down.
[2020-02-12T15:15:29,002][INFO ][filewatch.observingtail  ] QUIT - closing all files and shutting down.
[2020-02-12T15:15:29,254][INFO ][logstash.javapipeline    ] Pipeline terminated {"pipeline.id"=>"main"}
[2020-02-12T15:15:30,241][INFO ][logstash.runner          ] Logstash shut down.

Je cherche à ingérer un fichier de log entier c'est à dire qu'au lieu de considérer ligne par ligne, Elasticsearch voit un seul enregistremement. Mais je précise que même sans le mode "read" de file input, l'ingestion ne fonctionne pas.
Je vois qu'il y a une ligne qui m'indique que le type déclaré pour le document n'est pas utilisé pour le document_type.

Vu le contenu du log, j'ai pas l'impression qu'il ait un problème.

En revanche je doute que logstash n'arrive pas à lire le fichier en input. Sur windows, il faut que

  1. tu changes les \ du path en /.
  2. Ensuite la valeur de sincedb_path => "NULL".

J'ai fait des testes en local, et ça marche bien. [Exemple] :

 input {
  file {
    path => "C:/Users/toto/tata/exemple.txt"
    start_position => "beginning"
    sincedb_path => "NULL"
  }
}
 output {
   stdout { codec => json }
}

Dans le terminal (output):

{
    "@timestamp":"2020-02-12T15:09:04.381Z",
    "@version":"1",
    "message":"sdfkgjsdmlfkgjsdmflkgjsdmlfkgj sdfmklg jdsmfl kgj\r",
    "path":"C:/Users/toto/tata/exemple.txt"
}

Autres :

=> Il s'agit d'une information.
en faite, Avant la version 6.x, les données ont été classées par type dans un index. Et depuis la version 6.x, cette notion a été supprimé.

1 Like

Merci pour l'aide. Effectivement, le code exemple fonctionne. Et l'ajout de ce code là permet de l'envoyer à ES :

 elasticsearch { hosts => ["localhost:9200"] }

Maintenant, ce que je voudrais savoir c'est comment je peux créer un seul enregistrement à partir d'un fichier ingéré ?

Qu'est ce que t'entends par "un seul enregistrement" ? Si c'est pour repartir fichier ingéré / index, il va falloir que tu joues avec des if else.

Pour définir depuis ta fichier de config le nom de l'index à créer :smile:

 output {
    elasticsearch {
        hosts => ["localhost:9200"]
        index => "%{[some_fields_id_file]}-%{+YYYY.MM.dd}"
    }
}

A savoir que la documentation est tellement bien faite ; N’hésites pas de jeter un coup d’œil.

1 Like

[Complément]

Pour ne pas mélanger les sujets, il serait bien de créer un autre ticket pour si tu as d'autres questions. Et Merci d'indiquer (button solution) dans le cas où j'ai répondu à ta question afin que ce ticket soit clos.

Effectivement, je vais probablement avoir besoin des conditions

Donc pour résumer, il fallait que je mette bien le stdout en bas de la configuration et le sincedb_path à NULL. Ainsi que des slashes / dans le chemin du fichier à la place des backslashes .

Merci pour m'avoir aidé sur ce premier sujet

Quelques remarques :

le stdout{} dans le plugin output te permet juste d'avoir une sortie des données apres transformations ou pas dans le terminal. (ça n'est pas la solution au problème cité).

Les options start_position => "beginning" et sincedb_path => "NULL" ordonnent à logsatsh qu'a chaque redémarrage de celui ci de lire les fichiers en input.

N’hésites pas de jeter un coup d’œil à la documentation.