Hi team,
I am new here, so I apologize for any inconvenience. I am looking for some help with an issue; kindly assist.
My requirement is to process data from CSV files located in an S3 bucket using Logstash and ingest it into Elasticsearch.
A few things about the CSV files and their contents in the S3 bucket:
a: The data delimiter is a pipe (|).
b: The headers (column names) are not the same or consistent across the CSV files, so I have to use autodetect_column_names => "true" with skip_header => "true", which does not seem to work as expected: it populates some columns with auto-generated names like the ones below, with null values (a workaround I was wondering about is sketched right after this list).
"column41": null,
"column88": null,
"column92": null,
"column18": null
c: I need to extract a custom field from each CSV file's full name (the full file path in S3) using a grok match and add it as a new field (EntityName) in Logstash, but the grok regex match placed right after the call " mutate { copy => { "file" => "EntityName" }} " below does not seem to execute, or has issues. Could someone suggest the right way to get this done, please? (More details follow.)
S3 CSV file names:
s3://q1unit-au-mdl-shared/staging/inbound/dynamicEntity/Core_Source_Contact_SOURCE_EMAIL/Core_Source_Contact_SOURCE_EMAIL_20210524010953_1621861793439.csv
s3://q1unit-au-mdl-shared/staging/inbound/dynamicEntity/Core_Source_Contact_SOURCE_PHONE/Core_Source_Contact_SOURCE_PHONE_20210524010954_1621861794621.csv
The custom field values that need to be extracted from the two file names above are as below, respectively:
EntityName: Core_Source_Contact_SOURCE_EMAIL  # string between the last two forward slashes of the file name above
EntityName: Core_Source_Contact_SOURCE_PHONE  # string between the last two forward slashes of the file name above
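As a possible workaround for the empty auto-named columns mentioned in point b, I was wondering whether a prune filter like the one below would at least drop those fields, though I suspect it only hides the header-detection problem rather than fixing it (the "column[0-9]+" name pattern is just my assumption based on the output I am seeing):

filter {
  # speculative workaround: drop the auto-generated columnNN fields that appear when header detection misfires
  prune {
    blacklist_names => ["^column[0-9]+$"]
  }
}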
CSV file contents from two different files, for reference:
ActiveInd|ActivityTs|FnameLnameMatchInd|GeoCd|IndivRef|LobCd|MktgEntityCd|MktgEntityRef|OrgCd|OriginTs|PanopticIndivId|ProfanityInd|SrcBirthDay|SrcBirthDt|SrcBirthMth|SrcBirthYr|SrcBusnNm|SrcFirstNm|SrcGenderCd|SrcLastNm|SrcMaritalStatusCd|SrcMiddleNm|SrcNamePrefix|SrcNameSuffix|SrcPrefLanguageCd|SrcSysCd|SrcSysKey|SrcSysNbr| |SrcSysRoleCd|SrcSysTrackNbr|SrcTitleNm|SrcUnparsedNm|StdBusnNm|StdDeceasedDobDt|StdDeceasedDt|StdDeceasedInd|StdFirstNm|StdGenderCd|StdLastNm|StdMiddleNm|StdNamePrefix|StdNameSuffix|StdTitleNm|StdUnparsedNm|TestRecordInd|SYSTEM_TRANSACTION_ID|update_sys_txn_id|source_system|DELETE_IND|realtime_update_ts|create_file_id|ext_create_ts|create_rec_nbr|unite_id
Y|2020-10-07 12:30:37|N|testgeo15946|2832578987120943115|lob|automate_Mktg11|2757500868787240973|org|2020-10-07 12:30:37||N|1|1982-03-20 19:35:00|9|1982|test24 company|test24|M|test241|M||Mr|dr|ENG|automate_SysCd11|automate_SysNbr11|automate_SysNbr11|2767146298789195777|role|track|title|test24 test241|TEST 24 CO.|||N||U|||||||T|125b90c7-a5be-aa13-163c-46b7e675e571|8feb4545-29b7-c92b-fd31-79a010441573|RT|N|2021-05-24 13:09:51.000449|d4a68fea-0eac-4173-8ed5-1766e98ee563|2021-05-24 13:09:51.703|1|2757503793412505623
Atr_array_boolean|Atr_array_float|Atr_array_name|Atr_dt|Atr_id|Atr_name|SYSTEM_TRANSACTION_ID|update_sys_txn_id|source_system|DELETE_IND|realtime_update_ts|create_file_id|sys_create_user|sys_update_user|ext_create_ts|create_rec_nbr|unite_id
|||2022-11-29 09:25:29|1||652dc9a6-63fa-979e-3056-71bb98aa75a3|d92a2a3a-9b07-46a7-4cba-ca3e937ac11a|RT|N|2023-01-19 07:03:35.000723|20b37b16-8477-4332-a16b-eb0a0a239976|||2023-01-19 07:03:42.882|1|2724778757417111553
Logstash pipeline configuration:
input {
  s3 {
    type => "s3feed"
    bucket => "elk-backup****"
    interval => 60
    delete => false
    include_object_properties => true
    tags => "s3feed"
    # sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    skip_header => "true"
    autodetect_column_names => "true"
    separator => "|"
  }
  # Below adds the CSV file's full S3 path as the "file" field, which is the key field to apply the grok regex "/([^/]*)/[^/]*$" to, in order to get the string (EntityName) between the last two forward slashes.
  mutate { add_field => { "file" => "%{[@metadata][s3][key]}" } }
  # Want to preserve the "file" field, so making a copy.
  mutate { copy => { "file" => "EntityName" } }
  # Looking for a way to make the regex match below supply its matched value to EntityName.
  grok { "match" => ["EntityName", "/([^/]*)/[^/]*$"] }
}
output {
  elasticsearch {
    hosts => ["https://*ip-*:9200","https://*ip-*:9200"]
    ssl => true
    ssl_certificate_verification => false
    cacert => "/etc/logstash/ca/ca.crt"
    timeout => 300
    user => "logstash_user"
    password => "*****"
    index => "logstash-epc-customer-sync-s3-q2unit"
  }
  stdout { codec => rubydebug }
}
When I test this config, I end up getting the full "file" value copied into the "EntityName" field as well, whereas I want the output of my regex "/([^/]*)/[^/]*$" match to become the value of the EntityName field, but that is not happening.
All I want is to extract a string from the CSV file's S3 path and populate it as a new field (EntityName) in my JSON doc in Elasticsearch.
ex:
"EntityName": "Core_Source_Contact_SOURCE_EMAIL"
"EntityName": "Core_Source_Contact_SOURCE_PHONE"
My logic does not seem correct; kindly help me find the right way to do it, please.
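One idea I had, but have not verified, is to use a named capture so that grok writes the matched string into EntityName directly (which would also make the mutate copy unnecessary), something like:

filter {
  # speculative: (?<EntityName>...) names the capture so grok stores the directory between the last two slashes in EntityName
  grok {
    match => { "file" => "/(?<EntityName>[^/]+)/[^/]+$" }
  }
}

Would that be the right direction, or is there a cleaner way to do this?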
-rj