How to use jdbc to import data into nested objects?

Hi everybody,

I have multiple tables in my database which I would like to import into Elasticsearch using a single document type. One of the tables has a one-to-many relation to another table.

I will use the example provided in the guide to explain the problem, because my problem is similar (https://www.elastic.co/guide/en/elasticsearch/guide/current/nested-objects.html). So we have blogposts (in one table) and each blogpost can have multiple comments (in another table, linked with a one-to-many relation). Representing the comments as arrays of inner objects does not work for us, because we would lose the correlation between the different attributes of a comment, as stated in the guide. A solution is to use nested objects, by mapping the comments field as type nested.

This is how the data should end up being represented in Elasticsearch (each nested comment indexed as a separate hidden document, as described in the guide):

{
  "comments.id":      [ 11 ],
  "comments.name":    [ john, smith ],
  "comments.comment": [ article, great ],
  "comments.age":     [ 28 ],
  "comments.stars":   [ 4 ],
  "comments.date":    [ 2014-09-01 ]
}
{
  "comments.id":      [ 12 ],
  "comments.name":    [ alice, white ],
  "comments.comment": [ like, more, please, this ],
  "comments.age":     [ 31 ],
  "comments.stars":   [ 5 ],
  "comments.date":    [ 2014-10-22 ]
}
{
  "id":               [ 1 ],
  "title":            [ eggs, nest ],
  "body":             [ making, money, work, your ],
  "tags":             [ cash, shares ]
}
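
For clarity, the single source document per blogpost that this corresponds to would look roughly like this (adapted from the guide's example; the comment ids are illustrative):

PUT /my_index/blogpost/1
{
  "id":    1,
  "title": "Nest eggs",
  "body":  "Making your money work...",
  "tags":  [ "cash", "shares" ],
  "comments": [
    {
      "id":      11,
      "name":    "John Smith",
      "comment": "Great article",
      "age":     28,
      "stars":   4,
      "date":    "2014-09-01"
    },
    {
      "id":      12,
      "name":    "Alice White",
      "comment": "More like this please",
      "age":     31,
      "stars":   5,
      "date":    "2014-10-22"
    }
  ]
}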

I used the following nested mapping: (mostly obtained from the guide)

PUT /my_index
{
  "mappings": {
    "blogpost": {
      "properties": {
        "comments": {
          "type": "nested", 
          "properties": {
	   "id":       { "type": "number"  },
            "name":    { "type": "string"  },
            "comment": { "type": "string"  },
            "age":     { "type": "short"   },
            "stars":   { "type": "short"   },
            "date":    { "type": "date"    }
          }
        }
      }
    }
  }
}

The snippet below shows the relevant configuration for importing the data with the Logstash jdbc input:

input {
  jdbc {
    ...
     statement => "SELECT * FROM blogpost LEFT JOIN comments ON blogpost.id = comments.id ORDER BY blogpost.id"
    ...
  }
}

However, when looking at what has been imported into ES, I see that a separate event is generated for each comment, instead of a single event per blogpost with the comments as nested objects. I have not found out how to implement this, but here (https://github.com/jprante/elasticsearch-jdbc#structured-objects) this functionality is used. How should I proceed with the import to get a single event per blogpost with its comments as nested objects?

Thank you.

Daan

Hi Daan

Did you find a solution for this? I'm looking at the same problem.

Thanks.

Use logstash-filter-aggregate.

Hi Iti,
Can you please give me an example of how to use logstash-filter-aggregate to achieve this result with nested objects?

Just to let you know, I want to have a collection of objects: for each parent document I may have multiple records from a child table, so it's a left join and I want a few fields from that child table.

I tried using the array symbol "[]" in the jdbc importer file through Logstash, but it didn't work.

Assuming you have a mapping like:

"mappings": {
  "atype": {
    "properties": {
      "@timestamp": { "type": "date" },
      "@version": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 256 }
        }
      },
      "id": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 256 }
        }
      },
      "nestedpath": {
        "type": "nested",
        "properties": {
          "nested_field": {
            "type": "text",
            "fields": {
              "keyword": { "type": "keyword", "ignore_above": 256 }
            }
          }
        }
      },
      "tags": {
        "type": "text",
        "fields": {
          "keyword": { "type": "keyword", "ignore_above": 256 }
        }
      }
    }
  }
}
That is, one nested field plus the parent document fields (id and tags, along with the @timestamp and @version fields).

Note: the ORDER BY is important, since the aggregate filter requires the rows belonging to the same task_id to arrive consecutively.

jdbc {
  ...
  statement => "select id, tag, child.nested_field from parent left outer join child on parent.id = child.id order by id"
  type => "atype"
  ...
}
filter {
  # use the @metadata field so the type value is not stored in the document
  mutate {
    add_field => { "[@metadata][type]" => "%{type}" }
    remove_field => [ "type" ]
  }
  if [@metadata][type] == "atype" {
    aggregate {
      task_id => "%{id}"
      code => "
        map['id'] ||= event.get('id')
        map['@metadata'] = event.get('@metadata')
        map['nestedpath'] ||= []
        map['nestedpath'] << { 'nested_field' => event.get('nested_field') }
      "
      push_previous_map_as_event => true
      timeout => 5
      timeout_tags => ['aggregated']
    }
  }
}
output {
  # only ingest into ES when aggregated; timeout_tags puts the tag there
  if "aggregated" in [tags] {
    elasticsearch {
      index => "aindex"
      document_type => "%{[@metadata][type]}"
      document_id => "%{id}"
      hosts => ......
      action => "update"
      doc_as_upsert => true
    }
  }
}
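
Worth noting: the aggregate filter keeps its maps in the memory of a single filter worker, so the pipeline must run with one worker, otherwise rows for the same id may be processed out of order and the aggregation becomes unreliable. For example (the config file name is a placeholder):

# run the pipeline with a single worker so aggregate sees all rows of a task in order
bin/logstash -f your-pipeline.conf -w 1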

If all your comments are stored vertically in a table, then you can use SQL Server 2016's FOR JSON PATH feature to write a query that generates a column containing the nested structure you want to ingest (you may need to use CROSS APPLY along with it). Once you have that ready, you can use the following ruby filter.

ruby {
  code => "
    require 'json'
    json_value = JSON.parse(event.get('column_fromsql_containing_json').to_s)
    event.set('name_of_nestedfield_in_index', json_value)
  "
}
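
For reference, the SQL that produces such a column could look roughly like this; the table and column names (blogpost, comments, blogpost_id) are placeholders, and the FOR JSON PATH subquery can also be wrapped in a CROSS APPLY if you prefer:

SELECT b.id,
       b.title,
       (SELECT c.name, c.comment, c.stars, c.date
        FROM comments c
        WHERE c.blogpost_id = b.id
        FOR JSON PATH) AS column_fromsql_containing_json
FROM blogpost b;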

In your SQL query you can generate the child objects in JSON format, which makes the Logstash filter very simple. Follow the example below.

input {
  stdin {}
  jdbc {
    jdbc_driver_library => "C:\Server\java\jdbc\sqljdbc_7.4\enu\mssql-jdbc-7.4.1.jre8.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://127.0.0.1:11433;databaseName=Northwind;"
    jdbc_user => "****"
    jdbc_password => "****"
    statement => "select p.ProductID, p.ProductName,
                  (select CategoryID, CategoryName from Categories where CategoryID = c.CategoryID for json path) as jsonvalue,
                  (select CategoryID, CategoryName from Categories for json path) as jsonvaluecategorias
                  from Products p
                  join Categories c on c.CategoryID = p.CategoryID"
    connection_retry_attempts => 3
  }
}

filter {
  json {
    source => "jsonvalue"
    target => "Categoria"
    remove_field => "jsonvalue"
  }
  json {
    source => "jsonvaluecategorias"
    target => "Categorias"
    remove_field => "jsonvaluecategorias"
  }
}

output {
  elasticsearch {
    hosts => "http://localhost:9200"
    index => "xpto8"
  }
}
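
One detail to keep in mind (this is an assumption about what you want to query, not something shown in the example above): the json filter produces ordinary object fields, so if you want to run nested queries against Categorias you still need to declare it as nested in the index mapping before indexing. Roughly, on Elasticsearch 7+ (older versions also need the document type in the mapping):

PUT xpto8
{
  "mappings": {
    "properties": {
      "Categorias": { "type": "nested" }
    }
  }
}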

Hi,
You can also try a filter with jdbc_streaming.

You can check this post for an example.
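
Roughly, the idea is to let the jdbc input return only the parent rows and then fetch the child rows per event with the jdbc_streaming filter, which stores the result set as an array on the event. A minimal sketch; the connection details, table and column names are placeholders:

filter {
  jdbc_streaming {
    jdbc_driver_library => "/path/to/mssql-jdbc.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://127.0.0.1:1433;databaseName=mydb;"
    jdbc_user => "****"
    jdbc_password => "****"
    statement => "select name, comment, stars from comments where blogpost_id = :id"
    parameters => { "id" => "id" }
    target => "comments"
  }
}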