Duplicate data fetched by the Logstash JDBC input

I have written the following configuration in the input section, but the same data is being fetched repeatedly. Which part should I modify?

logstash.conf

input {
    # Beats input on port 5044.
    beats {
        port => 5044
    }
    # TCP input on port 50000.
    tcp {
        port => 50000
    }
    # JDBC input: runs the SQL statement below on the given schedule and uses
    # :sql_last_value to request only rows past the last tracked value.
    # (Connection settings are elided in the original post.)
    jdbc {
       ......

        # NOTE(review): the WHERE clause filters on op.id, which discards rows
        # where op.id is NULL, so the LEFT JOIN effectively behaves like an
        # inner join in this query.
        statement => 'SELECT 
                      op.id as "id", p.id as "productId" 
                      FROM product_entity as p 
                      left join product_option_entity as op on op."productId" = p.id 
                      WHERE p."supplierId" IS NOT NULL AND op.id > :sql_last_value ORDER BY op.id ASC'
        schedule => "* * * * *"
        jdbc_fetch_size => 100
        use_column_value => true
        # NOTE(review): the SELECT above aliases op.id as "id", so the result
        # set presumably has no column named "op.id". If the tracked column
        # cannot be found, :sql_last_value would never advance and every run
        # would re-fetch the same rows -- this likely should be "id"; confirm
        # against the jdbc input plugin documentation.
        tracking_column => "op.id"
        tracking_column_type => "numeric"
        record_last_run => true
        # NOTE(review): clean_run => true presumably discards previously saved
        # run state when the pipeline starts, which would cause a full
        # re-import after each restart -- confirm this is intended.
        clean_run => true
    }
}

filter { ...... }

output {
    # Sends events to the "product-db" index and installs the custom
    # "product" index template from the given file.
    # NOTE(review): no document_id is set here, so rows that are fetched again
    # are presumably indexed as new documents instead of overwriting the
    # existing ones -- confirm against the elasticsearch output documentation.
    elasticsearch {
        hosts => "elasticsearch:9200"
        # NOTE(review): user and password both read LOGSTASH_INTERNAL_PASSWORD;
        # the user was presumably meant to come from a separate variable -- verify.
        user => "${LOGSTASH_INTERNAL_PASSWORD}"
        password => "${LOGSTASH_INTERNAL_PASSWORD}"
        index => "product-db"
        manage_template => true
        template => '/usr/share/logstash/template/product-template.json'
        template_name => "product"
        template_overwrite => true
    }
    # Also writes every event to stdout using the json_lines codec.
    stdout { codec => json_lines }
}

The `op.id > :sql_last_value` condition does not seem to take effect: rows that have already been retrieved are fetched again on every scheduled run, so duplicate documents keep being inserted.

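When I modified the jdbc input as follows (a simpler single-table query with explicit paging, and `tracking_column` set to `"id"` instead of `"op.id"`), it works correctly.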

input {
    # Beats input on port 5044.
    beats {
        port => 5044
    }
    # TCP input on port 50000.
    tcp {
        port => 50000
    }
    # JDBC input reading product_option_entity incrementally by id.
    # (Connection settings are elided in the original post.)
    jdbc {
        ......

        # Single-table query; :size and :offset are presumably supplied by the
        # plugin because jdbc_paging_mode is "explicit" -- confirm in the docs.
        statement => 'SELECT id, name FROM product_option_entity WHERE id > :sql_last_value ORDER BY id ASC LIMIT :size OFFSET :offset'
        schedule => "* * * * *"
        jdbc_paging_enabled => true
        jdbc_paging_mode => "explicit"
        jdbc_page_size => 10000
        record_last_run => true
        # NOTE(review): clean_run => true presumably discards previously saved
        # run state when the pipeline starts, which would cause a full
        # re-import after each restart -- confirm this is intended.
        clean_run => true
        tracking_column_type => "numeric"
        # The tracked column name matches the "id" column selected by the
        # statement above.
        tracking_column => "id"
        use_column_value => true
        # Explicit location for the file that stores the last tracked value.
        last_run_metadata_path => "/usr/share/logstash/last_run_metadata"
    }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.