I'm currently trying to add specific fields (which are defined in internal logging guidelines) to a log message. These fields are namespaced and look (in JSON) something like this:
{
  ...
  "message": "Hello World!",
  "post": {
    "v1": {
      "log_level": 10,
      "log_level_name": "INFO"
    }
  }
  ...
}
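To make the namespacing concrete: a Logstash field reference such as [post][v1][log_level] (as used in the filter config further down) addresses the nested key shown above. The following Python snippet is only a sketch of that correspondence. `set_field` is a hypothetical helper written for illustration, not part of Logstash or of my pipeline.

```python
import re


def set_field(event, reference, value):
    """Set a value in a nested dict using a Logstash-style field reference.

    Hypothetical helper for illustration only: "[post][v1][log_level]"
    is split into the keys "post", "v1" and "log_level".
    """
    # A bare name without brackets is treated as a single top-level key.
    keys = re.findall(r"\[([^\]]+)\]", reference) or [reference]
    node = event
    for key in keys[:-1]:
        # Create intermediate objects on demand.
        node = node.setdefault(key, {})
    node[keys[-1]] = value
    return event


event = {"message": "Hello World!"}
set_field(event, "[post][v1][log_level]", 10)
set_field(event, "[post][v1][log_level_name]", "INFO")
```

After these two calls, `event` has the same shape as the JSON document above (minus the elided parts).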
Sending such a JSON document to Logstash via the JSON input works as expected, but the chain I am currently working on fails. The chain is:
- Process writes to log file (PostgreSQL CSV log)
- The log file is monitored by Filebeat and sent to Logstash (with an additional tag)
- Logstash runs a filter over messages with the aforementioned tag
- This filter adds the aforementioned fields
- Logstash sends the messages to Elasticsearch
- Elasticsearch fails with the following error
[2018-09-07T16:56:22,899][DEBUG][o.e.a.b.TransportShardBulkAction] [staging] [sdeb-logstash-oss-2018.09.07][0] failed to execute bulk item (index) index {[sdeb-logstash-oss-2018.09.07][log][AWW0iLVw_SXCZJbtqAUH]
, source[{"offset":42495577,"input_type":"log","source":"/var/log/postgresql/postgresql.csv","message":"2018-09-07 16:56:12.167 CEST,\"malbert\",\"ipbase_malbert\",2914,\"[local]\",5b926256.b62,4820,\"SELECT\",2
018-09-07 13:34:46 CEST,2/3842,0,LOG,00000,\"duration: 0.208 ms\",,,,,,,,,\"psql\"","type":"log","tags":["beats_input_codec_plain_applied"],"malbert":{"test":"2018-09-07 16:33:21"},"@timestamp":"2018-09-07T14:56
:14.221Z","postgresql":{"transaction_id":0,"process_id":2914,"error_severity":"LOG","sql_state_code":"00000","database_name":"ipbase_malbert","user_name":"malbert","query":null,"session_start_time":"2018-09-07 1
3:34:46 CEST","session_id":"5b926256.b62","session_line_num":4820,"command_tag":"SELECT","message":"duration: 0.208 ms","log_time":"2018-09-07 16:56:12.167 CEST","query_pos":null,"internal_query_pos":null,"virtu
al_transaction_id":"2/3842","connection_from":"[local]","application_name":"psql","hint":null,"context":null,"location":null,"detail":null,"internal_query":null},"post":{"v1":{"log_level_name":"DEBUG","log_level
":10}},"@version":"1","beat":{"name":"BBS-nexus.ipsw.dt.ept.lu","hostname":"BBS-nexus.ipsw.dt.ept.lu","version":"5.6.2"},"host":"BBS-nexus.ipsw.dt.ept.lu","fields":{"type":"postgres_csv"}}]}
org.elasticsearch.index.mapper.MapperParsingException: failed to parse [post.v1]
at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:298) ~[elasticsearch-5.2.2.jar:5.2.2]
at org.elasticsearch.index.mapper.DocumentParser.parseObjectOrField(DocumentParser.java:449) ~[elasticsearch-5.2.2.jar:5.2.2]
at org.elasticsearch.index.mapper.DocumentParser.parseObject(DocumentParser.java:466) ~[elasticsearch-5.2.2.jar:5.2.2]
at org.elasticsearch.index.mapper.DocumentParser.innerParseObject(DocumentParser.java:382) ~[elasticsearch-5.2.2.jar:5.2.2]
... truncated due to discourse limit ...
at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:610) [elasticsearch-5.2.2.jar:5.2.2]
at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:596) [elasticsearch-5.2.2.jar:5.2.2]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-5.2.2.jar:5.2.2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]
Caused by: java.lang.IllegalStateException: Can't get text on a START_OBJECT at 1:1011
at org.elasticsearch.common.xcontent.json.JsonXContentParser.text(JsonXContentParser.java:88) ~[elasticsearch-5.2.2.jar:5.2.2]
I can't quite figure out what is going wrong here. As mentioned, another process sends messages with the exact same structure to the Logstash JSON input, and that works without issue.
Logstash filter config:
filter {
  if [fields][type] == "postgres_csv" {
    csv {
      target => "postgresql"
      columns => [
        "log_time",
        "user_name",
        "database_name",
        "process_id",
        "connection_from",
        "session_id",
        "session_line_num",
        "command_tag",
        "session_start_time",
        "virtual_transaction_id",
        "transaction_id",
        "error_severity",
        "sql_state_code",
        "message",
        "detail",
        "hint",
        "internal_query",
        "internal_query_pos",
        "context",
        "query",
        "query_pos",
        "location",
        "application_name"
      ]
      convert => {
        "process_id" => "integer"
        "session_line_num" => "integer"
        "transaction_id" => "integer"
        "internal_query_pos" => "integer"
        "query_pos" => "integer"
        "log_time" => "date_time"
        "session_start_time" => "date_time"
      }
    }
    mutate {
      add_field => {
        # dummy field to select messages with a specific filter-config revision. Can be dropped later.
        "[malbert][test]" => "2018-09-07 16:33:21"
      }
      copy => {
        # NOTE: copying the same field twice causes a "duplicate key error", so we chain this here
        "[postgresql][error_severity]" => "[post][v1][log_level_name]"
        "[post][v1][log_level_name]" => "[post][v1][log_level]"
      }
    }
  }
}
# After the CSV filter has run, the new fields are available and can be standardised following our business rules
filter {
  if [fields][type] == "postgres_csv" {
    mutate {
      gsub => [
        "[post][v1][log_level_name]", "(DEBUG\d|LOG)", "DEBUG",
        "[post][v1][log_level]", "(DEBUG\d|LOG)", "10",
        "[post][v1][log_level_name]", "NOTICE", "INFO",
        "[post][v1][log_level]", "(NOTICE|INFO)", "20",
        "[post][v1][log_level]", "WARNING", "30",
        "[post][v1][log_level_name]", "FATAL", "ERROR",
        "[post][v1][log_level]", "(FATAL|ERROR)", "40",
        "[post][v1][log_level_name]", "PANIC", "CRITICAL",
        "[post][v1][log_level]", "PANIC", "50"
      ]
    }
  }
}
# ... and finally converted to the proper type
filter {
  if [fields][type] == "postgres_csv" {
    mutate {
      convert => {
        "[post][v1][log_level]" => "integer"
      }
    }
  }
}
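
For reference, the result the gsub and convert steps above are meant to produce for a given PostgreSQL error_severity can be sketched in Python as follows. This is only an illustration of the intended mapping under the assumption that each rule is applied in the order listed. `normalise`, `NAME_RULES` and `LEVEL_RULES` are names made up for this sketch and do not exist in the pipeline.

```python
import re

# Same (pattern, replacement) pairs as the gsub rules above, kept in the
# same relative order per field.
NAME_RULES = [
    (r"(DEBUG\d|LOG)", "DEBUG"),
    (r"NOTICE", "INFO"),
    (r"FATAL", "ERROR"),
    (r"PANIC", "CRITICAL"),
]
LEVEL_RULES = [
    (r"(DEBUG\d|LOG)", "10"),
    (r"(NOTICE|INFO)", "20"),
    (r"WARNING", "30"),
    (r"(FATAL|ERROR)", "40"),
    (r"PANIC", "50"),
]


def normalise(error_severity):
    """Return the post.v1 fields expected for a PostgreSQL severity."""
    # Both fields start out as copies of error_severity (the copy step).
    name = error_severity
    level = error_severity
    for pattern, replacement in NAME_RULES:
        name = re.sub(pattern, replacement, name)
    for pattern, replacement in LEVEL_RULES:
        level = re.sub(pattern, replacement, level)
    # Final step: convert the level to an integer. An unknown severity
    # raises ValueError here.
    return {"post": {"v1": {"log_level_name": name, "log_level": int(level)}}}
```

For the severity "LOG" this gives log_level_name "DEBUG" and log_level 10, which matches the post.v1 object in the document that Elasticsearch rejects above.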