I'm trying to import data from a MySQL database with the jdbc input plugin, and because of the way I'm getting the data out of the database, I'm running into concurrency errors similar to the ones below:
[2018-03-09T17:40:02,130][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>409, :action=>["update", {:_id=>"348", :_index=>"ci", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #<LogStash::Event:0x4bf3f637>], :response=>{"update"=>{"_index"=>"ci", "_type"=>"doc", "_id"=>"348", "status"=>409, "error"=>{"type"=>"version_conflict_engine_exception", "reason"=>"[doc][348]: version conflict, current version [2] is different than the one provided [1]", "index_uuid"=>"BNoN9HgYQKuWCCtwBoNjpQ", "shard"=>"4", "index"=>"ci"}}}}
[2018-03-09T17:40:02,152][WARN ][logstash.outputs.elasticsearch] Failed action. {:status=>409, :action=>["update", {:_id=>"348", :_index=>"ci", :_type=>"doc", :_routing=>nil, :_retry_on_conflict=>1}, #<LogStash::Event:0x57c6a792>], :response=>{"update"=>{"_index"=>"ci", "_type"=>"doc", "_id"=>"348", "status"=>409, "error"=>{"type"=>"version_conflict_engine_exception", "reason"=>"[doc][348]: version conflict, current version [2] is different than the one provided [1]", "index_uuid"=>"BNoN9HgYQKuWCCtwBoNjpQ", "shard"=>"4", "index"=>"ci"}}}}
The data I'm trying to get out of MySQL is information about devices in our environment: hostname, IP address, OS version, etc. The problem is that these attributes are stored in a very normalized database, so to get them out I have a query that produces this type of output:
ciid, devicename, attributename, attributevalue
3915, testdevice01, type, server
3915, testdevice01, ipaddress, 1.2.3.4
3915, testdevice01, ostype, linux
3915, testdevice01, status, active
3915, testdevice01, processorcount, 4
The attributename and attributevalue columns come from two different tables joined to the main device table, so there are no pre-made columns for the attributes.
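For reference, the query in ci.sql is roughly shaped like the sketch below. The table names, column names, and join keys here are made up purely for illustration (the real statement is loaded via statement_filepath in the config further down); :sql_last_value is the placeholder the jdbc input plugin substitutes with the tracked updatedate value.

SELECT d.ciid,
       d.devicename,
       a.attributename,
       v.attributevalue,
       d.updatedate
FROM device d
JOIN device_attribute v ON v.ciid = d.ciid
JOIN attribute a ON a.attributeid = v.attributeid
WHERE d.updatedate > :sql_last_value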
Currently I have a mutate filter convert the attributename and attributevalue of each row into its own field on the event before it is sent to Elasticsearch. However, since every row for a given device becomes a separate event that updates the same document ID, I run into the concurrency issue above whenever there are a lot of these rows to process from the query.
Is there a better way of organizing this type of data so that it gets fully imported into Elasticsearch?
My Logstash config:
input {
  jdbc {
    # COMMON PARTS
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://db-s:3306"
    jdbc_user => "*****"
    jdbc_password_filepath => "******"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "100000"
    # DB SPECIFIC PARTS
    use_column_value => true
    tracking_column => "updatedate"
    tracking_column_type => "timestamp"
    last_run_metadata_path => "/usr/share/logstash/.jdbc_last_run_ci"
    statement_filepath => "/etc/logstash/conf.d/sql_queries/ci.sql"
    schedule => "*/2 * * * *"
  }
}

filter {
  mutate {
    add_field => { "%{attributename}" => "%{attributevalue}" }
    remove_field => ["attributename", "attributevalue"]
  }
}

output {
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    user => "******"
    password => "******"
    index => "ci"
    action => "update"
    doc_as_upsert => "true"
    document_id => "%{ciid}"
  }
}