I am using kafka as my input and put it in elasticsearch (output)
input {
kafka {
topics => ["maxwell"]
codec => json
}
}
filter {
}
output {
stdout { codec => rubydebug }
elasticsearch {
index => 'test_kafka'
document_type => "%{table}"
hosts => 'localhost:9200'
}
}
When this runs, it outputs the following json
{
"database": "my_db",
"xid": 88935,
"@timestamp": "2016-11-14T12:00:13.763Z",
"data": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
},
"old": {
"contact_number": "1241222"
},
"commit": true,
"@version": "1",
"type": "update",
"table": "contact",
"ts": 1479124813
}
My question is, how can I only extract the data key with dynamic document_type in elasticsearch to achieve the this one
{
"_index": "test_kafka",
"_type": "contact",
"_id": "AVhitY804rvpX8qdVt9d",
"_score": 1,
"_source": {
"contact_country_code": null,
"contact_type_id": 1,
"created": "2014-10-03 12:24:36",
"modified_by": null,
"modified": "2014-10-03 12:24:36",
"contact_id": 1,
"is_default": 0,
"created_by": null,
"contact_number": "1241222232"
}
}