Logstash to Elasticsearch JDBC Current Version Conflict Issue

I'm trying to import data from a MySQL database with the jdbc input plugin, and because of the way I'm getting the data out of the database, I'm running into concurrency errors like the ones below:

[2018-03-09T17:40:02,130][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>409, :action=>["update", {:_id=>"348", :_index=>"ci", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #<LogStash::Event:0x4bf3f637>], :response=>{"update"=>{"_index"=>"ci", "_type"=>"doc", "_id"=>"348", "status"=>409, "error"=>{"type"=>"version_conflict_engine_exception", "reason"=>"[doc][348]: version conflict, current version [2] is different than the one provided [1]", "index_uuid"=>"BNoN9HgYQKuWCCtwBoNjpQ", "shard"=>"4", "index"=>"ci"}}}}
[2018-03-09T17:40:02,152][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>409, :action=>["update", {:_id=>"348", :_index=>"ci", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #<LogStash::Event:0x57c6a792>], :response=>{"update"=>{"_index"=>"ci", "_type"=>"doc", "_id"=>"348", "status"=>409, "error"=>{"type"=>"version_conflict_engine_exception", "reason"=>"[doc][348]: version conflict, current version [2] is different than the one provided [1]", "index_uuid"=>"BNoN9HgYQKuWCCtwBoNjpQ", "shard"=>"4", "index"=>"ci"}}}}

The data I'm trying to get out of MySQL is information about devices in our environment: hostname, IP address, OS version, etc. The problem is that these attributes are stored in a very normalized database, so the query that pulls them out produces the following type of output:

ciid, devicename, attributename, attributevalue
3915, testdevice01, type, server
3915, testdevice01, ipaddress, 1.2.3.4
3915, testdevice01, ostype, linux
3915, testdevice01, status, active
3915, testdevice01, processorcount, 4

The attribute name and value columns come from two different tables joined to the main device table, so there are no pre-made columns for the attributes.

Currently I have a mutate filter convert the attributename and attributevalue of each row into its own field on the event before it is sent to Elasticsearch. However, when there are a lot of these rows to process from the query, I run into the concurrency issue above.

Is there a better way of organizing this type of data so that it gets fully imported into Elasticsearch?

My Logstash config:

input {
  jdbc {
    # COMMON PARTS
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://db-s:3306"
    jdbc_user => "*****"
    jdbc_password_filepath => "******"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "100000"

    # DB SPECIFIC PARTS
    use_column_value => true
    tracking_column => "updatedate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.jdbc_last_run_ci"
    statement_filepath => "/etc/logstash/conf.d/sql_queries/ci.sql"
    schedule => "*/2 * * * *"
  }
}

filter {
  mutate {
    add_field => { "%{attributename}" => "%{attributevalue}" }
    remove_field => ["attributename", "attributevalue"]
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    user => "******"
    password => "******"
    index => "ci"
    action => "update"
    doc_as_upsert => "true"
    document_id => "%{ciid}"
  }
}

My first thought is that while Elasticsearch allows us to partially update documents, using this functionality to compose documents one attribute at a time feels a bit heavy-handed. Under the hood, the node that receives an update request fetches the document from the shard that owns it, merges the new data onto the existing document, and then stores the result. Under high concurrency, it makes sense that we would hit overlapping check-and-sets, and thus version conflicts.

Additionally, down at the Lucene layer in Elasticsearch, what we're actually doing is storing a new document and "soft deleting" the previous version of the document; these extra versions eventually get cleaned up when an index segment is merged, but they carry overhead nonetheless.
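To make that concrete, here is approximately what the elasticsearch output above sends when two events for the same ciid end up in different pipeline batches (the index, type, id, and retry count are taken from the log above; the attribute fields are invented for illustration). Both update actions target document 348, so whichever lands second can fail its check-and-set with the 409 above once its single retry is exhausted:

# first worker's bulk request
POST /_bulk
{ "update" : { "_index" : "ci", "_type" : "doc", "_id" : "348", "retry_on_conflict" : 1 } }
{ "doc" : { "ipaddress" : "1.2.3.4" }, "doc_as_upsert" : true }

# second worker's bulk request, racing the first
POST /_bulk
{ "update" : { "_index" : "ci", "_type" : "doc", "_id" : "348", "retry_on_conflict" : 1 } }
{ "doc" : { "ostype" : "linux" }, "doc_as_upsert" : true }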


Logstash is an event pipeline; while some batching is done for efficiency, each event is essentially processed alone, without context from other events and without any guarantee about the ordering or adjacency of other events. This means that the best way to solve this problem within Logstash is to ensure that the input gives us exactly one event per logical grouping for our output.

If we know all possible values of attributename, we can "pivot" the data in our database query, to ensure that Logstash receives one event per ciid/devicename with columns for each possible attribute; while MySQL doesn't provide pivot functions, we can achieve something similar by clever use of GROUP BY, GROUP_CONCAT and IF statements:

SELECT
  ciid,
  devicename,
  GROUP_CONCAT(IF(attributename="type", attributevalue, NULL)) AS type,
  GROUP_CONCAT(IF(attributename="ipaddress", attributevalue, NULL)) AS ipaddress,
  GROUP_CONCAT(IF(attributename="ostype", attributevalue, NULL)) AS ostype,
  GROUP_CONCAT(IF(attributename="status", attributevalue, NULL)) AS status,
  GROUP_CONCAT(IF(attributename="processorcount", attributevalue, NULL)) AS processorcount
FROM devicedata
GROUP BY ciid, devicename
;
ciid  devicename    type    ipaddress  ostype  status   processorcount
3915  testdevice01  server  1.2.3.4    linux   active   4
3916  testdevice02  server  2.4.6.8    linux   passive  8

-- example on DB-Fiddle


If you're running MySQL 5.7.22 or later, the JSON_OBJECTAGG(key, value) aggregation function can be very useful, because it doesn't require that we know all the possible attributenames, and is a lot less verbose:

SELECT
  ciid,
  devicename,
  JSON_OBJECTAGG(attributename, attributevalue) AS attributes
FROM devicedata
GROUP BY ciid, devicename
;

ciid	devicename	attributes
3915	testdevice01	{"type": "server", "ostype": "linux", "status": "active", "ipaddress": "1.2.3.4", "processorcount": "4"}
3916	testdevice02	{"type": "server", "ostype": "linux", "status": "passive", "ipaddress": "2.4.6.8", "processorcount": "8"}

-- Example on DB Fiddle

This would cause Logstash to get an event with three properties:

  • ciid: a string representing the ciid,
  • devicename: a string representing the devicename, and
  • attributes: a string containing a JSON blob with the attributes, which could be extracted in a subsequent step with the json filter:
filter {
  json {
    source => "attributes"
  }
}
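As a small follow-up sketch (not something shown above): since the json filter's common options only run when parsing succeeds, remove_field can be used to drop the raw attributes string once its keys have been parsed onto the top level of the event:

filter {
  json {
    source => "attributes"
    # drop the raw JSON string once its keys have been parsed onto the event
    remove_field => ["attributes"]
  }
}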

I hope this helps 🙂

It definitely helps.

I kind of suspected that I would have to find a way to get all the data into a single query row per device, but I wasn't sure whether I could get a successful workaround done in Logstash alone.

Unfortunately the servers I'm working with are only on MySQL 5.5, so JSON_OBJECTAGG won't work, which is a shame since it sounds rather useful.

thanks!

I had experimented with GROUP_CONCAT, but ran into issues where some of the attribute values I was getting were longer than the function's character limit. Since I don't know all the possible attributes that can be created, I had tried concatenating them all together and then using the kv filter to break them down into their proper fields.
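For what it's worth, that truncation is governed by MySQL's group_concat_max_len variable (1024 bytes by default), so one possible workaround on 5.5 is to raise it for the session and emit key=value pairs that the kv filter can split. The following is only a sketch; the separator characters and the sessionVariables connection parameter are illustrative assumptions, not something from this thread:

-- Raise the per-session GROUP_CONCAT limit (defaults to 1024 bytes); with Connector/J this
-- could also be set via the connection string, e.g. ...?sessionVariables=group_concat_max_len=1000000
SET SESSION group_concat_max_len = 1000000;

SELECT
  ciid,
  devicename,
  -- build "key=value" pairs separated by ";" for the kv filter to split apart
  GROUP_CONCAT(CONCAT(attributename, '=', attributevalue) SEPARATOR ';') AS attributes
FROM devicedata
GROUP BY ciid, devicename;

The matching filter would then look something like:

filter {
  kv {
    source => "attributes"
    field_split => ";"
    value_split => "="
    remove_field => ["attributes"]
  }
}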
