Query ElasticSeach with Conf Logstash

Hi,
I want to use a pipeline to do a query in elasticsearch and replace just some fields from csv input file.
I have this fields in elasticsearch: ID, State
And I want to replace the field State with CSV file
Anyone can help me?

Hi there,

can you share here a snapshot of what you have in elasticsearch and what you have in your csv? That way I'll write the pipeline once.

Thanks

In the elasticsearch the index is: test_update
and the mapping is:
"mapping": {
"properties": {
"ID": {
"type": "keyword"
},
"State": {
"type": "keyword"
}
}
}
I have this data in moment:
ID : 1
State : In progress

And in the csv to update data I have:
ID,State
1, Completed

Ok, for your next posts, please format your text in any editor (VSCode, Atom, Sublime ecc) properly spacing and indenting it, then paste it here, highlight it and click on the Preformatted text tool (image ), otherwise it'll be unreadable.

As for your question, do you want to upload all the lines contained in the csv and update those elasticsearch documents which have an ID already ingested, or do you want to simply update the already existing documents without add new ones?

This means, if in Elasticsearch you have something like this:

And in your CSV something like this:
image

Do you want only the document with id_3 to be updated or also the lines with id_4 and id_5 added?

I want all be updated..
In this case, the state of id_3 will be "state_3_new"
And the id_4 and id_5 will be add...

Ok so, admitting your documents have been ingested using the ID field as _id of the document, as it should be and as following:


(you can see the _id of the document corresponds to what is written in the ID field)

This pipeline should do what you want:

input {
  file {
    path => "/absolute_path_to_your_csv_file"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  csv {
    separator => ","
    columns => ["ID","State"]
  }

  if [ID] == "ID" {
    drop{}
  }
}

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "test_update"
    document_id => "%{ID}"
    action => "update"
    doc_as_upsert => true
  }
}

In fact, running it, this is the result:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.