JDBC_Streaming informations

Hi,

I'm actually working on a Logstash demo.

The idea is simple :
I use the Kafka Input Method to read my String (message)
AND i'm trying to do a SQL query with the JDBC_Streaming filter (if possible not with ruby_code method) as type of :
statement => "SELECT * FROM Objects WHERE Objects.message = %{message}"

I've tried some like :


parameters => { "getter" => "%{message}"}
statement => "SELECT * FROM Objects WHERE Objects.message = :getter"


statement => "SELECT * FROM Objects WHERE Objects.message = :getter"
parameters => { "getter" => "%{message}"}


parameters => {"getter" => "SELECT * FROM Item WHERE reference='message1'"}
statement => ":getter"


statement => "SELECT * FROM Objects WHERE Objects.message = event.get('message')"


parameters => { "getter" => "%{[message]}" }
parameters => { "getter" => "%{[message]}%" }
parameters => { "getter" => "%{message}%" }


And in most case I've => _jdbcstreamingfailure",
or _jdbcstreamingdefaultsused"
or both...

my filter is made this way :

filter {
jdbc_streaming {
jdbc_driver_library => "/PATH/mysql-connector-java-5.1.36/mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://PATH/DB"
jdbc_user => "USER"
jdbc_password => "PASSWORD"

statement => "SELECT * FROM Item WHERE reference = :getter"
parameters => { "getter" => "1/nouvellerepublique/11979"}
target => "country_details"

}
}

If someone got an idea I'm all ears,

Best Regard,
AH_M7

Have you looked in the Logstash log for clues?

Thanks for your answer Magnus,

I have tried,

I have launch the instance this way :
sudo /usr/share/logstash/bin/logstash -f logstash.conf -l .
and /usr/share/logstash/bin/logstash -f logstash.conf -l file.log (chmod 777 on file.log)
but nothing has been written.

I supposed it was due to the fact that no error was declared by logstash.
The only information I get is :
{
"@timestamp" => 2017-12-20T11:51:08.613Z,
"@version" => "1",
"country_details" => [
[0] {}
],
"message" => "1/nouvellerepublique/11979",
"tags" => [
[0] "_jdbcstreamingdefaultsused"
]
}

That is not considerate by Logstash as an error but do not solve my issue.

AH_M7

So your message field has the value: 1/nouvellerepublique/11979 and you have that exact text in a column of your database.

Parameters is a key/value Hash or dictionary. The key (LHS) is the text that is substituted for in the SQL statement SELECT * FROM Item WHERE reference = :getter. The value (RHS) is the field name in your event. The plugin reads the value from this key out of the event and substitutes that value into the statement. Quoting is done for you - you do not need to put quotes in the statement.
e.g. parameters => { "getter" => "message" }

Only use the field interpolation syntax on the RHS if you need to add a prefix or join two event field values together to build the substitution value. For example, imagine an IOT message that has an id and a location id and you have a DB of sensor registrations that have a column of id-loc_id, in this case your parameter hash would look like this:
parameters => { "sensor_registration" => "%{[id]}-%{[loc_id]}" }

Thanks for your answer guyboertje,

But my issue not seems to be fix, I tried your way and I result with a =>
{
"@timestamp" => 2017-12-21T14:40:25.518Z,
"@version" => "1",
"country_details" => [
[0] {}
],
"message" => "1/nouvellerepublique/11979",
"tags" => [
[0] "_jdbcstreamingdefaultsused"
]
}

So I don't know why... my actual code is :


input {
kafka {
bootstrap_servers => "------"
topics => "-----"
codec => plain{
charset => "UTF-8"
}
}
}

filter {
jdbc_streaming {
jdbc_driver_library => "----/Logstash_test/mysql-connector-java-5.1.36/mysql-connector-java-5.1.36-bin.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://----:3306/----"
jdbc_user => "----"
jdbc_password => "----"
parameters => { "getter" => "%{[message]}"}
statement => "SELECT * FROM Item WHERE reference = :getter"
target => "country_details"
}
}

output {
stdout { codec => rubydebug }
}


with "message" => "1/nouvellerepublique/11979"
statement => "SELECT * FROM Item WHERE reference = 1/nouvellerepublique/11979"
give this record :
{
"@timestamp" => 2017-12-21T14:50:25.548Z,
"@version" => "1",
"country_details" => [
[0] {}
],
"message" => "1/ladepechedumidi/ldmpub2017285003",
"tags" => [
[0] "_jdbcstreamingdefaultsused"
]
}


with "message" => "1/nouvellerepublique/11979"
statement => "SELECT * FROM Item WHERE reference = '1/nouvellerepublique/11979'"
give the good answer


So i've supposed that the problem is due to the "/" char composition of my message.

But,
"message" => "1/nouvellerepublique/11979"
parameters => { "getter" => "%{[message]}"}
statement => "SELECT * FROM Item WHERE reference = ':getter'"

Do not give the good answer...

So I don't have much idea...
Are my "/" char replace by "/" || "//" or something else in the statement ?
Or did I do it the wrong way ?

I'm all ears,

Best Regard,
AH_M7

1 Like

By the way I tried with


"message" => "5"
parameters => { "getter" => "%{[message]}"}
statement => "SELECT * FROM Item WHERE reference = :getter"


It did not give me the good answer,


"message" => "5"
parameters => { "getter" => "%{[message]}"}
statement => "SELECT * FROM Item WHERE reference = ':getter'"


neither,
but


"message" => "5"
parameters => { "getter" => "%{[message]}"}
statement => "SELECT * FROM Item WHERE reference = '5'"

and

"message" => "5"
parameters => { "getter" => "%{[message]}"}
statement => "SELECT * FROM Item WHERE reference = 5"


Are working.. so are my question :
Is it without RUBY_code possible to make dynamical sql queries in the FILTER ?
Is it possible possible to do dynamical sql queries with jdbc_streaming ?

Best Regard,
AH_M7

Can you turn on query logging in the MySql engine?
We need to see the actual select statement that the plugin is sending to your db.
Please post it here.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.