Logstash filter Celery JSON log with array entries


(Bogdan Stoica) #1

I have a Celery app, let's call it generic myapp. The logs are configured to be in JSON format. We receive some entries from some operators we work with and, unfortunately, one specific field in those entries does not have a consistent JSON structure. I'm trying to create a Logstash filter so I can import those logs into Elasticsearch, into a specific index called myapp.

The logs look like this:

 "rpc_method": "get_comm_details", "rpc_params": [52683], "timestamp": "2018-10-16T20:23:24.165372Z"}
{"event": "Task succeeded", "id": "890a084e-5ff8-4d3f-aedb-2a8688bb62ff", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "user", "rpc_method": "get_comm_details", "rpc_params": [52683], "runtime": 0.3010744289495051, "service": "core", "timestamp": "2018-10-16T20:23:24.466367Z"}


{"event": "Received task", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "timestamp": "2018-10-16T20:23:25.307046Z"}

{"event": "Task accepted", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "pid": 140559562601968, "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "timestamp": "2018-10-16T20:23:25.308901Z"}

{"event": "Task succeeded", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "runtime": 0.016890593920834363, "service": "wallet", "timestamp": "2018-10-16T20:23:25.325567Z"}

{"event": "Received task", "id": "23fb661f-a256-488a-bf22-3680fe4d4f32", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.SilentRPCTask", "rpc_endpoint": "analytics.fact", "rpc_method": "add", "rpc_params": {"city_id": null, "correlation_id": "6ad37ee2-e4a8-46ab-9dad-fb3d16918fcc", "country_id": null, "dst_is_platform": true, "dst_ln": 21580012, "dst_mid": null, "dst_operator_id": null, "dst_package_id": null, "dst_user_id": null, "duration": null, "ignore_errors": true, "medium": null, "platform_event": "sms-in", "service": "wallet", "service_event": null, "service_path": null, "sms_in_size": 1, "sms_out_size": null, "src_is_platform": false, "src_ln": 647234242, "src_mid": null, "src_operator_id": 300046, "src_package_id": null, "src_user_id": 313676, "ts_end": null, "ts_start": null}, "timestamp": "2018-10-16T20:23:27.739982Z"}

As you can see, there is an rpc_params field in the log that's in JSON object format:

"rpc_params": {"city_id": null, "correlation_id": "6ad37ee2-e4a8-46ab-9dad-fb3d16918fcc", "country_id": null, "dst_is_platform": true, "dst_ln": 21580012, "dst_mid": null, "dst_operator_id": null, "dst_package_id": null, "dst_user_id": null, "duration": null, "ignore_errors": true, "medium": null, "platform_event": "sms-in", "service": "wallet", "service_event": null, "service_path": null, "sms_in_size": 1, "sms_out_size": null, "src_is_platform": false, "src_ln": 647234242, "src_mid": null, "src_operator_id": 300046, "src_package_id": null, "src_user_id": 313676, "ts_end": null, "ts_start": null}

and it's indexed correctly into Elasticsearch. Unfortunately, there are entries like:

{"event": "Task succeeded", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [52683], "runtime": 0.016890593920834363, "service": "wallet", "timestamp": "2018-10-16T20:23:25.325567Z"}
{"event": "Task succeeded", "id": "f1b5357d-5902-447a-9bd0-b69c5d0a416f", "level": "info", "logger": "celery.worker", "name": "myapp.tasks.RPCTask", "rpc_endpoint": "wallet.payment.request", "rpc_method": "get_incoming_requests", "rpc_params": [[77621]], "runtime": 0.016890593920834363, "service": "wallet", "timestamp": "2018-10-16T20:23:25.325567Z"}

where rpc_params is a plain array instead of an object. What type of filter am I supposed to use in order to convert only the entries like "rpc_params": [value] or "rpc_params": [[value]] into JSON or string?
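To illustrate the conversion I'm after in plain Ruby (the names here are just for illustration, not part of my pipeline): array values would become their JSON string representation, while object (Hash) values would stay untouched:

```ruby
require 'json'

# Hypothetical helper: array-valued rpc_params become JSON strings,
# object (Hash) values pass through unchanged
def normalize_rpc_params(value)
  value.is_a?(Array) ? value.to_json : value
end

puts normalize_rpc_params([52683])    # => "[52683]"
puts normalize_rpc_params([[77621]])  # => "[[77621]]"
```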

I'm shipping the logs with Filebeat into a specific index called myapp. Logstash sends the logs to Elasticsearch, and the current Logstash filter I'm using is this:

filter {
  if "myapp" in [tags] {
    json {
      source => "message"
    }
  }
}

Everything works perfectly, except for the entries with "rpc_params": [value] or "rpc_params": [[value]].

I tried different scenarios:

  • I tried to convert the [] and [[]] values to a string, but it fails
  • I tried a ruby filter like:
    event.get('rpc_params').each { |v|
      event.set(v[0], v[1])
    }
    but it fails as well. The problem here is that it's the exact same field, rpc_params, which sometimes appears as a JSON object and other times appears as an array. Unfortunately I can't change the logging for those specific entries, and I also need those entries to be indexed into Elasticsearch.
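For reference, the direction I'm currently thinking of (a sketch, untested against my real pipeline) is a ruby filter inside the existing conditional that serializes rpc_params to a JSON string whenever it is an array, so the field always reaches Elasticsearch with a single type:

```
filter {
  if "myapp" in [tags] {
    json {
      source => "message"
    }
    ruby {
      # load the json library once at pipeline startup
      init => "require 'json'"
      code => '
        params = event.get("rpc_params")
        # [52683] and [[77621]] become the strings "[52683]" and "[[77621]]";
        # object-valued rpc_params entries pass through unchanged
        event.set("rpc_params", params.to_json) if params.is_a?(Array)
      '
    }
  }
}
```

The assumption here is that a string-typed rpc_params is acceptable for the array-shaped entries; the object-shaped entries would keep their current mapping.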

The error message I get in logstash is this:

[2018-10-17T20:06:40,077][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"myapp-2018.10.17", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x309a7c5d>], :response=>{"index"=>{"_index"=>"myapp-2018.10.17", "_type"=>"doc", "_id"=>"WIyjg2YBsQ1pLtYXKEi8", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [rpc_params] tried to parse field [null] as object, but found a concrete value"}}}}
[2018-10-17T20:06:40,088][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"myapp-2018.10.17", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x133a2ad1>], :response=>{"index"=>{"_index"=>"myapp-2018.10.17", "_type"=>"doc", "_id"=>"WYyjg2YBsQ1pLtYXKEjN", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [rpc_params] tried to parse field [null] as object, but found a concrete value"}}}}
[2018-10-17T20:06:41,078][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"myapp-2018.10.17", :_type=>"doc", :_routing=>nil}, #<LogStash::Event:0x5660bdae>], :response=>{"index"=>{"_index"=>"myapp-2018.10.17", "_type"=>"doc", "_id"=>"Woyjg2YBsQ1pLtYXLEiv", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [rpc_params] tried to parse field [null] as object, but found a concrete value"}}}}

Is it possible to configure a mapping inside the Elasticsearch index template?
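To frame what I mean, something along these lines (a sketch only, assuming Elasticsearch 6.x and an index pattern of myapp-*) would force rpc_params to a concrete type — though I suspect mapping it as text would then break the entries where rpc_params is an object, so normalizing in Logstash first may still be necessary:

```
PUT _template/myapp
{
  "index_patterns": ["myapp-*"],
  "mappings": {
    "doc": {
      "properties": {
        "rpc_params": {
          "type": "text"
        }
      }
    }
  }
}
```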

Any help would be really appreciated!


(system) #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.