Hi All,
I am not able to rename the nested attribute through logstash mutate/filter/ruby codes while loading the Elasticsearch index and input data is from database. Sample input data and logstash config file given below.
Tried 3 ways to rename the attribute checknumber to check under claimCheck Nested object but nothing worked. I have listed the 3 ways that I have tried in the config file given below. Can someone help me how to rename the attribute inside the nested object for all records. Thanks in Advance for any help and suggestions !
Input Data format:
"claimCheck" : [
{
"checknumber" : "12345689"
"accountid" : "123456789",
"interestamount" : 0,
"checkdate" : null,
"ffscapitatedind" : "F",
"pendcode" : null,
"checkaddress" : null,
"penaltyamount" : null,
"datecheckcashed" : null,
"sequestratedamount" : null,
"paidto" : "123456789",
"remitnumber" : null,
"effectivedate" : null,
"aobi" : "YES",
"eftindicator" : null,
"adjudicatedamount" : 00,
},
{
"accountid" : "987543209",
"interestamount" : 0,
"checkdate" : null,
"ffscapitatedind" : "C",
"pendcode" : null,
"checkaddress" : null,
"penaltyamount" : null,
"datecheckcashed" : null,
"sequestratedamount" : null,
"paidto" : "765865432",
"remitnumber" : null,
"effectivedate" : null,
"aobi" : "YES",
"eftindicator" : null,
"adjudicatedamount" : 00,
"checknumber" : null
},
{
"accountid" : "123456789",
"interestamount" : 0,
"checkdate" : null,
"ffscapitatedind" : "A",
"pendcode" : null,
"checkaddress" : null,
"penaltyamount" : null,
"datecheckcashed" : null,
"sequestratedamount" : null,
"paidto" : "0000123456789",
"remitnumber" : null,
"effectivedate" : null,
"aobi" : "YES",
"eftindicator" : null,
"adjudicatedamount" : 00,
"checknumber" : null
}
],
"claimStatusDescription" : "DONE",
"logstash_processed_at" : "2021-04-28T10:05:01",
}
Below is logstash config file:
input {
jdbc {
jdbc_driver_library => "$jdbc_driver_library_path"
jdbc_driver_class => "$jdbc_driver_class"
jdbc_connection_string => "$jdbc_connection_string"
jdbc_user => "$jdbc_user"
jdbc_password => "$jdbc_password"
jdbc_fetch_size => "$jdbc_fetch_size"
connection_retry_attempts => $db_retry_count
jdbc_validate_connection => true
use_column_value => true
tracking_column => "etl_cre_tmst"
tracking_column_type => timestamp
clean_run => true
statement => "select data_json from db_table"
}
jdbc {
jdbc_driver_library => "$jdbc_driver_library_path"
jdbc_driver_class => "$jdbc_driver_class"
jdbc_connection_string => "$jdbc_connection_string"
jdbc_user => "$jdbc_user"
jdbc_password => "$jdbc_password"
jdbc_fetch_size => "$jdbc_fetch_size"
connection_retry_attempts => 2
jdbc_validate_connection => true
use_column_value => true
tracking_column => "etl_cre_tmst"
tracking_column_type => timestamp
clean_run => true
type => "check_count"
statement => "select 1 as SOURCE_RCD_COUNT from db_table"
}
}
filter {
if [type] != "check_count"{
#converts the string into document fields
json {
source => "data_json"
}
#avoid additional metadata fields in the index
mutate {
remove_field => ["@version", "data_json" ,"brandId" ]
#rename => { "['claimCheck']['checknumber']" => "['claimCheck']['check']"} #try1: tried rename the nested attribute checknumber to check, but it is not working.
rename => { "col1" => "column1" }
#copy => { "['claimCheck']['checknumber']" => "['claimCheck']['check']"} #try2: tried to copy the nested attribute to another attribute, but it is not working.
}
ruby {
code => "event.set('logstash_processed_at', event.get('[@timestamp]').time.localtime.strftime('%Y-%m-%dT%H:%M:%S'))"
}
ruby {
#try3: tried to rename the nested attribute (checknumber under claimCheck to check ) using ruby code as below, but it is not working.
code => "
b = []
event.get('[claimCheck]').each { |k|
k['checknumber'] = k['check']
#k.delete('checknumber') # if we uncomment this it is removing the column "checknumber" but column "check" is not getting added to the nested object claimCheck
logger.info('for each k', 'value' => k)
b << k
}
event.set('[claimCheck]', b)
"
}
}
else {
mutate {
remove_field => ["@version", "@timestamp"]
}
}
}
output {
if [type] != "check_count"{
stdout { }
elasticsearch {
hosts => ["$es_hosts"]
index => "$INDEX_NAME"
user => "$es_user"
password => "$es_password"
action => "index"
document_type => "claim"
}
}
else {
file {
codec => json_lines
write_behavior => overwrite
path => "/dir1/sample_file_rcrdcount.txt"
}
}
}
Thanks again for my leads and suggestions!