Hi All,
Good Day!
I have data in elastic search index and want to update the nested object two attributes values out of 20 attributes. Input table has n number of records and whatever matches with document_id , it should update the elastic search index with the nestedfield1, nestedfield2 values in the transx nested object. I am not sure how it can be configured through logstash config file, I have used below configuration file, but it is not working. Please help me in resolving it.
input data: sample consider for 2 records from table.
es_index_doc_id , nestedfield1, nestedfield2
tranx202103014, 8000, 9000
tranx202103015, -100, -200
existing data in es index:
"_id":"tranx202103014"
_source:{
col1:"tnx1"
col2:"parent",
transx: [
{
nestedfield1:200
nestedfield2:100
.
.
nestedfield20:"sample20"
},
{
}
}
]
},
"_id":"tranx202103015"
_source:{
col1:"tnx1"
col2:"parent",
transx: [
{
nestedfield1:200
nestedfield2:100
.
.
nestedfield20:"sample20"
},
{
}
}
]
}
Config File:
input {
jdbc {
jdbc_driver_library => "$jdbc_driver_library_path"
jdbc_driver_class => "$jdbc_driver_class"
jdbc_connection_string => "$jdbc_connection_string"
jdbc_user => "$jdbc_user"
jdbc_password => "$jdbc_password"
jdbc_fetch_size => "1000"
connection_retry_attempts => $db_retry_count
jdbc_validate_connection => true
type => "input_records"
statement => "select es_index_doc_id, nestedfield1, nestedfield2 from transaction_pipe"
}
}
filter {
if [type] != "check"{
ruby {
code => "event.set('logstash_processed_at', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%dT%H:%M:%S'))"
}
}
else {
mutate {
remove_field => ["@version", "@timestamp"]
}
}
}
output {
if [type] != "others"{
elasticsearch {
hosts => ["$es_hosts"]
index => "$INDEX_NAME"
user => "$es_user"
password => "$es_password"
manage_template => false
action => "update"
document_id => "%{es_index_doc_id}"
document_type => "daily"
}
}
else {
file {
codec => json_lines
write_behavior => overwrite
path => "${pipeline_status_file}${INDEX_NAME}_rcrdcount.txt"
}
}
}
Thanks In Advance!