Logstash jdbc input checkpoint

hi all,

I am using Logstash to select data from a database and index it. My Logstash configuration is as follows:

input {
  jdbc {
    jdbc_driver_library => "/home/srahimi/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@10.0.221.163:1521:xe"
    jdbc_user => "hr"
    jdbc_password => "hr"
    statement => "SELECT * from HR.COUNTRIES"
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.170.153:9200"]
    index => "dbxe-%{+yyyy.MM.dd}"
    user => "logstash_interna20"
    password => "x-pack-test-password"
  }
  stdout { codec => rubydebug }
}

Suppose the database contains millions of records and Logstash selects data from it once a day (via the schedule option). In addition, on future days many new records are added to the database (this happens at random, meaning new data is not necessarily added every day).
How can I define a checkpoint in Logstash so that it knows the last time data was selected and indexed from the database, and then uses that checkpoint in the SELECT statement to fetch only the records added after that time? If Logstash queries the database and scans all the records every day without any condition, it degrades database performance.
Could you please advise me on this?

There are different ways you can achieve this.
Let's say you have a unique field emp_id in this Oracle database. If so, you can track that field:

tracking_column => "emp_id"

You also have to use > :sql_last_value in the statement, I believe, and the next run will then start from the last emp_id.

For example, in my case:

statement => "select * from jobs where job > :sql_last_value"
tracking_column => "job"
record_last_run => true
last_run_metadata_path => "/tmp/logstash-job.2019.lastrun"
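
Putting those pieces together, a complete input block for this first method might look roughly like the sketch below. The connection details are placeholders, the jobs table and job column are taken from the example above, and use_column_value => true is needed so the tracked value comes from the column rather than the query run time.

input {
  jdbc {
    jdbc_driver_library => "/path/to/ojdbc6.jar"
    jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
    jdbc_connection_string => "jdbc:oracle:thin:@db-host:1521:xe"   # placeholder host
    jdbc_user => "hr"
    jdbc_password => "hr"
    schedule => "0 1 * * *"                                         # once a day at 01:00
    # only rows with a job value greater than the last recorded one
    statement => "select * from jobs where job > :sql_last_value order by job"
    use_column_value => true
    tracking_column => "job"
    record_last_run => true
    last_run_metadata_path => "/tmp/logstash-job.lastrun"
  }
}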

The second method you can use is to search by timestamp, if you have an index on that column in the database.

I tested both methods on two different databases and they work. Here is an example:

statement_filepath => "/etc/logstash/conf.d/sql/job_status_1hour.sql"
clean_run => true
schedule => "*/30 * * * *"

The output section has document_id => "%{job_num}", since that is a unique field in the database.
The query is simple:
select * from jobs where status_updated > sysdate - interval '60' minute
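
For reference, the output section for this method might look roughly like the following; the host and index name are placeholders, and job_num is the unique field mentioned above. Using it as the document ID means a row that is selected again simply overwrites its existing document instead of creating a duplicate.

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]   # placeholder host
    index => "jobs"                      # placeholder index name
    document_id => "%{job_num}"          # unique field, so re-selected rows update in place
  }
}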

The second method will never go back and scan for changes. For example, if your employee "sahere37" changes their address, your ELK stack will still show the old address.

That is why I found the first method more reliable, but for that you need some kind of field in your DB table to track.

Many thanks, that is very helpful.

The DB has a time column and I want to select from it after a specific time. Is it possible to change the initial value of "sql_last_value"?

sql_last_value is not a number taken from the DB; it is recorded by Logstash when it runs.

The first time it will run a full table scan, depending on your query. From then on it tracks the column defined in tracking_column and only fetches rows with a value higher than that.

If you post your example it would be easier to suggest a solution.

sql_last_value can be either the time the statement was executed or a value from the recordset.
When using tracking_column etc. it is a value from the recordset. The sql_last_value variable is updated after each row is processed, so recordset ordering is important.

To "seed" the file, run LS once with a special query that returns the one row that has the value you want to start from (use a stdout output if you don't want this in Elasticsearch, or delete it in ES afterwards). Then switch to the proper production config and the statement should start from the value persisted in the file at last_run_metadata_path (before running the prod config, check that this file has the value you expect).

Many thanks for your reply.
In the SQL database there is a Date column and I want to save the latest one in sql_last_value. My Logstash config is as follows:

input {
  jdbc {
    jdbc_driver_library => "F:\driver\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://local:5453;databaseName=TotalTXN"
    jdbc_user => "sss"
    jdbc_password => "sss123"
    schedule => "* * * * *"
    clean_run => "true"
    statement => "SELECT MAX(date) FROM dbo.TotalTxn_Control"
    use_column_value => "true"
    tracking_column => "date"
  }
}
output {
  elasticsearch {
    hosts => ["http://192.168.170.153:9200"]
    index => "mssql3-%{+yyyy.MM.dd}"
    user => "logstash_internal25"
    password => "x-pack-test-password"
  }
  stdout { codec => rubydebug }
}

Actually there are two issues:
1- The maximum value of date is, for example, "1397/01/01", but Logstash treats it as "February 1st 1397" and shows it that way in Kibana. How can I handle this? I want "1397/01/01" to be shown in Kibana and also saved as sql_last_value.
2- When Logstash starts, the value of sql_last_value (in .logstash_jdbc_last_run) is always 0 and not, for example, "1397/01/01". How can I handle this?

Answering Q2 first...

Storing and re-using sql_last_value is only useful if you have a SQL statement that makes use of it via substitution internally.

Here are two examples.

  1. The column tracked is a TIMESTAMP (or DATETIME) datatype column, for example executed_at.
    SELECT * FROM jobs WHERE executed_at > :sql_last_value ORDER BY executed_at
    Here you need three settings:
    tracking_column_type => "timestamp"
    tracking_column => "executed_at"
    use_column_value => true
  2. The column tracked is a NUMERIC datatype column, for example job_number.
    SELECT * FROM jobs WHERE job_number > :sql_last_value ORDER BY job_number
    Here you need only two settings (tracking_column_type defaults to "numeric"):
    tracking_column => "job_number"
    use_column_value => true

About Q1...
If the datatype of date is a TIMESTAMP or a SQL DATE then the jdbc input will convert the column (using the JDBC column type info) to an appropriate Logstash datatype. For DATE and TIMESTAMP this is the Logstash Timestamp datatype (essentially the number of seconds or milliseconds offset from the UNIX epoch, 1970-01-01). In Elasticsearch and Kibana dates and times are also stored in a similar way, so that date fields can be compared.
This means that there are internal and external representations of dates.

         0 -> 1970/01/01 or 1970-01-01T00:00:00.000 UTC
1551174712 -> Tuesday, 26 February 2019 09:51:52 GMT+00:00 or 2019-02-26T09:51:52

Locale and language settings determine which external representation is given by default.
If you don't want or need date math in ES or Kibana you can CAST or FORMAT the date to a string representation, but you still need the date as a timestamp for sql_last_value.

SELECT FORMAT(date, 'yyyy/MM/dd') AS date, date AS tracking
FROM dbo.TotalTxn_Control
WHERE date > :sql_last_value
ORDER BY date

With the settings:

tracking_column_type => "timestamp"
tracking_column => "tracking"
use_column_value => true

and, optionally, a mutate filter to drop the helper column afterwards:

filter { mutate { remove_field => ["tracking"] } }

You will probably want to use a new index pattern, as the old mapping will have mapped date as a timestamp rather than a string.
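
Putting the query and settings above together, the input block might end up looking roughly like this. It is only a sketch: the driver, connection string, and schedule are taken from the config earlier in the thread, and the credentials are placeholders.

input {
  jdbc {
    jdbc_driver_library => "F:\driver\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://local:5453;databaseName=TotalTXN"
    jdbc_user => "***"
    jdbc_password => "***"
    schedule => "* * * * *"
    statement => "SELECT FORMAT(date, 'yyyy/MM/dd') AS date, date AS tracking
                  FROM dbo.TotalTxn_Control
                  WHERE date > :sql_last_value
                  ORDER BY date"
    use_column_value => true
    tracking_column => "tracking"
    tracking_column_type => "timestamp"
  }
}
filter {
  mutate { remove_field => ["tracking"] }   # optional: drop the helper column
}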


Many thanks, it has been solved, thanks a lot.
Just one question: what is the reason for using "order by"?

I am using the following config without "order by" and it works. What difference does using it make?

jdbc {
  jdbc_driver_library => "F:\driver\sqljdbc_6.0\enu\jre8\sqljdbc42.jar"
  jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
  jdbc_connection_string => "jdbc:sqlserver://local:3333;databaseName=TotalTXN"
  jdbc_user => "***"
  jdbc_password => "***"
  statement => "
    DECLARE @DDate1 CHAR(10)
    select @DDate1 = REPLACE(MAX(Date), '/', '') from TotalTXN.dbo.TotalTxn_Control
    select TOP 2 [BaseDate_Accept_Setel]
    from dbo.vw_TotalTXN where (BaseDate_Accept_Setel <= @DDate1) AND (BaseDate_Accept_Setel > :sql_last_value)
  "
  use_column_value => "true"
  tracking_column => "basedate_accept_setel"
}
