assuming you have a mapping like:
"mappings": {
"atype": {
"properties": {
"@timestamp": {
"type": "date"
},
"@version": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"id": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"nestedpath": {
"type": "nested",
"properties": {
"nested_field": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
},
"tags": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
}
one nested field with parent doc (id and tag + timestamp and version ES fields)
Note: order by is important since aggregate filter requires that.
jdbc {
...
statement => "select id,tag, child.nested_field from parent left outer join child on parent.id=child.id order by id"
type => "atype"
....
}
filter:{
#use @metadata field to prevent saving the field
mutate {
add_field => { "[@metadata][type]" => "%{type}" }
remove_field => [ "type" ]
}
if "atype" == [@metadata][type] {
aggregate {
task_id => "%{id}"
code => "
map['id'] ||= event.get('id')
map['@metadata'] = event.get('@metadata')
map['nested_path'] ||= []
map['nested_path'] < < {'nested_field' => event.get('nested_field')}
"
push_previous_map_as_event => true
timeout =>5
timeout_tags => ['aggregated']
}
}
}
output {
//only ingest to ES when aggregated. timeout_tags put it there
if "aggregated" in [tags]{
elasticsearch {
index => "aindex"
document_type => "%{[@metadata][type]}"
document_id => "%{id}"
hosts => ......
action => "update"
doc_as_upsert => true
}
}
}