Ingest data with GCP Dataflow

Hello.

I'm trying to ingest data into Elasticsearch using GCP Dataflow and the Pub/Sub to Elasticsearch template. So far I've tried a couple of different deployments, all on GCP, using the trial option at elastic.co.

The data I'm trying to ingest consists of metrics from our devices in a simple JSON format. The Dataflow job is configured with the Cloud ID of my deployment and a custom UDF that formats the data from Pub/Sub.
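For context, the template's UDF is a JavaScript function that receives each Pub/Sub message payload as a string and returns the (possibly transformed) string to be indexed. Mine looks roughly like this minimal sketch (the field names match my data; the transformation step itself is illustrative, not my exact UDF):

```javascript
/**
 * UDF for the Pub/Sub to Elasticsearch Dataflow template.
 * The template invokes this once per message, passing the payload
 * as a JSON string, and indexes the string this function returns.
 * Minimal illustrative sketch only.
 */
function process(inJson) {
  var obj = JSON.parse(inJson);
  // Illustrative step: copy the device-reported timestamp into
  // @timestamp so event time is used rather than ingest time.
  obj["@timestamp"] = obj.timestamp;
  return JSON.stringify(obj);
}
```

The UDF file sits in a GCS bucket and is referenced via the template's `javascriptTextTransformGcsPath` and `javascriptTextTransformFunctionName` parameters (if I recall the parameter names correctly).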

It all seems to go well at first and some metrics are ingested, but then an error occurs. See below:

It is always the same error, and once it pops up no more data is ingested into Elasticsearch.

Does anyone here have experience using GCP Dataflow to ingest data?

Update

I realize the image is not easy to read, so here is the detailed error log from GCP:

Error message from worker: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)
(the same exception and stack trace repeat three more times)

This Dataflow job uses the default configuration, so I'd really expect it to work out of the box.

Welcome to our community! :smiley:

Please don't post pictures of text, logs or code. They are difficult to read, impossible to search and replicate (if it's code), and some people may not even be able to see them :slight_smile:

Thank you! I've posted the detailed error log.

There's not a lot there, is there :frowning:
Is there anything in the Elasticsearch logs at that time?

Can you show a sample document that was indexed successfully and one that failed to index? It is possible that there is a mapping conflict, but without seeing JSON examples it is hard to tell. It would also help if you could provide the mappings of the index you are indexing into when the error occurs.
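To illustrate the kind of difference to look for (hypothetical values, reusing the field names from your data): once the first indexed document fixes a field's type via dynamic mapping, a later message carrying an incompatible type for the same field is rejected.

```javascript
// First message indexed: labels.place_id is a JSON number, so
// Elasticsearch dynamically maps it as a numeric field.
var first = JSON.parse(
  '{"labels":{"device_id":"5000009d6bbf9cee","place_id":7800}}'
);

// Hypothetical later message: the same field arrives as a
// non-numeric string. Indexing it into the same data stream would
// fail with a mapper_parsing_exception, which is one way to hit
// "some elements could not be inserted".
var later = JSON.parse(
  '{"labels":{"device_id":"5000009d6bbf9cee","place_id":"7800-A"}}'
);
```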

The only logs I can find in Elastic are under Observability > Logs, and they show the data I ingested, e.g.:

Showing entries from Jan 11, 14:29:03
14:29:03.000
gcp.pubsub
{"points":{"udp_active":true,"udp_high":6,"udp_medium":6,"udp_low":6},"labels":{"device_id":"5000009d6bbf9cee","place_id":7800},"timestamp":"2022-01-11T13:29:03Z"}

This is the log line I ingested into Elastic. Are there any other logs from Elastic?

Found them!

Here is one of the documents; there are around 5 million similar ones:

{
  "_index": ".ds-logs-gcp.pubsub-metrics-2022.01.11-000001",
  "_type": "_doc",
  "_id": "eGtVSX4BVV1vns0ot6nA",
  "_version": 1,
  "_score": 1,
  "_source": {
    "points": {
      "udp_active": true,
      "udp_high": 6,
      "udp_medium": 6,
      "udp_low": 6
    },
    "labels": {
      "device_id": "5000009d6bbf9cee",
      "place_id": 7800
    },
    "@timestamp": "2022-01-11T13:29:03Z",
    "agent": {
      "type": "dataflow",
      "name": "",
      "version": "1.0.0",
      "id": ""
    },
    "data_stream": {
      "type": "logs",
      "dataset": "gcp.pubsub",
      "namespace": "metrics"
    },
    "ecs": {
      "version": "1.10.0"
    },
    "message": "{\"points\":{\"udp_active\":true,\"udp_high\":6,\"udp_medium\":6,\"udp_low\":6},\"labels\":{\"device_id\":\"5000009d6bbf9cee\",\"place_id\":7800},\"timestamp\":\"2022-01-11T13:29:03Z\"}",
    "service": {
      "type": "gcp.pubsub"
    },
    "event": {
      "module": "gcp",
      "dataset": "gcp.pubsub"
    }
  },
  "fields": {
    "points.udp_medium": [
      6
    ],
    "points.udp_active": [
      true
    ],
    "points.udp_high": [
      6
    ],
    "points.udp_low": [
      6
    ],
    "data_stream.namespace": [
      "metrics"
    ],
    "labels.place_id": [
      7800
    ],
    "message": [
      "{\"points\":{\"udp_active\":true,\"udp_high\":6,\"udp_medium\":6,\"udp_low\":6},\"labels\":{\"device_id\":\"5000009d6bbf9cee\",\"place_id\":7800},\"timestamp\":\"2022-01-11T13:29:03Z\"}"
    ],
    "data_stream.type": [
      "logs"
    ],
    "service.type": [
      "gcp.pubsub"
    ],
    "agent.type": [
      "dataflow"
    ],
    "labels.device_id": [
      "5000009d6bbf9cee"
    ],
    "@timestamp": [
      "2022-01-11T13:29:03.000Z"
    ],
    "event.module": [
      "gcp"
    ],
    "agent.id": [
      ""
    ],
    "ecs.version": [
      "1.10.0"
    ],
    "data_stream.dataset": [
      "gcp.pubsub"
    ],
    "agent.name": [
      ""
    ],
    "agent.version": [
      "1.0.0"
    ],
    "event.dataset": [
      "gcp.pubsub"
    ]
  }
}

The documents below it have the same index but different ids.

Edit: Actually, all 5 million documents have the same timestamp.