Unable to push incremental data from Cassandra DB to daily index using logstash

Hello,
I am trying to create a daily index that fetches data from a Cassandra DB using the JDBC driver in Logstash.
As soon as it creates a new index for the new date, it pulls the whole data set again, which is already available in yesterday's index. Here is my conf file:

input{
        jdbc{
                # Cassandra jdbc connection string to our backup database
                jdbc_connection_string => "jdbc:cassandra://10.113.213.167:9042/imi_datamartdb"

                # the user we wish to execute our statement as
                jdbc_user => "elk_user"
                jdbc_password => "Zsw2@Mji9@elk"

                # the path to our downloaded jdbc driver
                jdbc_driver_library => "/opt/CassandraJdbcDriver/cassandrajdbc1.1.jar"

                # the name of the driver class for Cassandra
                jdbc_driver_class => "com.dbschema.CassandraJdbcDriver"

                # the control statement
                #record_last_run=>true
                #type => "product"
               statement => "SELECT * FROM imi_datamartdb.server_performance where time_stamp > :sql_last_value ALLOW Filtering"
                use_column_value => true
                tracking_column => "time_stamp"
                record_last_run => true
                schedule => "*/5 * * * *"
                clean_run => true
                last_run_metadata_path => "/opt/elk/.logstash_jdbc_last_run"


        }
}
filter{ }
output {
        if [kpi_name] in ["DiskFilesystemUtilization","CpuUtilization","MemoryUtilization"] {
                elasticsearch {
                        #protocol => https
                        index => "imiperformance-%{+YYYY.MM.dd}"
                }
        }
}

Could you please advise on this?

Do not use clean_run => true (remove it), as this resets sql_last_value. You can use it once, say to re-ingest into a new index after debugging, but you should not use it for normal Logstash operations.
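For reference, a minimal sketch of the jdbc input with clean_run removed (credentials omitted, everything else as in your config):

        jdbc {
                jdbc_connection_string => "jdbc:cassandra://10.113.213.167:9042/imi_datamartdb"
                jdbc_driver_library => "/opt/CassandraJdbcDriver/cassandrajdbc1.1.jar"
                jdbc_driver_class => "com.dbschema.CassandraJdbcDriver"
                statement => "SELECT * FROM imi_datamartdb.server_performance where time_stamp > :sql_last_value ALLOW Filtering"
                use_column_value => true
                tracking_column => "time_stamp"
                record_last_run => true
                # no clean_run here, so sql_last_value persists between runs
                schedule => "*/5 * * * *"
                last_run_metadata_path => "/opt/elk/.logstash_jdbc_last_run"
        }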

I have removed clean_run => true and I am still facing the same issue. The index created today has all the records from previous days. Could you please advise?

Ok. I suspect this is a coercion issue.

We do not have a specific adapter for Cassandra SQL so the default plain SQL is used. Each adapter is responsible for knowing how string literals and timestamp literals should be formatted.

At the debug logging level, what does the SQL statement look like after sql_last_value has been substituted?
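If debug logging is not already enabled, one way to raise the log level is to start Logstash with the --log.level flag (the config path here is a placeholder, adjust it to yours):

        bin/logstash --log.level=debug -f /path/to/your.conf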

I get these logs after starting Logstash with the conf file:

[2019-02-01T08:55:12,436][INFO ][logstash.inputs.jdbc ] (0.348773s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 ALLOW Filtering

So that is starting from scratch.

According to this the timestamp literal can be an integer.

While testing, you may want to add LIMIT 100 to the sql statement.

When the run is finished, what data is in the /opt/elk/.logstash_jdbc_last_run file?

It is pulling real-time data, so I don't want to stop the process. Currently I can see the date below in the .logstash_jdbc_last_run file:

'2019-01-31 14:17:38.514940000 Z'

With this setting

tracking_column => "time_stamp"

The input is using this format of value; I am not sure whether Cassandra handles it properly:

'2019-01-31 14:17:38.514940000 Z'

Try the numeric tracking_column_type.
tracking_column_type => "numeric"
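A minimal sketch of the relevant jdbc input settings with the numeric tracking type (column name as in your config, other settings unchanged):

        use_column_value => true
        tracking_column => "time_stamp"
        tracking_column_type => "numeric"
        record_last_run => true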

Do you have a SQL command line tool for CassandraSQL?
Can you show me what the output of one record looks like?

Thank you, I will try this.
Unfortunately, I don't have direct access to CassandraSQL where I can run queries.

Hello,
I got this from DB Team:
host_name | time_stamp | account_name | geo | host_type | hosting_type | kpi_name | kpi_subtype | kpi_type | subsystem_update | unit | value_current | value_max
---------------------------+---------------------------------+--------------+------+-----------+--------------+------------------+-------------+-------------+------------------+------+---------------+-----------
******** | 2018-11-30 23:28:47.000000+0000 | null | null | null | null | Zombie_Processes | procs | Performance | NO | null | 0 | null
*********| 2018-11-30 23:29:49.000000+0000 | null | null | null | null | HOST | time | Performance | NO | s | 0.026073 | 50.000000

Are you able to CAST() this back to the numeric format?

The CQL docs say

timestamp
64-bit signed integer representing the date and time since epoch (January 1 1970 at 00:00:00 GMT) in milliseconds.

Every 5 minutes the scheduler will run. This will "build" a new statement, SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 ALLOW Filtering, where the 0 is replaced with the text you saw in the file, e.g. SELECT * FROM imi_datamartdb.server_performance where time_stamp > '2019-01-31 14:17:38.514940000 Z' ALLOW Filtering, but I don't know whether the literal should be single quoted, double quoted, or delimited in some other way.

The integer milliseconds equivalent of your timestamp is 1548944258514 and so I assume the query should look like SELECT * FROM imi_datamartdb.server_performance where time_stamp > 1548944258514 ALLOW Filtering

At this point it is important that you add LIMIT 100 to the statement so we can see how the statement is updated for each scheduled run. You can drop the schedule down to 1 minute, or change the limit to 1000.
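A sketch of those two settings together (the 1-minute cron and the LIMIT are only the suggested test values, everything else unchanged):

        statement => "SELECT * FROM imi_datamartdb.server_performance where time_stamp > :sql_last_value LIMIT 100 ALLOW Filtering"
        # run every minute while testing
        schedule => "* * * * *"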

Hello,

Where can I see that it has changed the value from 0 to the value updated in the file?
I just see the last log:
SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 ALLOW Filtering
and it does not generate any log after this.

Each time the scheduler runs, you should see a new statement logged, i.e. every 5 minutes.

Hello,

I have tried limiting the records, but it starts the process and does not create any index.
Each minute it just throws the log
SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 ALLOW Filtering
and .logstash_jdbc_last_run only shows the value 0.

I have removed the limit constraint from the query and I don't see any logs generated after the first query, which is:
SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 ALLOW Filtering
and no .logstash_jdbc_last_run file is created.

I am stuck, please help me.

Logs for your info:

[2019-02-06T07:25:05,542][INFO ][logstash.inputs.jdbc ] (0.210450s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 LIMIT 1000 ALLOW Filtering
Using authentication as user 'elk_user'
[2019-02-06T07:30:00,287][INFO ][com.datastax.driver.core.ClockFactory] Using native clock to generate timestamps.

[2019-02-06T07:30:01,694][INFO ][logstash.inputs.jdbc ] (0.195891s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 LIMIT 1000 ALLOW Filtering
Using authentication as user 'elk_user'
[2019-02-06T07:35:00,400][INFO ][com.datastax.driver.core.ClockFactory] Using native clock to generate timestamps.

[2019-02-06T07:35:01,396][INFO ][logstash.inputs.jdbc ] (0.217050s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 LIMIT 1000 ALLOW Filtering

Before you send these to Elasticsearch, we should find out what the events received at the output section look like.

Please swap the elasticsearch output piece with stdout { codec => rubydebug } and paste one of the events printed to stdout - you can use LIMIT 1 for this.
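A sketch of the temporary output section for this test (swap it back to elasticsearch once we are done):

output {
        stdout { codec => rubydebug }
}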

[2019-02-06T11:30:04,834][INFO ][logstash.inputs.jdbc ] (0.065439s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 LIMIT 1 ALLOW Filtering
{
"@timestamp" => 2019-02-06T11:30:05.055Z,
"time_stamp" => 2018-11-30T23:28:47.000Z,
"unit" => "null",
"kpi_name" => "Zombie_Processes",
"geo" => "null",
"hosting_type" => "null",
"account_name" => "null",
"kpi_subtype" => "procs",
"subsystem_update" => "NO",
"value_max" => "null",
"@version" => "1",
"value_current" => "0",
"kpi_type" => "Performance",
"host_name" => "*****",
"host_type" => "null"
}

Again after 5 minutes, same time_stamp:
[2019-02-06T11:40:00,949][INFO ][logstash.inputs.jdbc ] (0.252149s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > 0 LIMIT 1 ALLOW Filtering
{
"@timestamp" => 2019-02-06T11:40:00.966Z,
"time_stamp" => 2018-11-30T23:28:47.000Z,
"unit" => "null",
"kpi_name" => "Zombie_Processes",
"geo" => "null",
"hosting_type" => "null",
"account_name" => "null",
"kpi_subtype" => "procs",
"subsystem_update" => "NO",
"value_max" => "null",
"@version" => "1",
"value_current" => "0",
"kpi_type" => "Performance",
"host_name" => *****,
"host_type" => "null"
}

Ok, so it looks like the jdbc input is converting the time_stamp field to a Logstash Timestamp datatype (above, the value is not in quotes like the @timestamp field).

Please try adding tracking_column_type => "timestamp" to the jdbc input settings.

Still use LIMIT 1 but set the schedule to every 15 seconds: schedule => "*/15 * * * * *" (five *s to the right; the extra leftmost field is seconds). Delete the file /opt/elk/.logstash_jdbc_last_run before you start Logstash.
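A sketch of the jdbc input settings for this test run (LIMIT 1 in the statement, timestamp tracking type, 15-second schedule; everything else as before):

        statement => "SELECT * FROM imi_datamartdb.server_performance where time_stamp > :sql_last_value LIMIT 1 ALLOW Filtering"
        use_column_value => true
        tracking_column => "time_stamp"
        tracking_column_type => "timestamp"
        record_last_run => true
        # six cron fields, seconds first: run every 15 seconds
        schedule => "*/15 * * * * *"
        last_run_metadata_path => "/opt/elk/.logstash_jdbc_last_run"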

Then post the logged statement lines, the contents of /opt/elk/.logstash_jdbc_last_run (if it changes too quickly then change the schedule to 20 seconds) and the output on stdout.

      "@timestamp" => 2019-02-06T13:26:20.920Z,
             "geo" => "null",
   "value_current" => "0.024584",
      "time_stamp" => 2018-12-01T18:44:59.000Z,
    "hosting_type" => "null",
"subsystem_update" => "NO",
        "@version" => "1",
        "kpi_name" => "_HOST_",
       "host_type" => "null",
     "kpi_subtype" => "time",
    "account_name" => "null",
       "host_name" => "*******",
       "value_max" => "50.000000"

}
Using authentication as user 'elk_user'
[2019-02-06T13:27:20,144][INFO ][com.datastax.driver.core.ClockFactory] Using native clock to generate timestamps.
[2019-02-06T13:27:20,906][INFO ][logstash.inputs.jdbc ] (0.055150s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > '2018-12-01 18:44:59.000000' LIMIT 1 ALLOW Filtering
{
"kpi_type" => "Performance",
"unit" => "",
"@timestamp" => 2019-02-06T13:27:20.928Z,
"geo" => "",
"value_current" => "0",
"time_stamp" => 2018-12-18T07:28:43.000Z,
"hosting_type" => "",
"subsystem_update" => "NO",
"@version" => "1",
"kpi_name" => "Zombie_Processes",
"host_type" => "",
"kpi_subtype" => "procs",
"account_name" => "",
"host_name" => "*******",
"value_max" => ""

[2019-02-06T13:28:20,799][INFO ][logstash.inputs.jdbc ] (0.045477s) SELECT * FROM imi_datamartdb.server_performance where time_stamp > '2018-12-18 07:28:43.000000' LIMIT 1 ALLOW Filtering
{
"kpi_type" => "Performance",
"unit" => "s",
"@timestamp" => 2019-02-06T13:28:20.809Z,
"geo" => "",
"value_current" => "0.015405",
"time_stamp" => 2018-12-18T13:28:46.000Z,
"hosting_type" => "",
"subsystem_update" => "NO",
"@version" => "1",
"kpi_name" => "PING",
"host_type" => "",
"kpi_subtype" => "time",
"account_name" => "",
"host_name" => "****",
"value_max" => "50.000000"