I am trying to process a CSV file containing logs from several applications. It has 10 columns, as shown in the mapping at the end.
I run into errors while parsing it because the 'field_8_request' field, which usually contains a SQL request, can span multiple lines. I tried the multiline codec, but the problem is that there is no specific pattern that each line starts with, so the codec does not seem to be the solution. For reference, this is roughly what I experimented with:
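input {
  file {
    path => "/path/to/csv_log_data.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
    codec => multiline {
      # placeholder pattern; the idea is "a new record starts here",
      # but my lines share no common prefix, so nothing reliable fits
      pattern => "^RECORD_START"
      negate => true
      what => "previous"
    }
  }
}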
These multiline records all come from the 'ORACLE' app, so if there is no way to parse them, I would like help rejecting those lines and routing them to a separate index, which I could then use to compute something like a rejection rate (see the sketch after the conf file below).
Below is my Logstash conf file:
input {
  file {
    path => "/path/to/csv_log_data.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    separator => ","
    skip_header => true
    columns => [
      "field_1",
      "field_2",
      "field_3",
      "field_4_app_name",
      "field_5",
      "field_6",
      "field_7",
      "field_8_request",
      "field_9",
      "field_10"
    ]
  }
  date {
    match => [ "field_7", "dd/MM/yyyy HH:mm:ss" ]
    target => "field_7"
  }
}
output {
  elasticsearch {
    index => "csv_logs_v0"
  }
  stdout {
    codec => rubydebug
  }
}
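If joining the lines cannot work, here is the kind of conditional routing I had in mind for the rejects. It is an untested sketch: 'csv_logs_rejected_v0' is just a name I made up, and I am relying on the csv filter tagging events it fails to parse with '_csvparsefailure' (the app-name test would catch the ORACLE rows that happen to parse anyway):
output {
  if "_csvparsefailure" in [tags] or [field_4_app_name] == "ORACLE" {
    # broken or ORACLE rows go to a separate index for the rejection rate
    elasticsearch {
      index => "csv_logs_rejected_v0"
    }
  } else {
    elasticsearch {
      index => "csv_logs_v0"
    }
  }
  stdout {
    codec => rubydebug
  }
}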
Below is the mapping:
{
  "mappings": {
    "properties": {
      "field_1": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_2": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_3": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_4_app_name": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_5": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_6": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_7": {
        "type": "date",
        "ignore_malformed": true
      },
      "field_8_request": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_9": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "field_10": {
        "type": "text",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      }
    }
  }
}
And here is an anonymized sample of the data, since I can't share confidential details.
some_log_text,same_text_translated,,App_2,,App_2_87941,13/02/2020 10:03:40,App_2_87941,,App_2,1/2/2021 7:14,,,,,,,,,systeme
69,inconnu,,ORACLE,,ORACLE58924,10/5/2018 15:55,ORACLE58924,,ORACLE_1,1/2/2021 11:11,,,,,,,"DECLARE
line 1 sql;
BEGIN
select round(sum(some_value)/1024) into smth from db where name='name_1';
line_4_sql;
EXCEPTION
WHEN NO_DATA_FOUND THEN
line_5_sql);
END;",,systeme
stat ssl,inconnu,,App_3,,App_3_12345,14/02/2020 09:19:08,App_3_12345,,App_3,1/2/2021 7:14,,,,,,,,,systeme
97,LOGOFF,1505,ORACLE,,ORACLE235784,1/1/2018 0:00,ORACLE235784,Oracle,ORACLE_4,1/2/2021 7:15,,,,,,,,,systeme
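As for the rejection rate itself, my rough idea is to compare document counts between the two indices once both exist, e.g. in Kibana Dev Tools (using the hypothetical 'csv_logs_rejected_v0' index from the sketch above):
GET csv_logs_rejected_v0/_count
GET csv_logs_v0/_count
and then compute rejected / (rejected + accepted) from the two totals.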