Hello!
Telegraf send JSON to Kafka and logstash retrieve it.
ES gives me parse error, but i can't understand what's wrong with my data.
JSON from Telegraf:
{"fields":{"created":1594396144981,"value":"{\"id\": 1, \"type\": \"exit\", \"num\": \"12\", \"other_num\": 2, \"name\": \"Ivan\", \"children\": [{\"name\": \"Julia\", \"age\": 1, \"sex\": true, \"birthtime\": \"09/07/2020 17:38:13\"}], \"birthtime\": \"09/07/2020 17:38:13\", \"created\": 1594316293296}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594396144}
logstash config:
input {
kafka {
group_id => "test-consumer-group"
topics => ["generator"]
bootstrap_servers => "192.168.2.30:9092"
codec => json
}
}
output {
elasticsearch {
hosts => "elasticsearch:9200"
user => "elastic"
password => "changeme"
}
}
Kafka python consumer script to see what data in Kafka:
ConsumerRecord(topic='test', partition=0, offset=87, timestamp=-1, timestamp_type=0, key=b'test.dev.map', value=b'{"fields":{"created":1594395665612,"value":"{\\"id\\": 3, \\"type\\": \\"exit\\", \\"num\\": \\"12\\", \\"other_num\\": 4, \\"name\\": \\"Maxim\\", \\"children\\": [{\\"name\\": \\"Maxim\\", \\"age\\": 3, \\"sex\\": true, \\"birthtime\\": \\"09/07/2020 17:38:14\\"}], \\"birthtime\\": \\"09/07/2020 17:38:14\\", \\"created\\": 1594316294252}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594395665}\n', headers=[], checksum=None, serialized_key_size=12, serialized_value_size=444, serialized_header_size=-1)
ConsumerRecord(topic='test', partition=0, offset=88, timestamp=-1, timestamp_type=0, key=b'test.dev.map', value=b'{"fields":{"created":1594395665612,"value":"{\\"id\\": 2, \\"type\\": \\"exit\\", \\"num\\": \\"12\\", \\"other_num\\": 3, \\"name\\": \\"Ivan\\", \\"children\\": [{\\"name\\": \\"Galya\\", \\"age\\": 2, \\"sex\\": true, \\"birthtime\\": \\"09/07/2020 17:38:14\\"}], \\"birthtime\\": \\"09/07/2020 17:38:14\\", \\"created\\": 1594316294216}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594395665}\n', headers=[], checksum=None, serialized_key_size=12, serialized_value_size=443, serialized_header_size=-1)
ConsumerRecord(topic='test', partition=0, offset=89, timestamp=-1, timestamp_type=0, key=b'test.dev.map', value=b'{"fields":{"created":1594395665612,"value":"{\\"id\\": 1, \\"type\\": \\"exit\\", \\"num\\": \\"12\\", \\"other_num\\": 2, \\"name\\": \\"Ivan\\", \\"children\\": [{\\"name\\": \\"Julia\\", \\"age\\": 1, \\"sex\\": true, \\"birthtime\\": \\"09/07/2020 17:38:13\\"}], \\"birthtime\\": \\"09/07/2020 17:38:13\\", \\"created\\": 1594316293296}"},"name":"generator_log","tags":{"host":"test.dev.map","path":"/opt/map/agents/generator.log","type":"generator"},"timestamp":1594395665}\n', headers=[], checksum=None, serialized_key_size=12, serialized_value_size=443, serialized_header_size=-1)
ElasticSearch MapperParsingException:
elasticsearch_1 | 2020-07-10T15:42:20.094079139Z {"type": "server", "timestamp": "2020-07-10T15:42:20,088Z", "level": "DEBUG", "component": "o.e.a.b.TransportShardBulkAction", "cluster.name": "docker-cluster", "node.name": "038e97db87f4", "message": "[logstash-2020.06.19-000005][0] failed to execute bulk item (index) index {[logstash][_doc][JIdkOXMBWm-Bp172Rwa2], source[_na_]}", "cluster.uuid": "6_6oYznnSNu5AzEeRA2MAw", "node.id": "O2YpYk1kS_OUc1Mts3Wh7A" ,
elasticsearch_1 | 2020-07-10T15:42:20.094175676Z "stacktrace": ["org.elasticsearch.index.mapper.MapperParsingException: failed to parse field [tags] of type [text] in document with id 'JIdkOXMBWm-Bp172Rwa2'. Preview of field's value: '{path=/opt/map/agents/generator.log, host=test.dev.map, type=generator}'",
elasticsearch_1 | 2020-07-10T15:42:20.094187658Z "at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:316) ~[elasticsearch-7.8.0.jar:7.8.0]",
...
elasticsearch_1 | 2020-07-10T15:42:20.118273911Z "Caused by: java.lang.IllegalStateException: Can't get text on a START_OBJECT at 1:405",
elasticsearch_1 | 2020-07-10T15:42:20.118418948Z "at org.elasticsearch.common.xcontent.json.JsonXContentParser.text(JsonXContentParser.java:85) ~[elasticsearch-x-content-7.8.0.jar:7.8.0]",
elasticsearch_1 | 2020-07-10T15:42:20.118472545Z "at org.elasticsearch.common.xcontent.support.AbstractXContentParser.textOrNull(AbstractXContentParser.java:253) ~[elasticsearch-x-content-7.8.0.jar:7.8.0]",
elasticsearch_1 | 2020-07-10T15:42:20.135630026Z "at org.elasticsearch.index.mapper.TextFieldMapper.parseCreateField(TextFieldMapper.java:830) ~[elasticsearch-7.8.0.jar:7.8.0]",
elasticsearch_1 | 2020-07-10T15:42:20.135654484Z "at org.elasticsearch.index.mapper.FieldMapper.parse(FieldMapper.java:294) ~[elasticsearch-7.8.0.jar:7.8.0]",
elasticsearch_1 | 2020-07-10T15:42:20.135662911Z "... 160 more"] }