How to continuously pull from Cassandra into Logstash

Hi all,

I'm trying to set up Logstash to continuously pull data from Cassandra, the same way it works with file inputs. I'm using the Cassandra JDBC driver, and I can successfully pull the table's contents, but once I get the result of my SELECT statement, Logstash shuts down. The next time I start it, it reads the entire table again.
Is there a way to configure it to run as a daemon and pull any new entries matching my statement?

Here is my current config:

input {
  jdbc {
    jdbc_connection_string => "jdbc:cassandra://hostname:9160"
    jdbc_user => "cassandra"
    jdbc_password => "cassandra"
    jdbc_driver_library => "$PATH/cassandra_driver.jar"
    jdbc_driver_class => "org.apache.cassandra.cql.jdbc.CassandraDriver"
    statement => "SELECT * FROM keyspace.table"
  }
}

output {
  stdout {
    codec => json_lines
  }
}

Here's what happens when I start it:

root@012456ce9188:/# /opt/logstash/bin/logstash -f /test_cassandra.conf
Settings: Default pipeline workers: 8
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Logstash startup completed
.....
entries returned here, as expected
......
Logstash shutdown completed

Any help would be appreciated, I'm very new to Logstash.

I think you forgot to set the schedule option. From the jdbc input plugin documentation:

There is no schedule by default. If no schedule is given, then the statement is run exactly once.
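
As a minimal sketch, the schedule option takes cron-style syntax; "* * * * *" (every minute) is just an example value:

input {
  jdbc {
    # ... connection settings as in your config ...
    schedule => "* * * * *"   # cron-style: re-run the statement once per minute
    statement => "SELECT * FROM keyspace.table"
  }
}

With a schedule set, Logstash keeps running and executes the statement on every tick instead of shutting down after a single pass.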


Hi Thomas,

You are right, I had no schedule at all. Thanks for pointing me to the jdbc plugin documentation; it was something I had not found.

The solution to my problem has two parts:

  1. Adding a schedule, like you said.
  2. Using predefined parameters, so that I only select entries that were added since the last time the query ran (roughly as in the sketch below).
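
Roughly, my input now looks like this. The added_at timestamp column is specific to my table, and depending on your plugin version the predefined parameter is :sql_last_value (older releases called it :sql_last_start); by default it holds the time the statement last ran. Whether Cassandra accepts the WHERE clause depends on your schema (you may need an index or ALLOW FILTERING):

input {
  jdbc {
    jdbc_connection_string => "jdbc:cassandra://hostname:9160"
    jdbc_user => "cassandra"
    jdbc_password => "cassandra"
    jdbc_driver_library => "$PATH/cassandra_driver.jar"
    jdbc_driver_class => "org.apache.cassandra.cql.jdbc.CassandraDriver"
    schedule => "* * * * *"
    # Only select rows added since the previous run.
    statement => "SELECT * FROM keyspace.table WHERE added_at > :sql_last_value"
  }
}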

Thanks again!

Hi Nkanov,

Can you please help me use the JDBC input plugin in Logstash for Cassandra? Here is my configuration; I am using the same JDBC driver you mentioned.

input {
  jdbc {
    jdbc_connection_string => "jdbc:cassandra://localhost:9160/mydata"
    jdbc_user => "cassandra"
    jdbc_driver_library => "D:\driver\cassandrajdbc1.1"
    jdbc_driver_class => "org.apache.cassandra.cql.jdbc.CassandraDriver"
    statement => "SELECT * from sampledata"
  }
}

output {
  stdout { codec => json }
}

Here is the error:
D:\ELK\logstash-2.3.3\logstash-2.3.3\bin>logstash -f d:\cass.conf
io/console not supported; tty will not be manipulated
Settings: Default pipeline workers: 4
Pipeline aborted due to error {:exception=>#<LogStash::ConfigurationError: org.apache.cassandra.cql.jdbc.CassandraDriver not loaded. Are you sure you've included the correct jdbc driver in :jdbc_driver_library?>,
:backtrace=>[
  "D:/ELK/logstash-2.3.3/logstash-2.3.3/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.0.2/lib/logstash/plugin_mixins/jdbc.rb:156:in `prepare_jdbc_connection'",
  "D:/ELK/logstash-2.3.3/logstash-2.3.3/vendor/bundle/jruby/1.9/gems/logstash-input-jdbc-3.0.2/lib/logstash/inputs/jdbc.rb:167:in `register'",
  "D:/ELK/logstash-2.3.3/logstash-2.3.3/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.3-java/lib/logstash/pipeline.rb:330:in `start_inputs'",
  "org/jruby/RubyArray.java:1613:in `each'",
  "D:/ELK/logstash-2.3.3/logstash-2.3.3/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.3-java/lib/logstash/pipeline.rb:329:in `start_inputs'",
  "D:/ELK/logstash-2.3.3/logstash-2.3.3/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.3-java/lib/logstash/pipeline.rb:180:in `start_workers'",
  "D:/ELK/logstash-2.3.3/logstash-2.3.3/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.3-java/lib/logstash/pipeline.rb:136:in `run'",
  "D:/ELK/logstash-2.3.3/logstash-2.3.3/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.3-java/lib/logstash/agent.rb:473:in `start_pipeline'"
], :level=>:error}
stopping pipeline {:id=>"main"}
The signal HUP is in use by the JVM and will not work correctly on this platform

Thanks