I am using Kafka as my Logstash input and sending the events to Elasticsearch as output. This is my config:
```
input {
  kafka {
    topics => ["maxwell"]
    codec => json
  }
}
filter { }
output {
  stdout { codec => rubydebug }
  elasticsearch {
    index => 'test_kafka'
    document_type => "%{table}"
    hosts => 'localhost:9200'
  }
}
```
When this runs, it outputs the following JSON:
```
{
  "database": "my_db",
  "xid": 88935,
  "@timestamp": "2016-11-14T12:00:13.763Z",
  "data": {
    "contact_country_code": null,
    "contact_type_id": 1,
    "created": "2014-10-03 12:24:36",
    "modified_by": null,
    "modified": "2014-10-03 12:24:36",
    "contact_id": 1,
    "is_default": 0,
    "created_by": null,
    "contact_number": "1241222232"
  },
  "old": {
    "contact_number": "1241222"
  },
  "commit": true,
  "@version": "1",
  "type": "update",
  "table": "contact",
  "ts": 1479124813
}
```
My question is: how can I index only the contents of the `data` key, while still using the dynamic `document_type` (taken from the `table` field), so that the resulting Elasticsearch document looks like this:
```
{
  "_index": "test_kafka",
  "_type": "contact",
  "_id": "AVhitY804rvpX8qdVt9d",
  "_score": 1,
  "_source": {
    "contact_country_code": null,
    "contact_type_id": 1,
    "created": "2014-10-03 12:24:36",
    "modified_by": null,
    "modified": "2014-10-03 12:24:36",
    "contact_id": 1,
    "is_default": 0,
    "created_by": null,
    "contact_number": "1241222232"
  }
}
```
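For reference, one direction I have been considering is a sketch like the following (untested, assuming the Logstash 5.x `ruby` filter event API with `event.get`/`event.set`): copy `table` into `@metadata` so it survives for the output's `document_type` sprintf without being indexed, flatten the `data` hash to the top level, and then remove the Maxwell envelope fields. The `remove_field` list is an assumption based on the JSON above.

```
filter {
  # Assumption: @metadata fields are available to the output's sprintf
  # references but are not written into _source.
  mutate {
    add_field => { "[@metadata][table]" => "%{table}" }
  }

  # Copy each key of the "data" hash to the top level of the event.
  # (Logstash 5.x event API; older 2.x versions use event['data'].)
  ruby {
    code => "
      data = event.get('data')
      data.each { |k, v| event.set(k, v) } if data.is_a?(Hash)
    "
  }

  # Drop the Maxwell envelope fields, keeping only the flattened data.
  mutate {
    remove_field => ["database", "xid", "data", "old", "commit", "type", "table", "ts"]
  }
}

output {
  elasticsearch {
    index => 'test_kafka'
    document_type => "%{[@metadata][table]}"
    hosts => 'localhost:9200'
  }
}
```

Note that `table` has to be read from `@metadata` in the output, because by the time the event reaches the output the original `table` field has already been removed by the filter. Is this the right approach, or is there a cleaner way?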