Hi,
I am trying to use an enrich policy to enrich data at index time. I have two indices, emp_master and emp_salary. The emp_salary data is updated frequently, and a Logstash pipeline captures the changes and pushes them into Elasticsearch. I want each emp_salary document to be enriched with the matching emp_master information. Below are the details of the emp_master index:
PUT /emp_master
{
  "mappings": {
    "properties": {
      "emp_id": {
        "type": "integer"
      },
      "emp_name": {
        "type": "text"
      }
    }
  }
}
POST /emp_master/_bulk
{ "index": { "_index": "emp_master" } }
{ "emp_id": 1, "emp_name": "krishna" }
{ "index": { "_index": "emp_master" } }
{ "emp_id": 2, "emp_name": "seemant" }
{ "index": { "_index": "emp_master" } }
{ "emp_id": 3, "emp_name": "arindam" }
For the emp_salary index, I have the JSON data below, which I am pushing through Logstash:
{"emp_id":1, "salary":"11"}
{"emp_id":2, "salary":"12"}
{"emp_id":3, "salary":"13"}
Based on the above, I want to get emp_name into my emp_salary index, for which I have created the enrich policy and ingest pipeline below:
PUT /_enrich/policy/emp_enrich
{
  "match": {
    "indices": "emp_master",
    "match_field": "emp_id",
    "enrich_fields": ["emp_name"]
  }
}
#Execute enrich policy
PUT /_enrich/policy/emp_enrich/_execute
#Create ingest pipeline
PUT /_ingest/pipeline/emp_ingest_pipeline
{
  "processors": [
    {
      "enrich": {
        "policy_name": "emp_enrich",
        "field": "emp_id",
        "target_field": "employee",
        "max_matches": 1
      }
    }
  ]
}
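To check the policy and pipeline independently of Logstash, the pipeline can be run against a sample document with the simulate API. This is only a minimal sketch; the sample document is copied from the emp_salary data above.

```console
# Run the pipeline against one sample document without indexing it
POST /_ingest/pipeline/emp_ingest_pipeline/_simulate
{
  "docs": [
    { "_source": { "emp_id": 1, "salary": "11" } }
  ]
}
```

If the policy has been executed and a match is found, I would expect the simulated document to gain an employee object (the target_field) containing emp_name.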
Below is my Logstash configuration for emp_salary:
input {
  file {
    path => "C:/sdas/asda/fsafa/emp_salary.json"
    start_position => "beginning"
    codec => "json"
    type => "emp_salary"
    add_field => { "source" => "emp_salary" }
    sincedb_path => "NUL" # Forces Logstash to process all files from the beginning
  }
}
filter {
  if [type] == "emp_salary" {
    json {
      source => "message"
      target => "parsed_json_emp_salary"
    }
    if ([parsed_json_emp_salary][emp_id]) {
      mutate {
        add_field => { "emp_id" => "%{[parsed_json_emp_salary][emp_id]}" }
      }
    }
    if ([parsed_json_emp_salary][salary]) {
      mutate {
        add_field => { "salary" => "%{[parsed_json_emp_salary][salary]}" }
      }
    }
    mutate {
      copy => { "emp_id" => "[@metadata][_id]" }
    }
  }
}
output {
  if [type] == "emp_salary" {
    elasticsearch {
      hosts => ["http://localhost:9200"]
      user => ""
      password => ""
      index => "emp_salary"
      document_id => "%{[@metadata][_id]}"
      doc_as_upsert => true
      action => "update"
      pipeline => "emp_ingest_pipeline"
    }
    stdout { codec => rubydebug }
  }
}
After doing the above, I am not getting the emp_name field in the emp_salary index.
However, when I run POST emp_salary/_update_by_query?pipeline=emp_ingest_pipeline, emp_name does appear in the index. Kindly see the screenshot below.
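For reference, a single document can be fetched by ID to check whether the enrichment was applied. This sketch assumes that employee 1 is stored under ID 1, since document_id in the Logstash output is taken from emp_id.

```console
# Fetch the emp_salary document for emp_id 1 (ID assumed from document_id => emp_id)
GET /emp_salary/_doc/1
```

If the ingest pipeline ran for that document, its _source should contain the employee object with emp_name; if it did not, only the fields sent by Logstash should be present.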
Am I missing anything in the pipeline or policy setup?
Could you please suggest how an enrich policy can be applied to documents indexed through a Logstash pipeline?