Enrich fields in a new index from another index present in Kibana using Logstash

Hi ,

I am using Logstash to create a new index named "therapy" from a CSV file.

Another index, "participation", already exists in Kibana with other fields.

Both the participation and therapy indices have an "id" field in common.

I was able to build a conf file to ingest the therapy data into Kibana.
I would like some ideas/hints on how to add fields from the participation index when the therapy and participation "id" fields match, and store the enriched therapy docs (with the participation fields) in the therapy index.

Thank you in advance.

Current conf file:

input {
  file {
    path => "/home/data.csv"
    start_position => "beginning"
  }
}
filter {
  csv {
    separator => ","
    skip_header => "true"
    columns => ["user_id","DeviceSerialNumber"]
  }
  elasticsearch {
    hosts => "<>"
    index => "participation"
    query => "id:\"%{[id]}\""
    # Fields to add from the participation index to therapy
    fields => {
      "Month" => "Month"
      "cost" => "cost"
    }
  }
}
output {
  stdout {}
  # elasticsearch {
  #   hosts => "<>"
  #   index => "therapy"
  # }
}

Hi,

I tried the enrich processor to enrich fields from one index, "participation", into another index, "therapy", and stored the consolidated fields in a third index, "data-participation-therapy1".

"user_id" is the common field between the two indices.

Create an enrich policy:

PUT /_enrich/policy/therapy-policy
{
  "match": {
    "indices": "therapy",
    "match_field": "user_id",
    "enrich_fields": [<.....>]
  }
}

Execute that policy in order to create the enrich index:

POST /_enrich/policy/therapy-policy/_execute

{
  "status" : {
    "phase" : "COMPLETE"
  }
}
PUT /_ingest/pipeline/therapy_lookup
{
  "description" : "Enriching data-participation with therapy data",
  "processors" : [
    {
      "enrich" : {
        "policy_name": "therapy-policy",
        "field" : "user_id",
        "target_field": "tmp",
        "max_matches": "1"
      }
    },
    {
      "script": {
        "if": "ctx.tmp != null",
        "source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
      }
    }
  ]
}
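Before reindexing, the pipeline can be sanity-checked with the ingest simulate endpoint. A minimal sketch; the "u123" value is just a made-up sample user_id:

```
POST /_ingest/pipeline/therapy_lookup/_simulate
{
  "docs": [
    { "_source": { "user_id": "u123" } }
  ]
}
```

If the enrich lookup matches, the simulated doc comes back with the enrich fields merged in by the script processor.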
Response:

{
  "acknowledged" : true
}

Getting an error while reindexing the consolidated data into the "data-participation-therapy1" index:

POST _reindex
{
  "source": {
    "index": "participation"
  },
  "dest": {
    "index": "data-participation-therapy1",
    "pipeline": "therapy_lookup"
  }
}

Output:
{"ok":false,"message":"backend closed connection"}

Ref link: https://stackoverflow.com/questions/64008094/combine-two-index-into-third-index-in-elastic-search-using-logstash

@val

backend closed connection simply means that the client (i.e. Kibana Dev Tools in your browser) timed out. But the reindex process is still ongoing in the background.

If your source index is big, odds are high that it will take longer than the timeout for the operation to terminate. So you should start your reindex to run asynchronously using

POST _reindex?wait_for_completion=false

The call will return immediately and give you a task ID which you can use to check the task status as it progresses using

GET _tasks/<task_id>
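For completeness, a reindex started this way can also be stopped through the task management API if it turns out to be misconfigured (the `<task_id>` placeholder is whatever the reindex call returned):

```
POST _tasks/<task_id>/_cancel
```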

Thanks, Val, for the reply.

The participation index consists of 80k docs and the therapy index contains 9k docs, with the "user_id" field in common.

I want to know whether the enrich processor compares docs one-to-one by user_id in both indices, or whether it scans all docs present in the participation index for a particular "user_id" from the therapy index and enriches the stored docs in the new index.

If docs are matched one-to-one, i.e. the user_id of the 1st doc in therapy is only checked against the user_id of the 1st doc in participation, then hardly any docs will get enriched.

Enriching process described here: Enrichment processor working fine on demo index

The reindex API will iterate over all docs of the source index (i.e. participation) and for each will retrieve the doc with the corresponding user_id from the enrichment index (i.e. therapy) and will add what it finds to the source document and store the result in the destination index (i.e. data-participation-therapy1).

Thanks Val, it's a success.

The index pattern shows 49 fields, which is a combination of the participation and therapy fields.

Two queries:

  1. In the Discover section I see only 21 fields, which are from the participation index. The therapy index fields are not present in the data-participation-therapy1 index.
     I am able to query all fields with a DSL query in Dev Tools, and the mapping also shows all 49 fields.

21 fields in the Discover tab of the data-participation-therapy1 index:


Index pattern showing 49 fields:


  2. The field "DayDateStorage" is a date field in the "therapy" index, but in "data-participation-therapy1" it shows as text; I have to change it to date.

Update:

Both queries are solved. I put the mapping of the data-participation-therapy1 index in place before reindexing into it. Discover now shows all fields, and "DayDateStorage" is converted to the date type thanks to the change in the index mapping.
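For reference, the fix described above amounts to creating the destination index with an explicit mapping before running the reindex. A minimal sketch showing only the DayDateStorage field; the other field mappings are omitted:

```
PUT /data-participation-therapy1
{
  "mappings": {
    "properties": {
      "DayDateStorage": { "type": "date" }
    }
  }
}
```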

Thanks.

One query:

As the participation and therapy indices are updated once a day (live pipelines), how do I enrich the new docs into the enriched "data-participation-therapy1" index?

Ref link: What is the best way to enrich near real-time data in ElasticSearch with batch data that may come in later? - Stack Overflow
I found this link, but I still don't have clarity on it.
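One common approach for the daily refresh, sketched under the assumption that the participation docs carry a timestamp-like field (the ingest_date field below is hypothetical): re-execute the policy so the enrich index picks up the new therapy docs, then reindex only the participation docs added since the last run through the same pipeline:

```
POST /_enrich/policy/therapy-policy/_execute

POST _reindex?wait_for_completion=false
{
  "source": {
    "index": "participation",
    "query": {
      "range": { "ingest_date": { "gte": "now-1d/d" } }
    }
  },
  "dest": {
    "index": "data-participation-therapy1",
    "pipeline": "therapy_lookup"
  }
}
```

Re-executing the policy rebuilds the enrich index from the current contents of the therapy index, so new therapy docs become visible to the pipeline; without a timestamp filter the reindex would reprocess all 80k participation docs each day.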

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.