Logstash jdbc plugin with MSSQL

Hi Logstash gurus,

I am trying to fetch data from MSSQL using the Logstash JDBC input plugin. I am on Logstash 7.17.4.
I am using the below config for the input:

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://HOST:PORT;databaseName=AGS;integratedSecurity=true;trustServerCertificate=true;Trusted_Connection=true;"
    jdbc_user => "${JDBC_USER}"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    statement => "SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > :sql_last_value ORDER BY LogDate ASC"
    tracking_column => "logdate"
    tracking_column_type => "timestamp"
    use_column_value => true
    last_run_metadata_path => "D:\Elastic\Logstash717\logs\.logstash_jdbc_last_run"
    schedule => "*/10 * * * * *"
  }
}

It works, however not as I expected. On the first run, the file D:\Elastic\Logstash717\logs\.logstash_jdbc_last_run from last_run_metadata_path is initialized with 1970-01-01T00:00:00.000, but then, after only a few SQL executions, it quickly reaches the current date:

[2023-10-12T05:09:22,032][INFO ][logstash.inputs.jdbc     ][gos][d445a8b13f63b88db9814df5173b9b99d1b89b68c865011e671f3fb2ebeaabc0] (0.060240s) SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > '1970-01-01T00:00:00.000' ORDER BY LogDate ASC
[2023-10-12T05:09:31,273][INFO ][logstash.inputs.jdbc     ][gos][d445a8b13f63b88db9814df5173b9b99d1b89b68c865011e671f3fb2ebeaabc0] (0.007982s) SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > '2023-10-11T08:00:04.093' ORDER BY LogDate ASC
[2023-10-12T05:09:40,225][INFO ][logstash.inputs.jdbc     ][gos][d445a8b13f63b88db9814df5173b9b99d1b89b68c865011e671f3fb2ebeaabc0] (0.008682s) SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > '2023-10-11T15:00:06.327' ORDER BY LogDate ASC
[2023-10-12T05:09:50,303][INFO ][logstash.inputs.jdbc     ][gos][d445a8b13f63b88db9814df5173b9b99d1b89b68c865011e671f3fb2ebeaabc0] (0.009608s) SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > '2023-10-11T22:00:12.550' ORDER BY LogDate ASC
[2023-10-12T05:10:00,341][INFO ][logstash.inputs.jdbc     ][gos][d445a8b13f63b88db9814df5173b9b99d1b89b68c865011e671f3fb2ebeaabc0] (0.004309s) SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > '2023-10-12T05:00:14.237' ORDER BY LogDate ASC
[2023-10-12T05:10:10,397][INFO ][logstash.inputs.jdbc     ][gos][d445a8b13f63b88db9814df5173b9b99d1b89b68c865011e671f3fb2ebeaabc0] (0.003175s) SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > '2023-10-12T12:00:15.930' ORDER BY LogDate ASC
[2023-10-12T05:10:20,517][INFO ][logstash.inputs.jdbc     ][gos][d445a8b13f63b88db9814df5173b9b99d1b89b68c865011e671f3fb2ebeaabc0] (0.001379s) SELECT TOP(100) * from DBTABLE with(nolock) where LogDate > '2023-10-12T12:00:15.930' ORDER BY LogDate ASC

Why is that? How is :sql_last_value being incremented?
I was expecting it to advance with each scheduled run (every 10s) so that all the data from 1970-01-01 until today would eventually be fetched.

Thanks,
Catalin

Any feedback please?

Thank you!
Catalin

Hi.

As far as I know, the last run value is initialized to 1970, so on the first run Logstash processes everything and then saves the tracking column value of the last record as the new last run. On the next run, if there are new records, it reads everything that comes after that value; if there are no new records, it reads nothing. This prevents Logstash from reading the same records multiple times.

When using a timestamp tracking column, the input extracts the tracking column from each row of the result set and persists the last value it sees to last_run_metadata_path.
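For reference, that file holds just a single YAML-serialized value; with a timestamp tracking column its contents look something like this (illustrative, matching the second query in your log):

--- 2023-10-11 08:00:04.093000000 Z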

Are you trying to do paging to fetch the entire table, but only 100 rows every 10 seconds?

Hi Badger,

Yes, I am trying to fetch the entire table, but in batches of 100 or 1000 rows every 10s. Is there a proper way to do that? Should I drop the paging and remove the TOP(100)?
If I query the DB using another tool, I get more than 100000 rows, but with Logstash and the above config, only tens of rows get ingested.

Thank you,
Catalin

I've also noticed that :sql_last_value advances by roughly 7h on each run.
How can I decrease that step?

You can use this configuration if you do not want to use the last run property at all:

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://HOST:PORT;databaseName=AGS;integratedSecurity=true;trustServerCertificate=true;Trusted_Connection=true;"
    jdbc_user => "${JDBC_USER}"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    statement => "SELECT TOP(100) * from DBTABLE with(nolock) ORDER BY LogDate ASC"
    schedule => "*/10 * * * * *"
  }
}

Alternatively you can set the paging.

jdbc_paging_enabled => true
jdbc_paging_mode => "explicit"
jdbc_page_size => 100000
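In explicit mode, as I read the docs, the plugin no longer wraps your query; the statement itself must consume the :size and :offset placeholders. An untested sketch for SQL Server, reusing your table and column names:

statement => "SELECT * from DBTABLE with(nolock) ORDER BY LogDate ASC OFFSET :offset ROWS FETCH NEXT :size ROWS ONLY"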

Source: jdbc input plugin

It looks like TOP and ORDER BY are not being applied in the expected order. Perhaps try something like

SELECT TOP(100) * FROM 
   (SELECT * FROM DBTABLE with(nolock) where LogDate > :sql_last_value ORDER BY LogDate ASC) 

Not sure if you need as someAlias or not, I don't really speak SQL.

Or else use the paging that the input supports as Murat suggested.

Hi Murat, Badger,

Let me try what you suggested and I'll get back to you afterwards.

Thank you both!
Catalin

Hi Murat,

Does it make sense to keep TOP(100) if I set jdbc_page_size => 100000?
I've tried the below:

jdbc_paging_enabled => true
jdbc_paging_mode => "explicit"
jdbc_page_size => 10000
statement => "SELECT * from DBTABLE with(nolock) ORDER BY LogDate ASC"

but it doesn't seem right.
The table count is 127174, while the index currently holds 785970 documents and keeps growing, so I believe I'm getting duplicates.
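I suspect the cause is that this statement no longer filters on :sql_last_value, so every scheduled run re-pages the whole table from the start. Something like this (untested) would keep the filter while paging:

statement => "SELECT * from DBTABLE with(nolock) where LogDate > :sql_last_value ORDER BY LogDate ASC"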

Thanks,
Catalin

Hi Badger,

I've tried your suggestion and I get:

Exception when executing JDBC query {:exception=>Sequel::DatabaseError, :message=>"Java::ComMicrosoftSqlserverJdbc::SQLServerException: The ORDER BY clause is invalid in views, inline functions, derived tables, subqueries, and common table expressions, unless TOP, OFFSET or FOR XML is also specified."

Thanks,
Catalin

Hi Murat, Badger, I think I've nailed it!

The below input config is working!

input {
  jdbc {
    jdbc_connection_string => "jdbc:sqlserver://DBHOST:DBPORT;databaseName=DBNAME;integratedSecurity=true;trustServerCertificate=true;Trusted_Connection=true;"
    jdbc_user => "${JDBC_USER}"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    statement => "SELECT *, DATEDIFF(s, '1970-01-01 00:00:00', LogDate) as unix_timestamp_in_secs from DBTABLE with(nolock) where DATEDIFF(s, '1970-01-01 00:00:00', LogDate) > :sql_last_value and DATEDIFF(s, '1970-01-01 00:00:00', LogDate) < DATEDIFF(s, '1970-01-01 00:00:00', CURRENT_TIMESTAMP) ORDER BY LogDate ASC OFFSET 0 ROWS"
    tracking_column => "unix_timestamp_in_secs"
    tracking_column_type => "numeric"
    record_last_run => true
    clean_run => true
    use_column_value => true
    last_run_metadata_path => "D:\Elastic\Logstash717\logs\.logstash_jdbc_last_run"
    schedule => "*/10 * * * * *"
  }
}

Basically, I am using a numeric tracking column (the Unix timestamp in seconds) instead of a timestamp one. I also make sure LogDate falls between :sql_last_value and CURRENT_TIMESTAMP.
ORDER BY needs the OFFSET 0 ROWS clause, presumably because the paging wraps the statement in a derived table, where SQL Server only accepts ORDER BY together with TOP or OFFSET (the error from Badger's suggestion above).
The select will by default fetch batches of jdbc_page_size (10000) rows until the entire table has been read.
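As a sanity check on the arithmetic, using the last timestamp from my logs above:

SELECT DATEDIFF(s, '1970-01-01 00:00:00', '2023-10-12 05:00:14')
-- 1697086814, the value that gets persisted as :sql_last_value

One caveat: DATEDIFF(s, ...) returns an int, so this approach works for dates up to 2038.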

Thank you for all your help! Much appreciated!
Catalin
