Handle concurrency while updating different records of the same id from a JSON file

I am loading employee data from JSON files into Elasticsearch using Logstash.

The JSON file can have multiple records for an employee, each with different data, e.g. addresses, languages, etc.
These records can contain the same entities with the same or different values.
The goal is to reconcile all records so that Elasticsearch holds only one document per employee, containing the data (reconciled, with no duplicates) from all of the incoming JSON records.
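
For illustration, two hypothetical incoming records for the same employee could look like the following (the field names follow the sample index document shown further down; how the data is split across records is only an assumption):

  { "id": "e1", "languages": [ "en" ], "addresses": [ { "zip": "12345", "city": "ROCHESTER", "state": "MN" } ] }
  { "id": "e1", "languages": [ "en", "spn" ], "addresses": [ { "zip": "12346", "city": "ROCHESTER", "state": "MN" } ] }

Both records should end up merged into a single e1 document that contains both addresses and no duplicate languages.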

Logstash processes each incoming record in a different thread, based on the number of workers.
This can lead to the following scenario (rare, but possible):

  1. Employee e1 has 4 records in the JSON file.
  2. Each record is picked up by one of 4 Logstash workers/threads at the same time.
  3. Each worker/thread tries to write/update the Elasticsearch index document for employee e1.
  4. However, only the data from the worker/thread that finishes last ends up in the index document, and the data from the other 3 workers/threads is lost (either completely or partially).

Questions:

  1. I suspect this can happen, but I am not sure, so the first question is: is this a valid scenario?
  2. If it can happen, is there a way to handle such a use case in Logstash or in Elasticsearch?

Sample employee record from the Elasticsearch index:

{
  "id": "e1",
  "languages": [ "en", "spn" ],
  "addresses": [
    {
      "zip": "12345",
      "phone": "1234512345",
      "address_2": "MAYO CLINIC",
      "address": "200 1ST ST SW",
      "state": "MN",
      "city": "ROCHESTER"
    },
    {
      "zip": "12346",
      "phone": "1234512346",
      "address_2": "a2",
      "address": "a1",
      "state": "MN",
      "city": "ROCHESTER"
    }
  ]
}

From what you described, your events depend on each other, so you need to run this pipeline with just one worker.

You can do that by setting pipeline.workers: 1 in your pipelines.yml.
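
For example, a minimal pipelines.yml entry could look like this (the pipeline id and config path are placeholders):

  - pipeline.id: employees
    path.config: "/etc/logstash/conf.d/employees.conf"
    pipeline.workers: 1

You can also pass -w 1 (--pipeline.workers 1) on the command line if you start Logstash with a single pipeline.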


If there are multiple records that contain a unique record identifier, then you can combine them using an aggregate filter. This requires that every event go through a single pipeline worker thread, so that they are all processed by a single filter instance.
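
A rough sketch of that approach, assuming the unique identifier is the id field and that the languages and addresses arrays only need to be unioned (the timeout value is a placeholder):

  filter {
    aggregate {
      task_id => "%{id}"
      code => "
        # Merge the arrays from each record into the shared map, dropping duplicates.
        map['languages'] ||= []
        map['languages'] |= (event.get('languages') || [])
        map['addresses'] ||= []
        map['addresses'] |= (event.get('addresses') || [])
        # Discard the partial event; only the aggregated map will be emitted.
        event.cancel()
      "
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "id"
      timeout => 120
    }
  }

With push_map_as_event_on_timeout, the reconciled map for each id is pushed as a single event once no new record has arrived for that id within the timeout, so only one write per employee reaches Elasticsearch.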

If the records for an employee all have disparate data then you could have Elasticsearch combine them. That is, if there are records for languages, addresses, shoe size, age, etc., but a record never contains both shoe size and address, then you can have Logstash write out a file that updates each field using doc_as_upsert and feed that file to Elasticsearch using curl (or Invoke-WebRequest on Windows). A little more detail here.

Given that I implemented that using a file output and curl, I have to assume that having an elasticsearch output do the equivalent doc_as_upsert does not work the way I wanted it to.
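
A hedged sketch of that file-plus-curl approach: each update in the generated file is a pair of bulk action lines using doc_as_upsert, and the file is then posted to the _bulk API (the index name, file name and field values below are placeholders):

  { "update": { "_index": "employees", "_id": "e1" } }
  { "doc": { "languages": [ "en", "spn" ] }, "doc_as_upsert": true }
  { "update": { "_index": "employees", "_id": "e1" } }
  { "doc": { "addresses": [ { "zip": "12345", "city": "ROCHESTER", "state": "MN" } ] }, "doc_as_upsert": true }

  curl -s -H "Content-Type: application/x-ndjson" -XPOST "http://localhost:9200/_bulk" --data-binary "@employee-updates.ndjson"

Because each record only carries its own fields, every partial update merges into the existing e1 document instead of overwriting it.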


Reducing the number of pipeline workers will impact performance dramatically.

A single pipeline worker is not an option, as I am dealing with a large data set.

When your events depend on one another, to work with them in Logstash you are required to use just one pipeline worker.

As badger mentioned, the aggregate filter could be used, but this only works with pipeline workers set to one.

If you cannot set pipeline workers to 1, then I don't think you can solve this with Logstash.
