How to load nested documents in Elasticsearch using Logstash

I have a product database with 100,000 products. Each product has its own specs, and the number and type of specs differ per product. My database therefore has two tables in a 1:N relation: a product table (about 100,000 records) and a spec table (about 600,000 records). The product ID is the key of the product table, and product ID + sequence number is the key of the spec table. I want to index this database in Elasticsearch using Logstash, so that each product becomes one document with its specs stored as nested objects. Can someone tell me how to do this?

Structure of my document index:
{
  "mappings": {
    "products": {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "productid": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 20
            }
          }
        },
        "description": {
          "type": "text",
          "analyzer": "dutch",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 120
            }
          }
        },
        "specs": {
          "type": "nested",
          "properties": {
            "productid": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 20
                }
              }
            },
            "seqno": {
              "type": "long"
            },
            "featurename": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 60
                }
              }
            },
            "featurevalue": {
              "type": "text",
              "fields": {
                "keyword": {
                  "type": "keyword",
                  "ignore_above": 60
                }
              }
            }
          }
        }
      }
    }
  }
}

My Logstash configuration file looks like this:

# Two JDBC inputs against the same MySQL connection settings. They differ only
# in the table they read and in the `type` value stamped on each event.
input {
  # Product rows: every row of `webproducts` becomes one event of type "products".
  jdbc {
    type => "products"
    statement => "select * from webproducts"
    # Cron-style schedule; the statement has no WHERE clause, so every run
    # re-reads the whole table.
    schedule => "* * * * *"

    jdbc_driver_library => "mysql-connector-java-5.1.42-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://......"
    jdbc_user => "....."
    jdbc_password => "........"
    jdbc_validate_connection => true
  }

  # Spec rows: every row of `webspecs` becomes one event of type "specs".
  jdbc {
    type => "specs"
    statement => "select * from webspecs"
    # Same schedule and same full-table read as the products input.
    schedule => "* * * * *"

    jdbc_driver_library => "mysql-connector-java-5.1.42-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://......"
    jdbc_user => "....."
    jdbc_password => "........"
    jdbc_validate_connection => true
  }
}

# Move the event's `type` out of the document body: copy its value into
# [@metadata][type] for routing in the output section, then drop the
# top-level `type` field from the event.
filter {
  mutate {
    add_field => { "[@metadata][type]" => "%{type}" }
    remove_field => ["type"]
  }
}

# Route each event to Elasticsearch according to the type recorded in
# [@metadata][type]. The two branches differ only in how document_id is built.
# As written, spec events are indexed as their own documents; nothing here
# places them inside the `specs` field of a product document.
output {
  if [@metadata][type] == "products" {
    # One document per product row, keyed by its productid.
    elasticsearch {
      hosts => [ "localhost:9200" ]
      user => ......
      password => ......
      index => .........
      document_type => "%{[@metadata][type]}"
      document_id => "%{productid}"
    }
  } else if [@metadata][type] == "specs" {
    # One document per spec row, keyed by "<sku>.<seqno>".
    # NOTE(review): `sku` appears nowhere else in the visible mapping or
    # config, which use `productid` - confirm it is a real webspecs column.
    elasticsearch {
      hosts => [ "localhost:9200" ]
      user => ......
      password => ......
      index => .........
      document_type => "%{[@metadata][type]}"
      document_id => "%{sku}.%{seqno}"
    }
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.