Reading data from Kafka and parsing it with the json filter fails with ParserError

Version: 6.2.3

Operating System: RHEL

Config file:

    input {
        kafka {
            bootstrap_servers => "127.0.0.1:9092"
            auto_offset_reset => "earliest"
            topics => ["test"]
            group_id => "test"
        }
    }

    filter {
      json {
        source => "message"
      }
    }

    filter {
        if ([fields][topic]=="test") {
            grok {
                match => ["message", "%{DATE:Date}\s+%{TIME:Time}\s+Caller=(%{DATA:Caller})?\s+Operator=(%{DATA:Operator})?\s+Message=(%{DATA:Message})?\s+Version=(%{DATA:Version})?\s+Service=(?<Service>(%{DATA}%{SPACE}%{DATA})|(%{DATA}))?\s+Server=(%{DATA:Server})?\s+KeyCnt=(%{NUMBER:KeyCnt:int})?\s+Key=(%{DATA:Key})?\s+(MsgId=%{DATA:MsgId}\s+)?ElapsedTime=(%{NUMBER:ElapsedTime:int})?"]
        }
      }
    }

Sample Data:
05/15/2018 13:21:28:464 Caller=WORK Operator=work Message=GetRelationship Version=005 Service=RelationshipServer Server=localhost KeyCnt=1 Key=D1231231-3-2, MsgId=6e3e3f84-3f49-496d-a287-3b277a598538 ElapsedTime=64

Error:

[2018-05-16T00:44:29,119][WARN ][logstash.filters.json ] Error parsing json {:source=>"message", :raw=>"05/15/2018\t13:21:28:464\tCaller=WORK\tOperator=work\tMessage=GetRelationship\tVersion=005\tService=RelationshipServer\tServer=localhost\tKeyCnt=1\tKey=D1231231-3-2,\tMsgId=6e3e3f84-3f49-496d-a287-3b277a598538\tElapsedTime=64", :exception=>#<LogStash::Json::ParserError: Invalid numeric value: Leading zeroes not allowed

Steps to Reproduce:

Filebeat reads the log files and publishes them to a topic in Kafka. Logstash reads from the Kafka topic and applies the filters. During filtering, the json filter throws an error.

When Kafka is not used and Filebeat sends directly to Logstash, there is no failure.

We cannot go to the log/app owner and ask for a correction.

How do we make Logstash parse the data as a string?

That is not valid JSON. I would handle that event with dissect and kv. The separators in the dissect mapping and the field_split character in kv are literal tab characters.

  dissect { mapping => { "message" => "%{ts}    %{+ts}  %{restOfLine}" } }
  kv { source => "restOfLine"  field_split => " " }
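
For illustration, pulled into a full filter block it might look something like the sketch below. The delimiters inside the dissect mapping and in field_split must be literal tab characters, and the mutate is only a suggestion to mirror the :int casts the original grok pattern was doing:

    filter {
      # the delimiters in the mapping and in field_split must be literal tab characters
      dissect { mapping => { "message" => "%{ts}    %{+ts}  %{restOfLine}" } }
      kv { source => "restOfLine" field_split => " " }
      # optional: mirror the :int casts from the original grok pattern
      mutate { convert => { "KeyCnt" => "integer" "ElapsedTime" => "integer" } }
    }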

@Badger Yep, my bad. The data I pasted was the raw log line that Filebeat is pushing. The event coming through Kafka is wrapped in JSON, and Logstash fails in the filter.

OK, so with the kafka input and no filters, what does output { stdout { codec => rubydebug } } look like?

@Badger Below is the raw data Logstash is reading from Kafka, as output by rubydebug:

    {
        "@timestamp" => 2018-05-19T22:56:24.065Z,
           "message" => "{"@timestamp":"2018-05-19T22:56:11.222Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.2.3","topic":"test"},"fields":{"logsource":"test","topic":"test"},"prospector":{"type":"log"},"beat":{"name":"elk","hostname":"elk","version":"6.2.3"},"source":"/var/log/dummy.log","offset":215,"message":"05/15/2018\t13:21:28:464\tCaller=WORK\tOperator=work\tMessage=GetRelationship\tVersion=005 Service=RelationshipServer\tServer=localhost\tKeyCnt=1\tKey=D1231231-3-2,\tMsgId=6e3e3f84-3f49-496d-a287-3b277a598538\tElapsedTime=64"}",
          "@version" => "1"
    }

A json filter turns that into

{
    "prospector" => {
        "type" => "log"
    },
        "source" => "/var/log/dummy.log",
    "@timestamp" => 2018-05-19T22:56:11.222Z,
        "fields" => {
            "topic" => "test",
        "logsource" => "test"
    },
       "message" => "05/15/2018\t13:21:28:464\tCaller=WORK\tOperator=work\tMessage=GetRelationship\tVersion=005 Service=RelationshipServer\tServer=localhost\tKeyCnt=1\tKey=D1231231-3-2,\tMsgId=6e3e3f84-3f49-496d-a287-3b277a598538\tElapsedTime=64",
        "offset" => 215,
          "beat" => {
            "name" => "elk",
         "version" => "6.2.3",
        "hostname" => "elk"
    }
}

I can reproduce the error using

  json { source => message }
  json { source => message }

i.e. over-writing the message field with a string that is not JSON, then trying to parse that with a json filter. What does rubydebug produce if you remove the json filter?

I did not have the filter in place when rubydebug was enabled. @Badger Is there any specific setting I need to add to make the message a string and then try the json filter?

I don't think you need a json filter. The fact that message already contains "05/15/2018\t13:21:28:464\tCaller=WORK\tOperator=work\tMessage=GetRelationship\tVersion=005 Service=RelationshipServer\tServer=localhost\tKeyCnt=1\tKey=D1231231-3-2,\tMsgId=6e3e3f84-3f49-496d-a287-3b277a598538\tElapsedTime=64" indicates the outer JSON has already been parsed.

@Badger I tried reading the JSON message directly, but it failed because we also have a grok pattern.
Basically, we extract the message from the Kafka event using the json filter, and then apply the grok pattern to the result.

What needs to be done to stop the JSON parse failure? Any suggestions?

If you are talking about this

Error parsing json {:source=>"message", :raw=>"05/15/2018\t13:21:28:464\tCaller=WORK\tOperator=work\tMessage=GetRelationship\tVers[...]

The answer is to remove the json filter. The incoming JSON has already been parsed and does not need to be parsed a second time. If you are talking about some other error please show the error, the configuration that produces it, and the 'stdout { codec => rubydebug }' output.
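
If a json filter does have to stay in the configuration for other events, one way to keep it from failing on already-decoded messages is to guard it with a conditional so it only runs when the field still looks like JSON, for example:

    filter {
      # only attempt JSON decoding when the message still looks like a JSON object
      if [message] =~ /^\s*\{/ {
        json { source => "message" }
      }
    }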

@Badger I have been trying different options. What I see is that if we remove the json filter, the key-values do not show up when the event is inserted into Elasticsearch; it is the complete JSON message itself.

{"@timestamp":"2018-06-04T20:01:55.429Z","@metadata":{"beat":"filebeat","type":"doc","version":"6.2.3","topic":"dummy"},"prospector":{"type":"log"},"fields":{"topic":"dummy","logsource":"dummy"},"beat":{"name":"elk","hostname":"elk","version":"6.2.3"},"message":"04/24/2018\t00:00:18:390\tCaller=WORK\tOperator=work\tMessage=GetClientPreference\tVersion=v7\tService=ClientPreferenceService\tServer=localhost\tKeyCnt=1\tKey=2/32115004\tMsgId=438cc172-f99b-4cf0-83a0-2ff48d0f44ce\tElapsedTime=28","source":"/var/log/dummy.log","offset":913}

Is there a better way to get the JSON key-values into Elasticsearch?

What does 'stdout { codec => rubydebug }' show?

@Badger Adding the json filter as source => "[message]" was part of the solution. I do not know why adding the brackets helped in resolving it.

Also, I noticed that if we have multiple conf files with a json filter, and any one of them fails parsing, the rest fail as well.

Example:
app1.conf has a json filter.
app2.conf has grok and json.

If app2.conf fails on the filter, it throws a _jsonparsefailure.

How do we control the order in which filters are applied? And how does Logstash read multiple ".conf" files?

This is commonly misunderstood. Unless you are using pipelines.yml to point individual pipelines at different configuration files, Logstash concatenates all the configuration files. If you point -f at a directory that contains app1.conf, app2.conf, and app3.conf, it will read events from all the inputs, put them through every filter in every file (with the files ordered in dictionary sort order), and then send the events to every output.

So if app1.conf has 'json { source => message }' it will overwrite the message field with the contents of "message": "05/15/2018\t13:21:28:464\tCaller=WORK...". Then if app2.conf also has 'json { source => message }' it will fail to parse that, since it is not valid JSON. You do not need the json filter in both files, unless you are using conditionals to make sure the filters are only applied to specific messages.
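
For example, with the Filebeat fields shown in the rubydebug output above, one sketch of that layout is to decode the JSON wrapper once in app1.conf and scope app2.conf's grok to its own topic (the topic name is taken from the original configuration; the grok here is a shortened stand-in for the full pattern):

    # app1.conf -- decode the Filebeat JSON wrapper once
    filter {
      json { source => "message" }
    }

    # app2.conf -- only grok events from this topic
    filter {
      if [fields][topic] == "test" {
        grok {
          # shortened stand-in for the full pattern in the original configuration
          match => ["message", "%{DATE:Date}\s+%{TIME:Time}\s+%{GREEDYDATA:restOfLine}"]
        }
      }
    }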

@Badger Is there a way to have Logstash read the pipeline configuration files in sequence? For example, naming the files 001app1.conf, 002app2.conf, ...

Is there any documentation or best practices we need to follow?

Yes, you can number the files to make the order clearer. It will still read from all the inputs, apply all the filters, and write the same events to every output unless you use conditionals.
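
If you really do need the configuration files kept independent of each other, pipelines.yml (mentioned above) is the way to get that isolation; a minimal sketch, with illustrative paths:

    # pipelines.yml -- paths are only illustrative
    - pipeline.id: app1
      path.config: "/etc/logstash/conf.d/app1.conf"
    - pipeline.id: app2
      path.config: "/etc/logstash/conf.d/app2.conf"

Each pipeline then needs its own input and output sections, since events no longer flow between the files.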

Thanks. This helped.
Any documentation on how to name Logstash conf files?

I am not aware of any documented best practices. We all have our own way of doing things.

Thanks @Badger. Will close this issue.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.