Update a few attributes in a nested object of an index using Logstash

Hi All,

Good Day!

I have data in an Elasticsearch index and want to update the values of two attributes, out of 20, in a nested object. The input table has n records, and for every record that matches a document_id, the Elasticsearch index should be updated with the nestedfield1 and nestedfield2 values in the transx nested object. I am not sure how this can be configured through a Logstash config file. I have used the configuration file below, but it is not working. Please help me resolve it.

Input data: a sample of 2 records from the table.
es_index_doc_id , nestedfield1, nestedfield2
tranx202103014, 8000, 9000
tranx202103015, -100, -200

Existing data in the ES index:

"_id":"tranx202103014"
_source:{
col1:"tnx1"
col2:"parent",
transx: [

{
nestedfield1:200
nestedfield2:100
.
.
nestedfield20:"sample20"
},
{
}

}
]
},
"_id":"tranx202103015"
_source:{
col1:"tnx1"
col2:"parent",
transx: [

{
nestedfield1:200
nestedfield2:100
.
.
nestedfield20:"sample20"
},
{
}

}
]
}

Config File:

input {
  # Pull the rows that drive the updates from the source table over JDBC.
  # NOTE(review): the $name placeholders are presumably substituted by a
  # wrapper script before Logstash loads this file -- confirm.
  jdbc {
    # Driver and connection settings.
    jdbc_driver_library => "$jdbc_driver_library_path"
    jdbc_driver_class => "$jdbc_driver_class"
    jdbc_connection_string => "$jdbc_connection_string"
    jdbc_user => "$jdbc_user"
    jdbc_password => "$jdbc_password"

    # Connection validation, retry and fetch-size tuning.
    jdbc_validate_connection => true
    connection_retry_attempts => $db_retry_count
    jdbc_fetch_size => "1000"

    # Each row carries the target document id plus the two new nested values.
    statement => "select  es_index_doc_id, nestedfield1, nestedfield2 from transaction_pipe"

    # Tag every event; the filter and output sections branch on [type].
    type => "input_records"
  }
}
filter {
  if [type] == "check" {
    # "check" events: drop the Logstash bookkeeping fields.
    mutate {
      remove_field => ["@version", "@timestamp"]
    }
  } else {
    # Every other event: record the event's @timestamp, rendered in local time,
    # as logstash_processed_at.
    ruby {
      code => "event.set('logstash_processed_at', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%dT%H:%M:%S'))"
    }
  }
}
output {
	# Events whose [type] is not "others" update existing Elasticsearch documents;
	# events typed "others" are written to the pipeline status file instead.
	if [type] != "others" {
		elasticsearch {
			hosts => ["$es_hosts"]
			index => "$INDEX_NAME"
			user => "$es_user"
			password => "$es_password"
			manage_template => false
			action => "update"
			# The row's es_index_doc_id selects the document to update.
			document_id => "%{es_index_doc_id}"
			document_type => "daily"
			# A plain "update" merges the whole event into the top level of _source,
			# so nestedfield1/nestedfield2 would be added beside transx rather than
			# inside it. A scripted update changes only what the script touches.
			# The event is exposed to the script as params.event (script_var_name
			# defaults to "event").
			# NOTE(review): this writes the two values into EVERY element of transx;
			# add a condition inside the loop if only one element should change.
			script_lang => "painless"
			script_type => "inline"
			script => "if (ctx._source.transx != null) { for (def item : ctx._source.transx) { item.nestedfield1 = params.event.get('nestedfield1'); item.nestedfield2 = params.event.get('nestedfield2'); } }"
		}
	}
	else {
		file {
			codec => json_lines
			write_behavior => overwrite
			path => "${pipeline_status_file}${INDEX_NAME}_rcrdcount.txt"
		}
	}
}

Thanks In Advance!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.