Enrich policy not working in logstash pipeline

Hi,

I am trying to use enrich policy for indexing the data. I have 2 indices, one is emp_master and another emp_salary. My emp_salary sheet will be updated frequently and logstash pipline will capture the information and push into elasticsearch. I want emp_salary to be enriched with the emp_master information. Below is the detail for the emp_master index-

PUT /emp_master
{
  "mappings": {
   "properties": {
     "emp_id": {
       "type": "integer"
     },
     "emp_name": {
       "type": "text"
     }
   }
  }
}
 
POST /emp_master/_bulk
{ "index": { "_index": "emp_master" } }
{ "emp_id": 1, "emp_name": "krishna" }
{ "index": { "_index": "emp_master" } }
{ "emp_id": 2, "emp_name": "seemant" }
{ "index": { "_index": "emp_master" } }
{ "emp_id": 3, "emp_name": "arindam" }

In my emp_salary index I have the below json data which I'm pushing through logstash-

{"emp_id":1, "salary":"11"}
{"emp_id":2, "salary":"12"}
{"emp_id":3, "salary":"13"}

On the basis of above, I want to get the emp_name in my emp_salary index for which I have created the below enrich policy and ingest pipeline-

PUT /_enrich/policy/emp_enrich
{
  "match": {
   "indices": "emp_master",
   "match_field": "emp_id",
   "enrich_fields": ["emp_name"]
  }
}
 
 
#Execute enrich policy
PUT /_enrich/policy/emp_enrich/_execute 

 
#Create ingest pipeline 
PUT /_ingest/pipeline/emp_ingest_pipeline
{
  "processors": [
   {
     "enrich": {
       "policy_name": "emp_enrich",
       "field": "emp_id",
       "target_field": "employee",
       "max_matches": 1
     }
   }
  ]
}

Below is my emp_salary logstash conf -

input {
   file {
    path => "C:/sdas/asda/fsafa/emp_salary.json"
    start_position => "beginning"
    codec => "json"
    type => "emp_salary"
    add_field => { "source" => "emp_salary" }
    sincedb_path => "NUL"  # Forces Logstash to process all files from the beginning
  }
}


filter {
  if [type] == "emp_salary" {
  json {
        source => "message"
        target => "parsed_json_emp_salary"
      }
      if ([parsed_json_emp_salary][emp_id]) {        
        mutate { 
          add_field => { "emp_id" => "%{[parsed_json_emp_salary][emp_id]}" }
        }
      }
      if ([parsed_json_emp_salary][salary]) {        
        mutate { 
          add_field => {"salary" => "%{[parsed_json_emp_salary][salary]}" }
        }
      }
	  mutate {
        copy => { "emp_id" => "[@metadata][_id]" }
      }
    }
}

output {
  if [type] == "emp_salary" {
        elasticsearch {
        hosts => ["http://localhost:9200"]
        user => ""
        password => ""
        index => "emp_salary"
        document_id => "%{[@metadata][_id]}"
        doc_as_upsert => true
        action => "update"
        pipeline => "emp_ingest_pipeline"
      }
    stdout { codec => rubydebug }
  }
}

After doing the above, I am not getting the emp_name field in the emp_salary index.
However when I use POST emp_salary/_update_by_query?pipeline=emp_ingest_pipeline , I am getting the emp_name in the index. Kindly see the screenshot below-

Am I missing anything to set up the pipeline or policy?
Can you please suggest how enrich policy can be applied in logstash pipeline?

Can you confirm if emp_salary is a normal index or if it was created as data stream?

I would expect this to work if this is a normal index.

emp_salary is a normal index

Hi @leandrojmp,

Apology for tagging you directly but there has been an production issue.
I have used doc_as_upsert => true action => "update" in the pipeline but when I removed this, it is working fine.

Could this be an issue for enrichment? For my case, I must have doc_as_upsert => true action => "update" in conf, can you provide some insight here how to enrich the data keeping those parameter enabled?