JDBC SQL Server to Elasticsearch: one input to different outputs?


(V) #1

Hello,

I am quite new to Logstash configuration; in fact, I learned about Logstash altogether yesterday.
Please feel free to direct me to documentation if my question is excessively obvious. Here it goes:

I have a SQL Server 2016 database that I want to index in Elasticsearch using Logstash.

Here is my Logstash config file, which somewhat works:

input {
  jdbc {
    jdbc_driver_library => "C:\elastic\Microsoft-JDBC-Driver-6.0-for-SQL-Server\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://10.11.12.13:1433;databaseName=testdb1;integratedSecurity=false;user=ElasticExtractor;password=flyingweisels;"
    jdbc_user => "ElasticExtractor"
    jdbc_password => "flyingweisels"
    statement => "select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber,language_id from items order by Item_Id desc"
  }
}
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "testdata"
    document_type => "testtype"
    document_id => "%{itemid}"
  }
}

So what this file is supposed to do, as configured, is insert 150k items into Elasticsearch. Somehow it only imports about a third of that (62,382 in this case). If I try to insert 50k, it only inserts about 20k.
Is there an obvious reason why it would do that?

Here is the current execution log :

[2017-09-01T08:16:31,923][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2017-09-01T08:16:31,927][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2017-09-01T08:16:32,006][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2017-09-01T08:16:32,007][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2017-09-01T08:16:32,042][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>50001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"_all"=>{"enabled"=>true, "norms"=>false}, "dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date", "include_in_all"=>false}, "@version"=>{"type"=>"keyword", "include_in_all"=>false}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2017-09-01T08:16:32,050][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::ElasticSearch", :hosts=>["//localhost:9200"]}
[2017-09-01T08:16:32,053][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2017-09-01T08:16:32,219][INFO ][logstash.pipeline        ] Pipeline main started
[2017-09-01T08:16:32,313][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2017-09-01T08:16:32,643][INFO ][logstash.inputs.jdbc     ] (0.050000s) select top 150000 Item_ID itemid,merchant_id merchantid,modelnumber from items order by Item_Id desc
[2017-09-01T08:16:49,805][WARN ][logstash.agent           ] stopping pipeline {:id=>"main"}

Second thing: let's say I want to insert a row from SQL Server that comes from this input. What plugin can I use so that if the row has a specific "merchant_id", it goes into an Elasticsearch TYPE named with that ID? Also, if it has a specific "language", it goes into an Elasticsearch INDEX with that language as the name.
Can that be done?
Should I simply create multiple Logstash config files, one for each of those tasks?


(Magnus Bäck) #2

So what this file is supposed to do, as configured, is insert 150k items into Elasticsearch. Somehow it only imports about a third of that (62,382 in this case). If I try to insert 50k, it only inserts about 20k.
Is there an obvious reason why it would do that?

I assume item_id is unique for all rows in the result set?

Second thing: let's say I want to insert a row from SQL Server that comes from this input. What plugin can I use so that if the row has a specific "merchant_id", it goes into an Elasticsearch TYPE named with that ID? Also, if it has a specific "language", it goes into an Elasticsearch INDEX with that language as the name.

You can just reference the fields in the elasticsearch output's index and document_type options, like you currently do with the document_id option.

https://www.elastic.co/guide/en/logstash/current/event-dependent-configuration.html#logstash-config-field-references
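
If you only want special handling for certain values, Logstash conditionals also work in the output section. A minimal sketch (the merchant ID 42 and the index names are made-up examples, not from your data):

output {
  if [merchantid] == "42" {
    elasticsearch {
      hosts => "localhost:9200"
      index => "merchant-42"            # dedicated index for this merchant
      document_id => "%{itemid}"
    }
  } else {
    elasticsearch {
      hosts => "localhost:9200"
      index => "testdata"               # everything else goes here
      document_id => "%{itemid}"
    }
  }
}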

Keep in mind that ES indexes have a certain overhead, so you'll want to keep the number of indexes reasonably low.

Should I simply create multiple Logstash config files, one for each of those tasks?

Sure, if that's how you prefer to organize things.


(V) #3

You are absolutely right that the item_id might not be unique. It indeed was not: I was doing a SQL join that returned the same item multiple times, with different LEFT JOIN column values.
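
For anyone else hitting this: one way to keep such join rows distinct, without changing the SQL, is to build the document_id from more than one column. A sketch, assuming the combination of itemid and merchantid is unique in the result set:

output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "testdata"
    document_type => "testtype"
    document_id => "%{itemid}-%{merchantid}"  # composite ID avoids silent overwrites
  }
}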

Secondly, the problem I had with the document_type and the index not getting assigned properly was also embarrassingly simple...

Basically I had them set up with capital letters, and Elasticsearch seems to reject anything capitalized in the names of indexes and pretty much everything else:

doesn't work:
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "%{Language}"
    document_type => "%{MerchantID}"
    document_id => "%{itemid}"
  }
}

works:
output {
  elasticsearch {
    hosts => "localhost:9200"
    index => "%{language}"
    document_type => "%{merchantid}"
    document_id => "%{itemid}"
  }
}
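
One more guard worth mentioning: the field values end up in the index name, and Elasticsearch requires index names to be lowercase. If the language values themselves can contain capitals (e.g. "EN"), a mutate filter can normalize them before the output stage. A sketch, assuming the field is named language:

filter {
  mutate {
    lowercase => ["language"]  # lowercases the field's value, not its name
  }
}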


(system) #4

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.