Sync PostgreSQL with Elasticsearch using the JDBC plugin

Hi, I'm having trouble syncing data from the database to Elasticsearch. When I load data, my Elasticsearch index is sometimes missing 3 or 4 records, sometimes more.
I have 10 workers which load data into the database.
My Logstash pipeline:

input {
    jdbc {
        jdbc_driver_library => "/usr/share/logstash/postgresql.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "jdbc:postgresql://pgsql/broker"
        jdbc_user => "user"
        jdbc_password => "password"
        jdbc_paging_enabled => false
        jdbc_page_size => 100
        jdbc_validate_connection => true
        statement => "SELECT CAST(metadata AS text), pwp_id, updated_at::TEXT FROM pwps WHERE updated_at > :sql_last_value ORDER BY updated_at"
        use_column_value => true
        tracking_column => "updated_at"
        tracking_column_type => "timestamp"
        schedule => "*/2 * * * * *"
        record_last_run => true
        last_run_metadata_path => "/usr/share/logstash/.sync_pwp_last_run"
    }
}
filter {
    json {
        source => metadata
        remove_field => ["metadata"]
    }
}
output {
    elasticsearch {
        index => "index"
        hosts => ["http://elasticsearch:9200"]
        document_id => "%{id}"
    }
    stdout {
        codec => rubydebug
    }
}

And my table schema is:

CREATE TABLE "public"."pwps" (
    "id" uuid NOT NULL,
    "pwp_id" character varying(100),
    "metadata" jsonb,
    "updated_at" timestamp(6) DEFAULT now(),
    "pwp_type_id" integer,
    CONSTRAINT "pwps_pkey" PRIMARY KEY ("id")
) WITH (oids = false);

Has anyone had a similar problem and solved it?

Is id unique in the SQL database? If yes, I guess the issue is with the index setting refresh_interval: if an id is duplicated, the document is not immediately available for search when another Logstash worker updates it.
Reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-update-settings.html

You may try to enforce more uniqueness by combining id and updated_at to get a unique document_id.
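
A minimal sketch of that suggestion, reusing the index and hosts values from the pipeline above. It assumes that both id and updated_at are present as fields on the event, which means the SELECT statement would have to return the id column as well; the underscore separator is an arbitrary choice.

```
output {
    elasticsearch {
        index => "index"
        hosts => ["http://elasticsearch:9200"]
        # Assumption: "id" and "updated_at" both exist as event fields,
        # i.e. the SELECT statement returns them.
        document_id => "%{id}_%{updated_at}"
    }
}
```

Note that with a document_id built this way, each change to updated_at produces a new document in the index instead of overwriting the previous one.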

Hope this helps.

The id is unique, but I use upsert to overwrite the existing document.
The problem is that the missing documents never become available in the index, so I assume they are never pushed to it.
