Hello,
We've got a problem with a Logstash JDBC configuration. We have a large table (~15 million records) that we want to index into our Elasticsearch cluster.
A source (MySQL) record has a domain
column which is used as the document id.
id | domain | duns | company_name
1 | example.com | 1 | company 1
2 | example.com | 2 | company 2
3 | other.com | 6 | company 6
Now we want to create a document like the one below. The document has a nested companies
field which contains all companies associated with that domain. When a source record comes along with a domain that already exists as a document, its company data must be added to the nested companies
field. The Elasticsearch document would look like this:
{
  "_id": "example.com",
  "identifier": "example.com",
  "companies": [
    {
      "duns": 1,
      "company_name": "company 1"
    },
    {
      "duns": 2,
      "company_name": "company 2"
    }
  ]
},
{
  Next document....
}
We use the following pipeline configuration right now:
input {
  jdbc {
    jdbc_connection_string => "connectionstring"
    jdbc_user => "${MYSQL_USERNAME}"
    jdbc_password => "${MYSQL_PASSWORD}"
    jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-5.1.49.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    schedule => "* * * * *"
    statement => "SELECT IF(rtlgdb.domain is not null and rtlgdb.domain != '', rtlgdb.domain, concat('no_domain_', rtlgdb.duns)) as identifier, rtlgdb.*
                  FROM rtlg_data.samr_rtlg_db as rtlgdb
                  WHERE duns > :sql_last_value
                  ORDER BY duns ASC"
    use_column_value => true
    tracking_column => "duns"
    clean_run => false
    jdbc_paging_enabled => true
    jdbc_page_size => 50
    jdbc_fetch_size => 50
  }
}
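For reference, the IF() in the statement is only there to fall back to a no_domain_<duns> identifier when domain is empty. For the sample rows above we would expect the query to return roughly:

identifier  | id | domain      | duns | company_name
example.com | 1  | example.com | 1    | company 1
example.com | 2  | example.com | 2    | company 2
other.com   | 3  | other.com   | 6    | company 6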
filter {
  aggregate {
    task_id => "%{identifier}"
    code => "
      map['identifier'] ||= event.get('identifier')
      map['companies'] ||= []
      map['companies'] << {
        'duns' => event.get('duns'),
        'company_name' => event.get('company_name')
      }
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 3
  }
}
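If we read the aggregate filter correctly, the pushed map should come out as a single event per identifier, roughly like this on stdout with the json_lines codec (a simplified sketch, not an actual capture; @timestamp and @version omitted):

{"identifier":"example.com","companies":[{"duns":1,"company_name":"company 1"},{"duns":2,"company_name":"company 2"}]}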
output {
  stdout { codec => json_lines }
  elasticsearch {
    ssl => true
    user => "${ELASTIC_USERNAME}"
    password => "${ELASTIC_PASSWORD}"
    cacert => "/usr/share/logstash/config/certs/elastic-pkcs12.pem"
    hosts => "https://elasticsearch-master:9200"
    index => "bnl_webcontent"
    document_id => "%{identifier}"
    action => "update"
    doc_as_upsert => true
  }
}
This seems to work with a small queried dataset, but as soon as we run it on the complete dataset the nested field gets overwritten and only the latest retrieved company exists in the nested field.
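Our suspicion (we may be wrong) is that on the full dataset the same identifier ends up in more than one aggregate map, so Elasticsearch receives several partial updates for the same document id, each carrying only part of the companies array, for example:

{"identifier":"example.com","companies":[{"duns":1,"company_name":"company 1"}]}
{"identifier":"example.com","companies":[{"duns":2,"company_name":"company 2"}]}

Since a partial update replaces the whole companies array instead of appending to it, only the last event would survive, which would match what we see.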
The index is created with:
PUT bnl_webcontent
{
  "mappings": {
    "properties": {
      "companies": {
        "type": "nested"
      }
    }
  }
}
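To inspect a single result we just look the document up by id, e.g.:

GET bnl_webcontent/_doc/example.com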
My question is: am I using the right script logic to get the result I want? Strangely, when I change the SQL query to

WHERE domain = 'example.com'
AND duns > :sql_last_value

it works as expected.
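For clarity, that test looked roughly like this in the jdbc input (only the WHERE clause changed):

statement => "SELECT IF(rtlgdb.domain is not null and rtlgdb.domain != '', rtlgdb.domain, concat('no_domain_', rtlgdb.duns)) as identifier, rtlgdb.*
              FROM rtlg_data.samr_rtlg_db as rtlgdb
              WHERE domain = 'example.com'
              AND duns > :sql_last_value
              ORDER BY duns ASC"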
Thanks!