Appending objects to a nested field, but they get overwritten

Hello,

We've got a problem with a Logstash JDBC configuration. We have a large table (~15 million records) that we want to insert into our Elasticsearch cluster.

A source (MySQL) record has a domain column, which is used as the document ID.

id | domain      | duns | company_name
1  | example.com | 1    | company 1
2  | example.com | 2    | company 2
3  | other.com   | 6    | company 6

Now we want to create a document like the one below. The document has a nested companies field that contains all companies associated with that domain. When a source record comes along with a domain that is already present as a document, the company data must be appended to the nested companies field. The Elasticsearch document would look like this:

{
  "_id": "example.com",
  "identifier": "example.com",
  "companies": [
    {
      "duns": 1,
      "company_name": "company 1"
    },
    {
      "duns": 2,
      "company_name": "company 2"
    }
  ]
},
{
  ...next document...
}

We are using the following pipeline configuration right now:

    input {
      jdbc {
        jdbc_connection_string => "connectionstring"
        jdbc_user => "${MYSQL_USERNAME}"
        jdbc_password => "${MYSQL_PASSWORD}"
        jdbc_driver_library => "/usr/share/logstash/lib/mysql-connector-java-5.1.49.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        schedule => "* * * * *"
        statement => "SELECT IF(rtlgdb.domain is not null and rtlgdb.domain != '', rtlgdb.domain, concat('no_domain_', rtlgdb.duns)) as identifier, rtlgdb.*
                      FROM rtlg_data.samr_rtlg_db as rtlgdb
                      WHERE duns > :sql_last_value
                      ORDER BY duns ASC"
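        # track the duns column so :sql_last_value resumes where the previous run stopped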
        use_column_value => true
        tracking_column => "duns"
        clean_run => false
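        # page through the result set 50 rows at a time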
        jdbc_paging_enabled => true
        jdbc_page_size => 50
        jdbc_fetch_size => 50
      }
    }

    filter {
      aggregate {
        task_id => "%{identifier}"
        code => "
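          # one map per identifier; every row with the same identifier is merged into it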
          map['identifier'] ||= event.get('identifier')
          map['companies'] ||= []
          map['companies'] << {
            'duns' => event.get('duns'),
            'company_name' => event.get('company_name')
          }
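          # drop the original row event; only the aggregated map should reach the output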
          event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 3
      }
    }

    output {
      stdout { codec => json_lines }
      elasticsearch {
        ssl => true
        user => "${ELASTIC_USERNAME}"
        password => "${ELASTIC_PASSWORD}"
        cacert => "/usr/share/logstash/config/certs/elastic-pkcs12.pem"
        hosts => "https://elasticsearch-master:9200"
        index => "bnl_webcontent"
        document_id => "%{identifier}"
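        # partial update; create the document first if it does not exist yet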
        action => "update"
        doc_as_upsert => true
      }
    }

This seems to work with a small queried dataset, but as soon as we let it run on the complete dataset, the nested field gets overwritten and only the latest retrieved company remains in the nested field.
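
For illustration, after a full run the document for example.com ends up roughly like this, with only the last company left in the nested field:

{
  "_id": "example.com",
  "identifier": "example.com",
  "companies": [
    {
      "duns": 2,
      "company_name": "company 2"
    }
  ]
}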

The index is created with:

PUT bnl_webcontent
{
  "mappings": {
    "properties": {
      "companies": {
        "type": "nested"
      }
    }
  }
}
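
For reference, the resulting document can then be inspected the same way:

GET bnl_webcontent/_doc/example.com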

My question is: am I using the right script logic to get the result I want? Strangely, when I change the SQL query to

WHERE domain = 'example.com'
AND duns > :sql_last_value

it works as expected.
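
For completeness, the full statement for that test run is:

SELECT IF(rtlgdb.domain is not null and rtlgdb.domain != '', rtlgdb.domain, concat('no_domain_', rtlgdb.duns)) as identifier, rtlgdb.*
FROM rtlg_data.samr_rtlg_db as rtlgdb
WHERE domain = 'example.com'
AND duns > :sql_last_value
ORDER BY duns ASC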

Thanks!
