Logstash csv filter plugin with MariaDB audit plugin logs

Hi, we are developing our ELK log management platform, and we have managed almost everything we need in order to collect logs with Filebeat from Percona, MariaDB, SSH and SAMBA4 ADC access. The only issue we are facing is that logs from the MariaDB audit plugin are available only in CSV format, with comma-separated fields. Connects and disconnects are parsed perfectly, but the problem appears when we log queries: a query that contains commas inside its VALUES statement, for example (FIELD1,FIELD2) VALUES (data1,data2), causes the Logstash csv filter plugin to break the query itself into multiple columns, because each comma is seen as a field separator. We have tried the mutate plugin; it modifies the record but not the behaviour, and we cannot manage the queries correctly.
Thanks in advance

Can you please provide example data/payload and your configuration? Otherwise it will just be a guess as to how it looks.

Hi,
here is our configuration and how the data are ingested. In this case I have used mutate BEFORE the csv filter to replace commas with semicolons; I have also tried it AFTER the filter, obtaining a "message" field that appears correct, but the object is still interrupted and broken into multiple columns, exactly like this.

filter {
  mutate {
    gsub => [
      'message', ",", ";"
    ]
  }
  csv {
    separator => ";"
    columns => [ "timestamp","serverhost","username","host","connectionid","queryid","operation","database","object","retcode" ]
  }
}

{
"_index": "logstash-2020.07.28",
"_type": "doc",
"_id": "_WiplXMBt5BZnqx7LsLv",
"_version": 1,
"_score": null,
"_source": {
"beat": {
"hostname": "mbc-db-vodafone",
"version": "6.3.2",
"name": "mbc-db-vodafone"
},
"connectionid": "464232271",
"column28": " \'FISSI E MOBILI NAZIONALI SENZA LIMITI\'",
"column41": " \'NO\')'",
"column27": " \'13\'",
"column29": " \'-\'",
"source": "/var/lib/mysql/server_audit.log",
"input": {
"type": "log"
},
"column19": " iccid_sim",
"column33": " \'-\'",
"column34": " \'\'",
"column21": " piano_telefonico_sim",
"@timestamp": "2020-07-28T13:43:23.779Z",
"column11": " id_promo",
"offset": 3792527,
"prospector": {
"type": "log"
},
"message": "20200728 15:43:23;mbc-db-vodafone;root;192.168.2.165;464232271;9113436875;QUERY;vodafone_en;'INSERT INTO tb_promo_attivate (id_customer; id_phone_calls; id_promo; descrizione_promo; costo_promo; id_agent_crm; nome_agent; telefono_sconto; tipo_scelta_sim; numero_sim; iccid_sim; vecchio_gestore_sim; piano_telefonico_sim; data_inserimento; campagna;tipo; scelta_rete_sicura)\n\t\tVALUES (\'13285100\'; \'0\'; \'13\'; \'FISSI E MOBILI NAZIONALI SENZA LIMITI\'; \'-\'; \'83\'; \'Ledda Marcellino\'; \'\'; \'-\'; \'\'; \'\'; \'-\'; \'-\'; CURRENT_TIMESTAMP; \'Richiami su Numeri Uscenti C2C 4u Italia\';\'DSL\'; \'NO\')';0",
"username": "root",
"column16": " telefono_sconto",
"column15": " nome_agent",
"column14": " id_agent_crm",
"column26": " \'0\'",
"column13": " costo_promo",
"serverhost": "mbc-db-vodafone",
"column38": " CURRENT_TIMESTAMP",
"tags": [
"beats_input_codec_plain_applied"
],
"retcode": " id_phone_calls",
"object": "'INSERT INTO tb_promo_attivate (id_customer",
"column23": " campagna",
"@version": "1",
"column31": " \'XXXXXXXXXXX\'",
"queryid": "9113436875",
"column32": " \'\'",
"operation": "QUERY",
"column25": " scelta_rete_sicura)\n\t\tVALUES (\'13285100\'",
"column40": "\'DSL\'",
"column37": " \'-\'",
"database": "vodafone_en",
"column22": " data_inserimento",
"column35": " \'\'",
"column20": " vecchio_gestore_sim",
"column24": "tipo",
"column30": " \'83\'",
"column42": "0",
"host": "192.168.2.165",
"timestamp": "20200728 15:43:23",
"column39": " \'Richiami su Numeri Uscenti XXXXXXXX\'",
"column17": " tipo_scelta_sim",
"column36": " \'-\'",
"column18": " numero_sim",
"column12": " descrizione_promo"
},
"fields": {
"@timestamp": [
"2020-07-28T13:43:23.779Z"
]
},
"highlight": {
"message": [
"20200728 15:43:23;mbc-db-vodafone;root;192.168.2.165;464232271;9113436875;QUERY;vodafone_en;'@kibana-highlighted-field@INSERT@/kibana-highlighted-field@ INTO tb_promo_attivate (id_customer; id_phone_calls; id_promo; descrizione_promo; costo_promo; id_agent_crm; nome_agent; telefono_sconto; tipo_scelta_sim; numero_sim; iccid_sim; vecchio_gestore_sim; piano_telefonico_sim; data_inserimento; campagna;tipo; scelta_rete_sicura)\n\t\tVALUES (\'13285100\'; \'0\'; \'13\'; \'FISSI E MOBILI NAZIONALI SENZA LIMITI\'; \'-\'; \'83\'; \'XXXXXXXXX\'; \'\'; \'-\'; \'\'; \'\'; \'-\'; \'-\'; CURRENT_TIMESTAMP; \'Richiami su Numeri Uscenti XXXXXXX\';\'DSL\'; \'NO\')';0"
],
"operation": [
"@kibana-highlighted-field@QUERY@/kibana-highlighted-field@"
],
"object": [
"'@kibana-highlighted-field@INSERT@/kibana-highlighted-field@ INTO tb_promo_attivate (id_customer"
]
},
"sort": [
1595943803779
]
}

If a field is surrounded by double quotes then it is allowed to contain the separator. So you would need your data to look like

20200728 15:43:23;mbc-db-vodafone;root;192.168.2.165;464232271;9113436875;QUERY;vodafone_en;"INSERT INTO tb_promo_attivate (id_customer; id_phone_calls; id_promo; descrizione_promo; costo_promo; id_agent_crm; nome_agent; telefono_sconto; tipo_scelta_sim; numero_sim; iccid_sim; vecchio_gestore_sim; piano_telefonico_sim; data_inserimento; campagna;tipo; scelta_rete_sicura)\n\t\tVALUES (\'13285100\'; \'0\'; \'13\'; \'FISSI E MOBILI NAZIONALI SENZA LIMITI\'; \'-\'; \'83\'; \'Ledda Marcellino\'; \'\'; \'-\'; \'\'; \'\'; \'-\'; \'-\'; CURRENT_TIMESTAMP; \'Richiami su Numeri Uscenti C2C 4u Italia\';\'DSL\'; \'NO\')";0

with double quotes around the SQL. The following mutate will replace any single quote not preceded by a backslash with a double quote. That reduces the number of columns that the csv filter produces from 42 to 9.

    mutate { gsub => [ "message", "(?<!\\)'", '"' ] }
    csv { separator => ";" }
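
For reference, here is a sketch of what the whole filter could look like with both gsub substitutions and the column names from your original config combined. It keeps your comma-to-semicolon substitution, so the separator stays ";", but note that it also rewrites commas inside the SQL values themselves:

    filter {
      mutate {
        # first pair: turn every comma into a semicolon so ";" can act as the field separator
        # second pair: replace single quotes not preceded by a backslash with double quotes,
        # so the csv filter treats the whole SQL statement as one quoted field
        gsub => [
          "message", ",", ";",
          "message", "(?<!\\)'", '"'
        ]
      }
      csv {
        separator => ";"
        columns => [ "timestamp","serverhost","username","host","connectionid","queryid","operation","database","object","retcode" ]
      }
    }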

Great! It works. I still get some parse errors (warnings) in the Logstash log file, but my "json formatted" records are there, also available with Kibana :slight_smile:
It is late here in Italy; tomorrow I will check whether these warnings are minor issues and then close the post!
Thank you so much!

Can you please explain the Lucene syntax to me?
I'm OK with ( .. ), which creates a group; the ? means 0 or more, and the ! preceding "\" means not a literal "\". What I don't understand is the meaning of the "<" character. From what I've read it is normally used for numeric ranges (for example <0-10>), but without its "closing" character ">" I can't understand the regexp exactly. It works, but as I said yesterday I have some minor errors in the logs, and I'm wondering if they can be related to this!
Thanks in advance

It has nothing to do with Lucene; it's a Ruby regular expression. This article discusses negative lookbehind assertions.
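In the pattern (?<!\\)' the sequence (?<! ... ) is a negative lookbehind assertion, not a numeric range: it matches a single quote only when the text immediately before it is not a backslash, so escaped quotes such as \' inside the SQL are left untouched while the quotes delimiting the statement get replaced.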
