Reading logs from DB - Avoiding duplicate records

Hello,
I would like to read log records from a database table called logs and send them with Logstash to a remote syslog server.

I would like to ensure that each event is sent only once:

  • I mark the events that are going to be sent
  • I select the marked events (meanwhile, new events can arrive, and those are not yet marked)
  • I send the marked events to the remote syslog server
  • I delete the already-sent rows from the table

1) Update some records (marking)

UPDATE logs SET marked_by_logstash = true WHERE marked_by_logstash = false;

2) Select data from database based on marks

SELECT * FROM logs WHERE marked_by_logstash = true;

3) After marking and reading, delete the rows that are no longer needed

DELETE FROM logs WHERE marked_by_logstash = true;

I tried to do this with three jdbc blocks in the Logstash input section, but I found out that executing JDBC inputs sequentially is not possible - see here.

Is it possible to read each row from the database only once, to avoid sending duplicate records?
Do you have any idea how to do this?
Vasek

It is possible if there is a column with either a date or a sequence.

Thank you @Badger!

I am guessing that this can be achieved with these 5 parameters. Am I right?

use_column_value => true
tracking_column => "timestamp-of-generating-event"
tracking_column_type => "timestamp"
last_run_metadata_path => "/etc/logstash/jdbc_last_run_metadata_path/test"
record_last_run => true

I do not know why, but events are still duplicated.

Logstash pipeline configuration file

input {
  jdbc {
    jdbc_validate_connection => true
    jdbc_driver_library => "/etc/logstash/java-libs/mariadb-java-client-2.4.2.jar"
    jdbc_driver_class => "Java::org.mariadb.jdbc.Driver"
    jdbc_connection_string => "jdbc:mariadb://10.88.88.150:3306/test_database"
    jdbc_user => "*****"
    jdbc_password => "*******"
    schedule => "*/1 * * * *"
    statement => "SELECT DATE_FORMAT(timestamp, '%Y-%m-%dT%TZ') AS timestamp_generating_event, severity, message from logs ORDER BY timestamp_generating_event ASC"
    sql_log_level => "debug"
    record_last_run => true
    last_run_metadata_path => "/etc/logstash/jdbc_last_run_metadata_path/mariadb"
    use_column_value => true
    tracking_column => "timestamp_generating_event"
    tracking_column_type => "timestamp"
  }
}

Logstash queries the database every minute.

Generating events into the database
Every second I generate one row (event) into the database.

Observing last_run
cat /etc/logstash/jdbc_last_run_metadata_path/mariadb
At this point the file is empty.

*** What happened ***
I generated 8 events to database:

generating 1 event - 2019-06-23T14:10:50.750
generating 2 event - 2019-06-23T14:10:51.761
generating 3 event - 2019-06-23T14:10:52.771
generating 4 event - 2019-06-23T14:10:53.781
generating 5 event - 2019-06-23T14:10:54.793
generating 6 event - 2019-06-23T14:10:55.805
generating 7 event - 2019-06-23T14:10:56.818
generating 8 event - 2019-06-23T14:10:57.829

I checked these records in the database:

SELECT DATE_FORMAT(timestamp, '%Y-%m-%dT%TZ') AS timestamp_generating_event, severity, message from logs ORDER BY timestamp_generating_event DESC

Output:

timestamp_generating_event	severity        message
2019-06-23T14:10:57Z    info-8  obsah zpravy 8
2019-06-23T14:10:56Z    info-7  obsah zpravy 7
2019-06-23T14:10:55Z    info-6  obsah zpravy 6
2019-06-23T14:10:54Z    info-5  obsah zpravy 5
2019-06-23T14:10:53Z    info-4  obsah zpravy 4
2019-06-23T14:10:52Z    info-3  obsah zpravy 3
2019-06-23T14:10:51Z    info-2  obsah zpravy 2
2019-06-23T14:10:50Z    info-1  obsah zpravy 1

Based on the schedule setting in the jdbc input plugin

schedule => "*/1 * * * *"

Logstash performed the query:

SELECT DATE_FORMAT(timestamp, '%Y-%m-%dT%TZ') AS timestamp_generating_event, severity, message from logs ORDER BY timestamp_generating_event ASC

So I checked the last_run file:

cat /etc/logstash/jdbc_last_run_metadata_path/mariadb
--- 2019-06-23 14:10:57.000000000 +00:00

Logstash inserted 8 documents into the Elasticsearch index. That's OK.

Now the jdbc input plugin waits one minute until the next round.
I generated 4 more events.

generating 9 event - 2019-06-23T14:11:45.230
generating 10 event - 2019-06-23T14:11:46.239
generating 11 event - 2019-06-23T14:11:47.248
generating 12 event - 2019-06-23T14:11:48.257

When the minute elapsed, Logstash queried the database again.

At that time, the content of the database was:

timestamp_generating_event	severity        message
2019-06-23T14:11:48Z    info-12 obsah zpravy 12
2019-06-23T14:11:47Z    info-11 obsah zpravy 11
2019-06-23T14:11:46Z    info-10 obsah zpravy 10
2019-06-23T14:11:45Z    info-9  obsah zpravy 9
2019-06-23T14:10:57Z    info-8  obsah zpravy 8
2019-06-23T14:10:56Z    info-7  obsah zpravy 7
2019-06-23T14:10:55Z    info-6  obsah zpravy 6
2019-06-23T14:10:54Z    info-5  obsah zpravy 5
2019-06-23T14:10:53Z    info-4  obsah zpravy 4
2019-06-23T14:10:52Z    info-3  obsah zpravy 3
2019-06-23T14:10:51Z    info-2  obsah zpravy 2
2019-06-23T14:10:50Z    info-1  obsah zpravy 1

Content of last_run:

cat /etc/logstash/jdbc_last_run_metadata_path/mariadb     
--- 2019-06-23 14:11:48.000000000 +00:00

And the index had 20 documents: 8 + 12.
On every round, Logstash adds all 12 documents to the ES index again.

What am I doing wrong? Could you please help me?

You need a WHERE clause in the query that references sql_last_value (which is a parameter populated from the last_run_metadata). The documentation has an example.
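
For example, with the query from the config above it could look roughly like this (a sketch only; :sql_last_value is substituted by the plugin from the last_run metadata, so only rows newer than the last tracked timestamp are returned):

statement => "SELECT DATE_FORMAT(timestamp, '%Y-%m-%dT%TZ') AS timestamp_generating_event, severity, message FROM logs WHERE timestamp > :sql_last_value ORDER BY timestamp ASC"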

It works. Awesome. You made my day! Thank you @Badger

Just one additional question:
Is it possible to preserve milliseconds, e.g. 15:36:12.459?

cat /etc/logstash/jdbc_last_run_metadata_path/mariadb                                                                                                               
--- 2019-06-23 15:36:12.000000000 +02:00

It is saving the last value it extracted from the database. If that includes milliseconds I would expect it to save them.
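
Assuming the timestamp column itself stores fractional seconds (e.g. it is a DATETIME(3)), the SELECT could carry them through using MariaDB's %f fractional-seconds specifier, roughly like this:

SELECT DATE_FORMAT(timestamp, '%Y-%m-%dT%T.%fZ') AS timestamp_generating_event, severity, message
FROM logs
WHERE timestamp > :sql_last_value
ORDER BY timestamp ASC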

You are right. It works fine. Thank you.

In addition, if you want to avoid duplicates you can use the fingerprint filter to hash the concatenation of the fields you care about and use the result as the id (document_id). Or just use a primary key as the id (for better distribution - shard routing uses the id by default - you can fingerprint the primary key with MURMUR3).
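
A minimal sketch of that approach (field names taken from the query above; the elasticsearch hosts and index are placeholders):

filter {
  fingerprint {
    source => ["timestamp_generating_event", "severity", "message"]
    concatenate_sources => true
    method => "MURMUR3"
    target => "[@metadata][fingerprint]"
  }
}

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logs"
    document_id => "%{[@metadata][fingerprint]}"
  }
}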

Hi @Miguel1, thank you for the tip. The fingerprint filter can be very useful if we decide to store the data in Elasticsearch. For sending data to a remote syslog server, I think it cannot help.
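
For completeness, the syslog side could look roughly like this (a sketch only, assuming the logstash-output-syslog plugin; host, port, facility and severity values are placeholders - with syslog there is no document_id, so duplicates are avoided purely by the :sql_last_value query):

output {
  syslog {
    host => "syslog.example.com"
    port => 514
    protocol => "udp"
    facility => "local0"
    severity => "informational"
  }
}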

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.