Kafka input codec JSON parse error

Hello!

Telegraf sends JSON to Kafka and Logstash retrieves it.

Elasticsearch gives me a parse error, but I can't understand what's wrong with my data.

JSON from Telegraf:

{"fields":{"created":1594396144981,"value":"{\"id\": 1, \"type\": \"exit\", \"num\": \"12\", \"other_num\": 2, \"name\": \"Ivan\", \"children\": [{\"name\": \"Julia\", \"age\": 1, \"sex\": true, \"birthtime\": \"09/07/2020 17:38:13\"}], \"birthtime\": \"09/07/2020 17:38:13\", \"created\": 1594316293296}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594396144}

Logstash config:

input {
        kafka {
                group_id => "test-consumer-group"
                topics => ["generator"]
                bootstrap_servers => "192.168.2.30:9092"
                codec => json
        }
}

output {
        elasticsearch {
                hosts => "elasticsearch:9200"
                user => "elastic"
                password => "changeme"
        }
}
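
To see exactly what the json codec produces from these messages, a temporary stdout output with the rubydebug codec can be added (a minimal debugging sketch, not part of the original setup):

output {
        stdout { codec => rubydebug }
}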

Output of a Kafka Python consumer script, to see what data is in Kafka:

ConsumerRecord(topic='test', partition=0, offset=87, timestamp=-1, timestamp_type=0, key=b'test.dev.map', value=b'{"fields":{"created":1594395665612,"value":"{\\"id\\": 3, \\"type\\": \\"exit\\", \\"num\\": \\"12\\", \\"other_num\\": 4, \\"name\\": \\"Maxim\\", \\"children\\": [{\\"name\\": \\"Maxim\\", \\"age\\": 3, \\"sex\\": true, \\"birthtime\\": \\"09/07/2020 17:38:14\\"}], \\"birthtime\\": \\"09/07/2020 17:38:14\\", \\"created\\": 1594316294252}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594395665}\n', headers=[], checksum=None, serialized_key_size=12, serialized_value_size=444, serialized_header_size=-1)
ConsumerRecord(topic='test', partition=0, offset=88, timestamp=-1, timestamp_type=0, key=b'test.dev.map', value=b'{"fields":{"created":1594395665612,"value":"{\\"id\\": 2, \\"type\\": \\"exit\\", \\"num\\": \\"12\\", \\"other_num\\": 3, \\"name\\": \\"Ivan\\", \\"children\\": [{\\"name\\": \\"Galya\\", \\"age\\": 2, \\"sex\\": true, \\"birthtime\\": \\"09/07/2020 17:38:14\\"}], \\"birthtime\\": \\"09/07/2020 17:38:14\\", \\"created\\": 1594316294216}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594395665}\n', headers=[], checksum=None, serialized_key_size=12, serialized_value_size=443, serialized_header_size=-1)
ConsumerRecord(topic='test', partition=0, offset=89, timestamp=-1, timestamp_type=0, key=b'test.dev.map', value=b'{"fields":{"created":1594395665612,"value":"{\\"id\\": 1, \\"type\\": \\"exit\\", \\"num\\": \\"12\\", \\"other_num\\": 2, \\"name\\": \\"Ivan\\", \\"children\\": [{\\"name\\": \\"Julia\\", \\"age\\": 1, \\"sex\\": true, \\"birthtime\\": \\"09/07/2020 17:38:13\\"}], \\"birthtime\\": \\"09/07/2020 17:38:13\\", \\"created\\": 1594316293296}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594395665}\n', headers=[], checksum=None, serialized_key_size=12, serialized_value_size=443, serialized_header_size=-1)
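
Note that [fields][value] is itself an escaped JSON string, so the json codec leaves it as a plain string. If that inner JSON should be expanded into real fields, a second pass with the json filter could do it (a sketch, assuming the field names above; the target name "data" is my own choice):

filter {
        json {
                source => "[fields][value]"
                target => "data"
        }
}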

Elasticsearch MapperParsingException:

    elasticsearch_1  | 2020-07-10T15:42:20.094079139Z {"type": "server", "timestamp": "2020-07-10T15:42:20,088Z", "level": "DEBUG", "component": "o.e.a.b.TransportShardBulkAction", "cluster.name": "docker-cluster", "node.name": "038e97db87f4", "message": "[logstash-2020.06.19-000005][0] failed to execute bulk item (index) index {[logstash][_doc][JIdkOXMBWm-Bp172Rwa2], source[_na_]}", "cluster.uuid": "6_6oYznnSNu5AzEeRA2MAw", "node.id": "O2YpYk1kS_OUc1Mts3Wh7A" ,
    elasticsearch_1  | 2020-07-10T15:42:20.094175676Z "stacktrace": ["org.elasticsearch.index.mapper.MapperParsingException: failed to parse field [tags] of type [text] in document with id 'JIdkOXMBWm-Bp172Rwa2'. Preview of field's value: '{path=/opt/map/agents/generator.log, host=test.dev.map, type=generator}'",
    elasticsearch_1  | 2020-07-10T15:42:20.094187658Z "at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:316) ~[elasticsearch-7.8.0.jar:7.8.0]",
...
    elasticsearch_1  | 2020-07-10T15:42:20.118273911Z "Caused by: java.lang.IllegalStateException: Can't get text on a START_OBJECT at 1:405",
    elasticsearch_1  | 2020-07-10T15:42:20.118418948Z "at org.elasticsearch.common.xcontent.json.JsonXContentParser.text(JsonXContentParser.java:85) ~[elasticsearch-x-content-7.8.0.jar:7.8.0]",
    elasticsearch_1  | 2020-07-10T15:42:20.118472545Z "at org.elasticsearch.common.xcontent.support.AbstractXContentParser.textOrNull(AbstractXContentParser.java:253) ~[elasticsearch-x-content-7.8.0.jar:7.8.0]",
    elasticsearch_1  | 2020-07-10T15:42:20.135630026Z "at org.elasticsearch.index.mapper.TextFieldMapper.parseCreateField(TextFieldMapper.java:830) ~[elasticsearch-7.8.0.jar:7.8.0]",
    elasticsearch_1  | 2020-07-10T15:42:20.135654484Z "at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:294) ~[elasticsearch-7.8.0.jar:7.8.0]",
    elasticsearch_1  | 2020-07-10T15:42:20.135662911Z "... 160 more"] }

Indeed, about five minutes after creating the topic I got an idea: what if ES can't parse the tags field because it thinks it is the internal tags field... yes! That's it.

I renamed tags to telegraf_tags and it works!

But now I have a question: why does this happen? Why such behaviour?

Updated Logstash config:

input {
        kafka {
                group_id => "test-consumer-group"
                topics => ["generator"]
                bootstrap_servers => "192.168.2.30:9092"
                codec => json
        }
}

filter {
        mutate {
                rename => { "tags" => "telegraf_tags" }
                remove_field => [ "tags" ]
        }
}

output {
        elasticsearch {
                hosts => "elasticsearch:9200"
                user => "elastic"
                password => "changeme"
        }
}

The field tags is mapped as the data type text in ES. But you were trying to insert an object.

Can't get text on a START_OBJECT at 1:405

So I can't have the tags field as an object in my JSON, because ES maps it as text by default?
Looks like a constraint.
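
For context, and as my own assumption about why the mapping looks this way: Logstash itself uses a top-level tags field as an array of strings (e.g. via add_tag, or the _jsonparsefailure marker), so once such events reach the index, tags gets mapped as text, and an object arriving later under the same name conflicts with that mapping:

filter {
        mutate {
                # Logstash's own convention: tags is an array of strings,
                # which ES maps as text; an object arriving under the same
                # name then fails with "Can't get text on a START_OBJECT"
                add_tag => [ "some_marker" ]
        }
}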

I've found one more unexpected behaviour:

This config works perfectly:

filter {
  ruby {
    code => "event.set('[@metadata][kafka][lc_topic]', event.get('[@metadata][kafka][topic]').split(/(?=[A-Z])/).map{|x| x.downcase }.join('_') )" # вытаскиваем имя топика из metadata и приводим к lowercase (чтобы ES не ругался что index с большими буквами)
  }
  mutate {
    rename => ["tags", "telegraf_tags" ] # в случае с telegraf - переименовываем поле tags,
  }
  mutate {
    remove_field => [ "tags" ]
    copy => {
      "[fields][created]" => "created"
      "[fields][value]" => "data"
    }
  }
  mutate {
    add_tag => [ "%{[telegraf_tags][type]}", "%{[telegraf_tags][host]}", "%{[telegraf_tags][path]}" ]
  }
  mutate {
    remove_field => [ "fields", "telegraf_tags" ]
  }
}

But this one gives me the 'tags' parse error again (the same as at the start of this topic):

filter {
  ruby {
    code => "event.set('[@metadata][kafka][lc_topic]', event.get('[@metadata][kafka][topic]').split(/(?=[A-Z])/).map{|x| x.downcase }.join('_') )" # вытаскиваем имя топика из metadata и приводим к lowercase (чтобы ES не ругался что index с большими буквами)
  }
  if [tags][agent_type] == "telegraf" {
    mutate {
      rename => ["tags", "telegraf_tags" ] # в случае с telegraf - переименовываем поле tags,
    }
    mutate {
      remove_field => [ "tags" ]
      copy => {
        "[fields][created]" => "created"
        "[fields][value]" => "data"
      }
    }
    mutate {
      add_tag => [ "%{[telegraf_tags][type]}", "%{[telegraf_tags][host]}", "%{[telegraf_tags][path]}" ]
    }
    mutate {
      remove_field => [ "fields", "telegraf_tags" ]
    }
  }
}

The difference between the configurations is one if condition:

if [tags][agent_type] == "telegraf" {
}

It seems that when this if appears, Logstash does not delete the tags field.
Why does this happen?
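
One way to check whether the condition ever matches is to mark both branches and look at what arrives in ES (a debugging sketch, not from the thread; the debug_branch field name is my own):

filter {
  if [tags][agent_type] == "telegraf" {
    mutate { add_field => { "debug_branch" => "matched" } }
  } else {
    mutate { add_field => { "debug_branch" => "not_matched" } }
  }
}

Worth noting: the tags objects in the Kafka messages above only contain host, path and type, so [tags][agent_type] may simply never equal "telegraf", in which case the whole rename/remove block is skipped and tags stays an object.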
