MySQL to Elasticsearch sync with Logstash and the JDBC input plugin


(Matej) #1

Hello,
I am trying to synchronize my MySQL database to Elasticsearch using Logstash and the JDBC input plugin. So far I have a working "dummy" prototype that syncs each SQL table row to a single document. My challenge now is to handle one-to-many relations:
Customer has many addresses
Customer has many contacts...

I would like to have a document which looks like this (focused on the contact entity, but I believe the behaviour is the same for multiple has-many relations):

Customer {
  id: 1,
  name: "testq",
  lastname: "lastname",
  contacts: [
    { id: 1, contact: "testemail@email.com" },
    { id: 2, contact: "123456763545" }
  ],
  addresses: [
    {addressdata1}, {addressdata2}
  ]
  ....
}

So far I have figured out how to sync the customer entity and keep it updated (updating the existing document).
I am trying to achieve the same for "contacts", "addresses" and other entities.

With the help of the aggregate filter use cases in the documentation: https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html#plugins-filters-aggregate-usecases

I managed to get to this code:
filter {
  aggregate {
    # One aggregation task per customer
    task_id => "%{cust_id}_customer"
    code => "
      # Customer-level fields (repeated on every joined row)
      map['id'] = event.get('cust_id')
      map['firstname'] = event.get('firstname')
      map['lastname'] = event.get('lastname')
      map['username'] = event.get('username')
      # Start with an empty array, then append one contact per row
      map['contacts'] ||= []
      map['contacts'] << {
        'customer_contact_id' => event.get('customer_contact_id'),
        'contact' => event.get('contact'),
        'used_for' => event.get('used_for'),
        'type' => event.get('type'),
        'status' => event.get('status'),
        'created' => event.get('created'),
        'modified' => event.get('modified')
      }
      # Drop the per-row event; only the aggregated map is emitted
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5000
  }
}

My questions are:

  1. Is there a way to avoid listing every customer-level field to sync (firstname, lastname, ...) and just append the new contacts, addresses and other arrays to the customer model?
  2. Is there a way to update contact data by "customer.contacts.id", as on the customer level? (Currently all records are deleted and reinserted.)
  3. Is there a way to avoid listing each contact field and just create hashes ({}) and append them to contacts?
  4. Not all records are synced every time. One customer has 4 contacts, but in most syncs as few as 2 are synced. Any suggestion why this is happening?

The MySQL query I run is:
SELECT Customer.*, Customer.id AS cust_id, CustomerContact.*, CustomerContact.id AS customer_contact_id
FROM customers__customers AS Customer
LEFT JOIN customers__customer_contacts AS CustomerContact ON CustomerContact.customer_id = Customer.id
ORDER BY Customer.id ASC

which returns all contacts for each customer.
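
For reference, a minimal sketch of a jdbc input that could run this query. The driver path, database name, credentials and schedule are placeholders, not values from the original setup. One point that may be relevant to question 4: the aggregate filter documentation requires a single pipeline worker (the `-w 1` flag) and relies on rows for the same task id arriving together, which is what the ORDER BY provides. Running with more workers might be one reason some contacts go missing.

```conf
input {
  jdbc {
    # Placeholder connection settings (assumptions, adjust to your setup)
    jdbc_driver_library => "/path/to/mysql-connector.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "user"
    jdbc_password => "password"
    # Assumed schedule: run once per minute (cron syntax)
    schedule => "* * * * *"
    # One row per customer/contact pair, grouped by customer
    statement => "SELECT Customer.*, Customer.id AS cust_id, CustomerContact.*, CustomerContact.id AS customer_contact_id FROM customers__customers AS Customer LEFT JOIN customers__customer_contacts AS CustomerContact ON CustomerContact.customer_id = Customer.id ORDER BY Customer.id ASC"
  }
}
```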

I am also thinking about having one document with the customer id as its unique id, followed by a bunch of arrays:
{
  id: 1,
  customer: {
    id: 1,
    name: "testq",
    ....
  },
  contacts: [
    { id:, contact: "testemail@email.com" ... },
    ...
  ],
  addresses: []
}

Which form is better for updates and maintenance, and especially for later Elasticsearch searches?

Thanks for any reply.


(Guy Boertje) #2

Typically you would use jdbc_streaming or jdbc_static to achieve the "JOIN".

So:
jdbc input (fetch customers) -> some filters -> jdbc_streaming(use customer_contact_id to query contacts table) -> jdbc_streaming(use customer_address_id to query addresses) -> ES

This is denormalising.
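
A rough sketch of what that pipeline shape could look like end to end, with a single lookup shown. The table names come from the question above; the connection settings, Elasticsearch host and index name are placeholders assumed for illustration. Using the customer id as `document_id` is one way to have a re-run overwrite the existing document rather than create a duplicate.

```conf
input {
  jdbc {
    # Placeholder connection settings (assumptions)
    jdbc_driver_library => "/path/to/mysql-connector.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "user"
    jdbc_password => "password"
    # One event per customer row, no JOIN here
    statement => "SELECT * FROM customers__customers"
  }
}

filter {
  jdbc_streaming {
    # Same placeholder connection settings as the input
    jdbc_driver_library => "/path/to/mysql-connector.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "user"
    jdbc_password => "password"
    # Look up all contacts for the customer in the current event
    statement => "SELECT * FROM customers__customer_contacts WHERE customer_id = :id"
    # Bind :id to the event field named id
    parameters => { "id" => "id" }
    # The matching rows are stored as an array under this field
    target => "contacts"
  }
}

output {
  elasticsearch {
    # Assumed host and index name
    hosts => ["localhost:9200"]
    index => "customers"
    # One document per customer, keyed by the customer id
    document_id => "%{id}"
  }
}
```

A second jdbc_streaming block with its own `target` would add addresses in the same way.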


(Matej) #3

Hello,
I solved my case by using the jdbc_streaming filter plugin you proposed, with the following config:
filter {
  jdbc_streaming {
    id => "contacts"
    jdbc_driver_library => "/usr/share/java/mysql.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/23123"
    jdbc_user => "123123"
    jdbc_password => "123123"
    statement => "SELECT * FROM customers__customer_contacts as CustomerContact WHERE customer_id = :id"
    parameters => { "id" => "id" }
    target => "contacts"
  }
  jdbc_streaming {
    id => "addresses"
    jdbc_driver_library => "/usr/share/java/mysql.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/13"
    jdbc_user => "123123"
    jdbc_password => "123123"
    statement => "SELECT * FROM customers__customer_addresses WHERE customer_id = :id"
    parameters => { "id" => "id" }
    target => "addresses"
  }
}

Thanks for pointing me in the right direction! :)


(Guy Boertje) #4

Good to know.

Experiment with the cache settings so that more reference data (customers/contacts) is kept in memory while running, but bear in mind volatility: how often a customer or contact is updated, etc.
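
As an illustration, the cache of a jdbc_streaming filter is controlled by `use_cache`, `cache_size` and `cache_expiration`. The values below are arbitrary starting points, not recommendations, and the connection settings are placeholders. As far as I know the defaults are 500 entries and an expiry of 5 seconds, so check the plugin documentation for your version.

```conf
filter {
  jdbc_streaming {
    # Placeholder connection settings (assumptions)
    jdbc_driver_library => "/path/to/mysql-connector.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "user"
    jdbc_password => "password"
    statement => "SELECT * FROM customers__customer_contacts WHERE customer_id = :id"
    parameters => { "id" => "id" }
    target => "contacts"
    # Keep lookup results in memory between events
    use_cache => true
    # Maximum number of cached lookups (illustrative value)
    cache_size => 2000
    # Seconds before a cached lookup is refreshed (illustrative value)
    cache_expiration => 30.0
  }
}
```

A longer expiration means fewer queries against MySQL, but a changed contact can stay stale in Elasticsearch for up to that long.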