jdbc_streaming plugin seems not to be working

Thanks @guyboertje, it worked :slight_smile:

Hi,

I've configured Logstash exactly as shown here:

jdbc_streaming {
  jdbc_driver_library => "/etc/logstash/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar"
  jdbc_driver_class => "com.mysql.jdbc.Driver"
  jdbc_connection_string => "jdbc:mysql://localhost:3306/test_db"
  jdbc_user => "root"
  jdbc_password => "test"
  parameters => { "ostype" => "major" }
  statement => "select value from test_table WHERE keyid = :ostype"
  target => "loo"
  use_cache => "false"
}

major is a Logstash field that comes from the logs.

Each time the MySQL query runs, it receives NULL instead of the value of major:

170518 14:50:29 25 Query select value from test_table WHERE keyid = NULL

Any suggestions?

Thanks!

Can you please post the error output you got? Did you check in debug mode what the JDBC filter receives?

I'm assuming that inside test_db there's a table called test_table, am I right?

This is the output at LS startup:

16:19:39.171 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/jdbc_driver_library = "/etc/logstash/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar"
16:19:39.171 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/jdbc_driver_class = "com.mysql.jdbc.Driver"
16:19:39.171 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/jdbc_connection_string = "jdbc:mysql://localhost:3306/test_db"
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/jdbc_user = "root"
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/jdbc_password =
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/parameters = {"ostype"=>"major"}
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/statement = "select value from test_table WHERE keyid = :ostype"
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/target = "loo"
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/use_cache = false
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/id = "7c0e1c9d2c02a7fec055a98665a5d3551194ccc4-3"
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/enable_metric = true
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/add_tag = []
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/@remove_tag = []
16:19:39.172 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/add_field = {}
16:19:39.173 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/remove_field = []
16:19:39.173 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/periodic_flush = false
16:19:39.173 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/jdbc_validate_connection = false
16:19:39.173 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/jdbc_validation_timeout = 3600
16:19:39.173 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/default_hash = {}
16:19:39.173 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/tag_on_failure = ["_jdbcstreamingfailure"]
16:19:39.173 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/tag_on_default_use = ["_jdbcstreamingdefaultsused"]
16:19:39.174 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/cache_expiration = 5.0
16:19:39.174 [LogStash::Runner] DEBUG logstash.filters.jdbcstreaming - config LogStash::Filters::JdbcStreaming/cache_size = 500

This is when an event comes in:
16:31:13.473 [[main]>worker7] DEBUG logstash.pipeline - filter received {"event"=>{"@timestamp"=>2017-05-17T14:55:46.588Z, "offset"=>0, "beat"=>{"hostname"=>"ip-10-0-5-208", "name"=>"ip-10-0-5-208", "version"=>"5.0.2"}, "input_type"=>"stdin", "@version"=>"1", "source"=>"", "message"=>"184.74.151.130 [2017-05-17T14:55:46+00:00] "GET /p.gif?cb=1302978897 HTTP/1.1" 200 43 0.000 "http://www.msn.com/" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36"", "type"=>"log"}}
16:31:13.474 [[main]>worker7] DEBUG logstash.filters.jdbcstreaming - Executing JDBC query {:statement=>"select value from test_table WHERE keyid = :ostype", :parameters=>{:ostype=>nil}}

If this is your event...

{"@timestamp"=>2017-05-17T14:55:46.588Z,
  "offset"=>0,
  "beat"=>{"hostname"=>"ip-10-0-5-208", "name"=>"ip-10-0-5-208", "version"=>"5.0.2"},
  "input_type"=>"stdin",
  "@version"=>"1",
  "source"=>"",
  "message"=>"184.74.151.130 [2017-05-17T14:55:46+00:00] \"GET /p.gif?cb=1302978897 HTTP/1.1\" 200 43 0.000 \"http://www.msn.com/\" \"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36\"",
  "type"=>"log"
}

What filters are you using to fill in the major field of the event before you use it in the jdbc_streaming lookup?

I'm using the useragent filter. I also tried using fields parsed by the kv filter. Neither worked...

16:31:13.474 [[main]>worker7] DEBUG logstash.filters.jdbcstreaming - Executing JDBC query {:statement=>"select value from test_table WHERE keyid = :ostype", :parameters=>{:ostype=>nil}}

It's not finding the major field, so I guess something earlier in the pipeline is not working (the useragent filter, you said?). major should be in your event, but :parameters=>{:ostype=>nil} shows that it is not there.
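A minimal sketch of a filter section that should populate major before the lookup. It assumes the access-log format of the event above; the grok field names (clientip, ts, agent, etc.) are hypothetical, and useragent is run without a target so that it writes fields such as major and os_name at the top level of the event. Filters run in order, so jdbc_streaming has to come after the filters that create the field it references.

```
filter {
  # Hypothetical grok pattern for the access-log line in the event above;
  # it extracts the quoted user-agent string into [agent]
  grok {
    match => { "message" => '%{IP:clientip} \[%{TIMESTAMP_ISO8601:ts}\] "%{WORD:verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response} %{NUMBER:bytes} %{NUMBER:request_time} "%{DATA:referrer}" "%{DATA:agent}"' }
  }
  # Parse the user agent; with no target, fields such as name, os_name
  # and major are added at the top level of the event
  useragent {
    source => "agent"
  }
  # Runs after useragent, so [major] should now exist on the event
  jdbc_streaming {
    jdbc_driver_library => "/etc/logstash/mysql-connector-java-5.1.42/mysql-connector-java-5.1.42-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test_db"
    jdbc_user => "root"
    jdbc_password => "test"
    parameters => { "ostype" => "major" }
    statement => "select value from test_table WHERE keyid = :ostype"
    target => "loo"
    use_cache => false
  }
}
```

Note that major from useragent is the browser's major version (for the Chrome user agent above that would be 58); if ostype is meant to be the operating system, os_name is probably the field to pass instead.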

Hello!
I have a question, please. I have two configuration files.
The first one reads the first database with jdbc and writes the result to a CSV file.

The second configuration reads another database and uses the translate filter to search the CSV file I saved; if the two columns match, I take the result.

My question is whether it is possible to do this with jdbc_streaming and combine the two configuration files into a single file that uses both databases.

Sorry for my English, thank you.

Would you please rewrite that in English? Thank you.

translate {
  field => "field1"
  destination => "code"
  dictionary_path => "/tmp/Result.csv"
  refresh_interval => "300"
}
This is my translate filter: I search for field1 in dictionary_path and, if it is found, it returns the code, which is then inserted into the database.
I want to do the same thing using jdbc_streaming.
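A minimal sketch of how that translate lookup could be expressed with jdbc_streaming. The table code_table and its column key_col are hypothetical stand-ins for wherever Result.csv comes from, and the required connection settings are only indicated by a comment. Unlike translate's destination, jdbc_streaming writes the whole result set (an array of row hashes) to target.

```
filter {
  jdbc_streaming {
    # jdbc_driver_library, jdbc_driver_class, jdbc_connection_string,
    # jdbc_user and jdbc_password go here (omitted)
    # Hypothetical table and columns: code_table(key_col, code)
    statement => "select code from code_table where key_col = :f1"
    # :f1 is bound to the value of this event's field1
    parameters => { "f1" => "field1" }
    # Matching rows end up here as an array, e.g. [{"code" => "..."}]
    target => "code_lookup"
  }
}
```

The looked-up value would then be at [code_lookup][0][code]; a mutate can copy it to a flat code field if the database output expects one.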

Yes. That is exactly why we wrote the jdbc_streaming filter plugin. :grinning:

This is what I want:

input {
  jdbc {
    statement => "select field1, field2, field3 from database1"
    ......
  }
}
filter {
  jdbc_streaming {
    statement => "select field4, field5 from database2"
    # What I want: when field2 = field4, add field5 to the records
    # from the first database and output them to ES
  }
}

No need for csv and translate.

LS            DB1            DB2
|              |              |
Inputs <-------/              |
|                             |
Transformations 1             |
|                             |
Enhancements <----------------/
|
Transformations 2
|
Outputs

Transformations 1 would be any changes you need to make to the event fields/values before the JDBC Streaming enhancement, such as mutates.
Transformations 2 would be any changes you need to make to the event fields/values after the enhancement has merged in the data.
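The diagram could map onto a single pipeline roughly like this. This is a sketch: connection settings are omitted, the mutate steps are only examples of Transformations 1 and 2, and the target name db2_rows, the Elasticsearch host and the index name are hypothetical.

```
input {
  jdbc {
    # DB1 connection settings (and schedule, if any) omitted
    statement => "select field1, field2, field3 from database1"
  }
}
filter {
  # Transformations 1 (example): tidy the lookup key first
  mutate { strip => ["field2"] }

  # Enhancement: one DB2 query per event, keyed on field2
  jdbc_streaming {
    # DB2 connection settings omitted
    statement => "select field5 from database2 where field4 = :field2val"
    parameters => { "field2val" => "field2" }
    target => "db2_rows"
  }

  # Transformations 2 (example): copy field5 out of the first matching row
  if [db2_rows][0][field5] {
    mutate { add_field => { "field5" => "%{[db2_rows][0][field5]}" } }
  }
}
output {
  elasticsearch {
    # Assumed local Elasticsearch and a hypothetical index name
    hosts => ["localhost:9200"]
    index => "merged"
  }
}
```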

Thank you. I am asking how I can do this:

statement => "select field4, field5 from database2"

# What I want: when field2 = field4, add field5 to the records from the first database and output them to ES

EDITED

input {
  jdbc {
    statement => "select field1, field2, field3 from database1"
    ......
  }
}
filter {
  jdbc_streaming {
    statement => "select field5 from database2 where field4 = :field2val"
    parameters => {"field2val" => "field2"}
  }
}

Replace fieldX with your real field names and databaseX with your real database/table names, and it should work.

Remember to look at the cache config options; caching will prevent too many network calls to database2.
The cache uses the substituted parameter hash as the key, e.g.:

{"field2val" => "foo"} (an event whose field2 is "foo") will cache the recordset from database2 where field4 == "foo", or an empty recordset if there is no row with field4 == "foo".
If you set a default (default_hash), it will be used for field5 and the event is tagged with "_jdbcstreamingdefaultsused" (the tag_on_default_use default).
{"field2val" => "bar"} (an event whose field2 is "bar") will cache the recordset from database2 where field4 == "bar".

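A sketch of setting the default mentioned above, reusing the field2val example. The default value "unknown" and the target name db2_rows are hypothetical; tag_on_default_use is shown with its default value from the debug log only to make the tag explicit.

```
jdbc_streaming {
  # connection settings omitted
  statement => "select field5 from database2 where field4 = :field2val"
  parameters => { "field2val" => "field2" }
  target => "db2_rows"
  # Used as the row when the query returns nothing ("unknown" is an example)
  default_hash => { "field5" => "unknown" }
  # Same as the default; added to events that received the default row
  tag_on_default_use => ["_jdbcstreamingdefaultsused"]
}
```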
Thank you for your clear answer. For me, field2val is not a static value that I want to search for: I want to return field5 whenever the two columns field2 and field4 are equal.

I'm so sorry if I didn't understand what you said.

NOTE:
If database2 changes often, i.e. field4 == "foo" gives field5 == "apples" now and field5 == "oranges" 5 minutes later, then you should use a small cache expiry time (cache_expiration; the default is 5 seconds).

If database2 is (mostly) static reference data, a long cache expiry is better; you should change it from the default.

Also find out how many distinct values of field4 you have in database2; this will help determine the best cache size (cache_size; the default is 500).
The cache is in memory so you do not want the JVM to run out of heap by caching too many records.
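A sketch of the cache options for the mostly static case. The numbers are illustrative only, not recommendations; cache_expiration is in seconds, and the connection, statement, parameters and target settings are omitted.

```
jdbc_streaming {
  # connection settings, statement, parameters and target omitted
  use_cache => true
  # Mostly static reference data: keep entries for an hour (example value).
  # For frequently changing data, keep this small (the default is 5.0).
  cache_expiration => 3600.0
  # Sized from the number of distinct field4 values (example value);
  # entries live in JVM heap, so keep this bounded.
  cache_size => 2000
}
```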

Think of field2val as a label that "glues" the field2 name to the place in the query string that the Event value of field2 must be put.
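To illustrate the label idea with a deliberately different, hypothetical name: lookup_key below exists only in the SQL string and the parameters hash, while the right-hand side names the event field whose value gets bound.

```
jdbc_streaming {
  # connection settings omitted
  # :lookup_key is a placeholder; it is replaced by the value of
  # this event's field2 when the query runs
  statement => "select field5 from database2 where field4 = :lookup_key"
  parameters => { "lookup_key" => "field2" }
  target => "db2_rows"
}
```

As far as I understand the plugin, the label may also have the same name as the field (:field2 with parameters => { "field2" => "field2" }), but the parameters entry is still what binds the event value to the placeholder.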

CORRECTION:
I switched field2 and field4val around by mistake. It should be like this. I have edited my original post.

input {
  jdbc {
    statement => "select field1, field2, field3 from database1"
    ......
  }
}
filter {
  jdbc_streaming {
    statement => "select field5 from database2 where field4 = :field2val"
    parameters => {"field2val" => "field2"}
  }
}

Thank you very much. My database2 changes daily, so I think I have to use a short cache expiry. I am still confused about field2val; can I write:
filter {
  jdbc_streaming {
    statement => "select field5 from database2 where field4 = :field2"
  }
}

without going through field2val?