How to update an existing Elasticsearch document at indexing time, appending a new value from a Logstash event to an existing array field

Hello guys,

  1. I am running Logstash with Elasticsearch (ES) as the output, and my Logstash input looks like this:

Date time [some-fields] country_name city_name

Sample input values:
22-06-2020 02:19:27 [some-fields] US NewYork
22-06-2020 03:19:27 [some-fields] US Chicago

22-06-2020 04:19:27 [some-fields] India Mumbai
22-06-2020 04:19:27 [some-fields] India Bangalore

.....

  2. And I want the doc structure on ES to evolve as follows.

After processing the 1st event:

{
  "_index": "location",
  "_id": "US",
  "_source": {
    "cities": ["NewYork"]
  }
}

After processing the 2nd event:

{
  "_index": "location",
  "_id": "US",
  "_source": {
    "cities": ["NewYork", "Chicago"]
  }
}

After processing the 3rd event:

{
  "_index": "location",
  "_id": "US",
  "_source": {
    "cities": ["NewYork", "Chicago"]
  }
},
{
  "_index": "location",
  "_id": "India",
  "_source": {
    "cities": ["Mumbai"]
  }
}

After processing the 4th event:

{
  "_index": "location",
  "_id": "US",
  "_source": {
    "cities": ["NewYork", "Chicago"]
  }
},
{
  "_index": "location",
  "_id": "India",
  "_source": {
    "cities": ["Mumbai", "Bangalore"]
  }
}

  3. Is this possible? I have tried a few options in Logstash's output plugin and a few ingest processors on the ES side, but I still have not found a solution.

  4. Any help would be appreciated, thanks in advance!

Is there any reason why you are not considering a more flattened structure for your document data model, like below?

{
  "_index": "location",
  "_id": "a random ID assigned by Elasticsearch",
  "country": "US",
  "city": "New York"
}

Each city will be a doc of its own, as opposed to stuffing them all into an array.

There are both indexing and search performance benefits to this type of data model, especially if you are likely going to be indexing a large number of cities in those countries. With the array model, each update will require loading the entire array into the Logstash process's memory at indexing time, which will cut down your indexing throughput. Likewise, on the search side, scanning the entire array in memory to match terms will become memory intensive as the array grows.
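With the flattened model you can still recover the full list of cities for a country at search time, for example with a terms aggregation (a minimal sketch, assuming the city field is mapped as a keyword):

GET location/_search
{
  "size": 0,
  "query": { "term": { "country": "US" } },
  "aggs": {
    "cities": { "terms": { "field": "city", "size": 1000 } }
  }
}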

  1. Hey there @Rahul_Kumar4, thanks for the response. You are right that this array-based structure will hamper performance, but we do want to explore this structure as well, to support a few other things at our end (for example, significant terms aggregation). We currently have the above-mentioned "flattened structure" live, but for some future use cases we want to try the new array-based structure. We might keep both structures active.

  2. And regarding the point above about loading the entire array into Logstash's memory:

I am planning something like this: at indexing time, Logstash will just send docs in bulk to ES, with the country name fetched from the event as the unique doc id, and
if
the doc id is new, then a new doc will be created on the ES side, with "_id" as the country_name and an array field "cities";
else
the existing doc will get updated, and the new city value will be appended to the existing array field.

  3. So in the "else" case, I am not planning to read the complete array into Logstash; instead, I am planning to append the new value to the existing array field on the ES side, at indexing time (using something like an ingest pipeline: I want to intercept the event before indexing on the ES side, fetch the city_name from that event, and just append that city_name to the existing array field in the ES doc on the fly; see the sketch after this list. I think this might not be as bad as "loading the entire array into the Logstash process's memory" in terms of performance, right?).

  4. So can you please give me some idea or guidance on how to create the array-based structure? Thanks in advance!
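To make the plan concrete, the output side of what I have in mind would look roughly like this (just a sketch; the ingest pipeline name append-city is hypothetical, and that pipeline would have to do the ES-side append):

output {
  elasticsearch {
    hosts       => ["localhost:9200"]
    index       => "location"
    document_id => "%{country_name}"
    pipeline    => "append-city"
  }
}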

To my knowledge, the append to an existing array cannot happen automatically on the Elasticsearch side in the bulk index request from Logstash. When you try to index a doc with an id that already exists (the country, in your case), Elasticsearch will only update the doc with the new field values; appending to an array field does not happen automatically. So you will have to do this in Logstash for each insert/event:

  1. Use the elasticsearch filter plugin to first query Elasticsearch for the matching doc id (the country).

  2. Use the array field (cities) from the matching doc in step 1 to check whether the city in your event (which you are trying to index) is already there. If yes, skip it (I assume you don't want duplicates); if not, append it to the array. You can use the Ruby filter for this. A sketch of both steps follows below.
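A minimal sketch of what this could look like (the field names country_name/city_name and the index name location are taken from your examples; the Ruby code falls back to an empty array when the lookup finds no existing doc):

filter {
  # Step 1: fetch the existing doc for this country, if any
  elasticsearch {
    hosts  => ["localhost:9200"]
    index  => "location"
    query  => "_id:%{country_name}"
    fields => { "cities" => "existing_cities" }
  }
  # Step 2: append the event's city unless it is already present
  ruby {
    code => '
      cities = event.get("existing_cities") || []
      cities = [cities] unless cities.is_a?(Array)
      city = event.get("city_name")
      cities << city unless cities.include?(city)
      event.set("cities", cities)
      event.remove("existing_cities")
    '
  }
}
output {
  elasticsearch {
    hosts         => ["localhost:9200"]
    index         => "location"
    document_id   => "%{country_name}"
    action        => "update"
    doc_as_upsert => true
  }
}

Note that with doc_as_upsert the update writes all of the event's fields into the doc, so remove the ones you don't want (e.g. with a mutate remove_field) before the output.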


Hey @Rahul_Kumar4, thanks for the reply, sure I'll try this! :+1:

  1. One more thing I want to ask: I saw the enrich processor, with which we can fetch some data from an ES index in a pipeline (on the ES side) and then add those enrich fields to the current document received from Logstash.
  2. But I think the enrich processor is only supported in ES version 7.5 and higher, and we are using ES v6.6. Also, can we solve my array-structure problem using this enrich processor? Any idea or word on that? The reason behind this question is that I am only seeing a few functionalities with the enrich processor, and I think there is no support for scripts either.
    Regards,
    Amey

Yes, the enrich processor should work with the right enrich policy definitions. You could also use an Append processor in your ingest pipeline: Append processor | Elasticsearch Guide [8.11] | Elastic
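For example, an ingest pipeline with an append processor could look like this (a minimal sketch; the pipeline name append-city is just for illustration, and the processor appends the incoming event's city_name to the cities field of the document being indexed):

PUT _ingest/pipeline/append-city
{
  "description": "Append the incoming city to the cities field",
  "processors": [
    {
      "append": {
        "field": "cities",
        "value": ["{{city_name}}"]
      }
    }
  ]
}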

Hi @Rahul_Kumar4, thanks for the reply.

  1. I checked the Append processor. Can you please verify one thing for me: I think with the append processor we can only access/intercept the documents passed in by Logstash, right? We cannot access documents that have already been indexed/saved on the ES side, and hence this might not work for us, because I want to access the Logstash event as well as the corresponding ES document at the same time in the pipeline.
  2. I am currently working with the ES filter plugin, as suggested by you, and I am having one problem with it: when events arrive continuously, the ES filter plugin's lookups do not reflect the latest writes, and hence we cannot access the country_names / doc_ids indexed just a little bit earlier.
    For example:
    22-06-2020 02:19:27 [some-fields] US NewYork
    22-06-2020 03:19:27 [some-fields] US Chicago

In this case, after processing the first event I got a document in ES with
id = US and cities = ["NewYork"]

but when I tried to process the second event (US Chicago) and fetch the above document from ES in the "ES filter plugin" (querying "_id:country_name", which is the document id), I got a null value. I think the reason is that ES indexing takes some time to refresh. Do you know any possible way to refresh the index immediately from the "ES filter plugin" / Logstash?
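For what it's worth, I know the index can be refreshed manually with the refresh API, for example:

POST location/_refresh

but I don't see a way to trigger that from the filter plugin itself.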

Regards,
Amey
