We built a simple, scalable, real-time Postgres to Elasticsearch sink

Hi everyone :waving_hand:,

I’m Eric, one of the maintainers of Sequin—an open‑source Change Data Capture (CDC) engine for Postgres. We recently shipped an Elasticsearch sink and wanted to share the approach (and answer any questions) with the Elastic community directly.

Why another CDC → ES solution?
We wanted a zero‑lag path from Postgres tables to Elasticsearch indexes without triggers, cron jobs, Kafka, or nightly ETLs—just sub‑second updates via the Bulk API.

What Sequin does in 30 sec

  • Taps logical replication in Postgres
  • Turns every INSERT / UPDATE / DELETE into a JSON change event (example just below)
  • Streams the events to your chosen destination (Kafka, SQS, SNS, … now Elasticsearch)
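
Concretely, each change event is a JSON message roughly along these lines (values are illustrative and the exact shape can vary by sink; the top-level fields mirror the transform arguments in the quick-start below, and the metadata keys shown here are just examples):

{
  "action": "update",
  "record": { "id": 42, "name": "Wireless Mouse", "price": 17.99 },
  "changes": { "price": 19.99 },
  "metadata": { "table_name": "products", "commit_timestamp": "2024-05-01T12:00:00Z" }
}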

Key features of the ES sink

  • Zero-lag search: changes show up in the index with sub-second latency
  • Bulk API + back-pressure: batches of up to 10,000 docs per request
  • Transforms: optional functions to reshape or filter docs before indexing
  • Backfill + live tail: fast initial load, then continuous WAL tailing
  • Throughput: ~40–45 MB/s per sink in internal tests; scale out by running multiple sinks (sketched after the quick-start)

Quick‑start example (sequin.yaml)

# Stream the `products` table → ES index `products`
databases:
  - name: app
    hostname: your-rds
    port: 5432
    database: app_prod
    username: postgres
    password: ****
    slot_name: sequin_slot
    publication_name: sequin_pub

sinks:
  - name: products-to-es
    database: app
    table: products
    transform_module: "my-es-transform"   # optional
    destination:
      type: elasticsearch
      endpoint_url: "https://es.internal:9200"
      index_name: "products"
      auth_type: "api_key"
      auth_value: "<base64-api-key>"

transforms:
  - name: "my-es-transform"
    transform:
      type: "function"
      code: |-
        def transform(_action, record, _changes, _metadata) do
          # Drop sensitive values before indexing
          %{record: Map.drop(record, ["sensitive-value"])}
        end
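
To scale out as noted in the feature list, you can declare additional sinks in the same file. A rough sketch, using a hypothetical orders table and a one-sink-per-table split:

sinks:
  - name: products-to-es
    database: app
    table: products
    destination:
      type: elasticsearch
      endpoint_url: "https://es.internal:9200"
      index_name: "products"
      auth_type: "api_key"
      auth_value: "<base64-api-key>"

  - name: orders-to-es
    database: app
    table: orders
    destination:
      type: elasticsearch
      endpoint_url: "https://es.internal:9200"
      index_name: "orders"
      auth_type: "api_key"
      auth_value: "<base64-api-key>"

Since throughput is measured per sink, adding sinks is how you scale past the numbers above.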

Common Q&A

  • Upserts or replaces? Each change is sent as a bulk index op, i.e. create-or-replace; see the sketch after this list.
  • Deletes? Row delete → bulk delete with same _id.
  • _id strategy? Concat of primary key(s) by default (custom logic welcome).
  • Partial updates / scripts? Not yet—tell us your use‑case!
  • Mapping clashes? ES errors surface in Sequin logs with the exact bulk line that failed.
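
To make the upsert and delete semantics concrete, the generated Bulk API payload looks roughly like this (index name, _id, and document fields are illustrative):

{ "index": { "_index": "products", "_id": "42" } }
{ "id": 42, "name": "Wireless Mouse", "price": 17.99 }
{ "delete": { "_index": "products", "_id": "42" } }

An insert or update becomes an index action (a create-or-replace of the full document); a row delete becomes a delete action targeting the same _id.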

How we can help

We're seeing the Elasticsearch sink become one of our most popular destinations, so we wanted to join this community directly and be around to answer questions as they come up.

— Eric & the Sequin team


Hey Eric, thanks for sharing this — really cool approach!

I like how Sequin cuts out the usual complexity (Kafka, cron jobs, etc.) and keeps things fast and simple with sub-second syncs. Using logical replication straight to Elasticsearch via the Bulk API sounds super efficient, especially for real-time indexing needs.

The transform_module is a great touch too — being able to filter or clean data before it hits the index makes the setup flexible for different use cases.

Congrats on the Elasticsearch sink — definitely looks useful. Looking forward to seeing how the project grows!
