Ingest data with GCP Dataflow

Hello.

I'm trying to ingest data into Elasticsearch using GCP Dataflow and the Pub/Sub to Elasticsearch template. So far I've tried a couple of different deployments, all created on GCP using the trial option over at elastic.co.

The data I'm trying to ingest consists of metrics from our devices in a simple JSON format. The Dataflow job is configured with the Cloud ID for my deployment and a custom UDF to format the data from Pub/Sub.
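For context, here is a minimal sketch of how one of these metric messages could be published to the topic with the Python Pub/Sub client (project and topic names are placeholders, not our real ones):

# Publish one sample device metric to the Pub/Sub topic the Dataflow job reads from.
# "my-project" and "device-metrics" are placeholder names.
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-project", "device-metrics")

metric = {
    "points": {"udp_active": True, "udp_high": 6, "udp_medium": 6, "udp_low": 6},
    "labels": {"device_id": "5000009d6bbf9cee", "place_id": 7800},
    "timestamp": "2022-01-11T13:29:03Z",
}

# Pub/Sub payloads are bytes; the template receives the raw JSON string.
future = publisher.publish(topic_path, json.dumps(metric).encode("utf-8"))
print(future.result())  # prints the message ID once the publish is acknowledged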

It all seems to go well at first: some metrics are ingested, but then there is an error (see the screenshot below):

It is always the same error. Once it pops up, no more data is ingested into Elasticsearch.

Does anyone here have experience using GCP Dataflow to ingest data?

Update

I realize the image is not the easiest to read. Here is a detailed error log from GCP:

Error message from worker: java.io.IOException: Error writing to Elasticsearch, some elements could not be inserted:
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO.checkForErrors(ElasticsearchIO.java:231)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.flushBatch(ElasticsearchIO.java:1483)
        com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIO$Write$WriteFn.finishBundle(ElasticsearchIO.java:1449)

(the same java.io.IOException and stack trace repeat several more times in the log)

This Dataflow job uses the default configuration, so I'd really expect it to work out of the box.

Welcome to our community! :smiley:

Please don't post pictures of text, logs or code. They are difficult to read, impossible to search and replicate (if it's code), and some people may not even be able to see them :slight_smile:

Thank you! I've posted a detailed error log above.

There's not a lot there, is there :frowning:
Is there anything in the Elasticsearch logs at that time?

Can you show a sample document that was indexed successfully and one that failed to index? There may be a mapping conflict, but without seeing JSON examples it is hard to tell. It would probably also help if you could provide the mappings for the index you are indexing into when the error occurs.
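If it helps, something along these lines can pull the mappings and reproduce a single insert directly, which usually surfaces the exact rejection reason (cloud ID, API key, index name and the sample document are all placeholders):

# Sketch for pulling the mappings and retrying one failing document by hand.
# Cloud ID, API key, index name and the document below are placeholders.
from elasticsearch import Elasticsearch

es = Elasticsearch(cloud_id="YOUR_CLOUD_ID", api_key="YOUR_API_KEY")

# 1. The mappings of the index / data stream the template writes into.
print(es.indices.get_mapping(index="your-index-or-data-stream"))

# 2. Index one of the failing documents directly; the error response will name
#    the concrete problem, e.g. a mapper_parsing_exception on a specific field.
#    op_type="create" is required when the target is a data stream.
failing_doc = {"example_field": "value taken from a failed element"}
print(es.index(index="your-index-or-data-stream", document=failing_doc, op_type="create"))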

The only logs in Elastic I can find are under Observability - Logs, and they show the data I ingest, e.g.:

Showing entries from Jan 11, 14:29:03
14:29:03.000
gcp.pubsub
{"points":{"udp_active":true,"udp_high":6,"udp_medium":6,"udp_low":6},"labels":{"device_id":"5000009d6bbf9cee","place_id":7800},"timestamp":"2022-01-11T13:29:03Z"}

This is the log entry for the data I ingested. Are there any other logs from Elastic I should be looking at?

Found them!

Here is one of the documents; there are around 5 million similar ones:

{
  "_index": ".ds-logs-gcp.pubsub-metrics-2022.01.11-000001",
  "_type": "_doc",
  "_id": "eGtVSX4BVV1vns0ot6nA",
  "_version": 1,
  "_score": 1,
  "_source": {
    "points": {
      "udp_active": true,
      "udp_high": 6,
      "udp_medium": 6,
      "udp_low": 6
    },
    "labels": {
      "device_id": "5000009d6bbf9cee",
      "place_id": 7800
    },
    "@timestamp": "2022-01-11T13:29:03Z",
    "agent": {
      "type": "dataflow",
      "name": "",
      "version": "1.0.0",
      "id": ""
    },
    "data_stream": {
      "type": "logs",
      "dataset": "gcp.pubsub",
      "namespace": "metrics"
    },
    "ecs": {
      "version": "1.10.0"
    },
    "message": "{\"points\":{\"udp_active\":true,\"udp_high\":6,\"udp_medium\":6,\"udp_low\":6},\"labels\":{\"device_id\":\"5000009d6bbf9cee\",\"place_id\":7800},\"timestamp\":\"2022-01-11T13:29:03Z\"}",
    "service": {
      "type": "gcp.pubsub"
    },
    "event": {
      "module": "gcp",
      "dataset": "gcp.pubsub"
    }
  },
  "fields": {
    "points.udp_medium": [
      6
    ],
    "points.udp_active": [
      true
    ],
    "points.udp_high": [
      6
    ],
    "points.udp_low": [
      6
    ],
    "data_stream.namespace": [
      "metrics"
    ],
    "labels.place_id": [
      7800
    ],
    "message": [
      "{\"points\":{\"udp_active\":true,\"udp_high\":6,\"udp_medium\":6,\"udp_low\":6},\"labels\":{\"device_id\":\"5000009d6bbf9cee\",\"place_id\":7800},\"timestamp\":\"2022-01-11T13:29:03Z\"}"
    ],
    "data_stream.type": [
      "logs"
    ],
    "service.type": [
      "gcp.pubsub"
    ],
    "agent.type": [
      "dataflow"
    ],
    "labels.device_id": [
      "5000009d6bbf9cee"
    ],
    "@timestamp": [
      "2022-01-11T13:29:03.000Z"
    ],
    "event.module": [
      "gcp"
    ],
    "agent.id": [
      ""
    ],
    "ecs.version": [
      "1.10.0"
    ],
    "data_stream.dataset": [
      "gcp.pubsub"
    ],
    "agent.name": [
      ""
    ],
    "agent.version": [
      "1.0.0"
    ],
    "event.dataset": [
      "gcp.pubsub"
    ]
  }
}

The document below it has the same index but a different _id.

Edit: Actually, all 5 million documents have the same timestamp.
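For anyone who wants to verify the same thing, a terms aggregation on @timestamp shows how many documents share each value. A sketch with the Python client (cloud ID and API key are placeholders; the data stream name comes from the document above):

# Count documents per @timestamp value in the data stream.
# Cloud ID and API key are placeholders.
from elasticsearch import Elasticsearch

es = Elasticsearch(cloud_id="YOUR_CLOUD_ID", api_key="YOUR_API_KEY")

resp = es.search(
    index="logs-gcp.pubsub-metrics",
    size=0,
    aggs={"per_timestamp": {"terms": {"field": "@timestamp", "size": 10}}},
)
for bucket in resp["aggregations"]["per_timestamp"]["buckets"]:
    print(bucket["key_as_string"], bucket["doc_count"])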

I was able to fix that by reducing the lifetime of the documents. It looks like the pipeline tries to ingest the same documents again and again until the message is dismissed.

Reducing the lifetime of the documents means data does come in, but there is still a lag. We are in contact with Google to get that fixed.
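In case it helps anyone else: assuming the "lifetime" in play here is the Pub/Sub subscription's retention of unacknowledged messages, a sketch of shortening it with the Python client looks like this (project and subscription names are placeholders):

# Shorten how long the subscription retains unacknowledged messages, assuming
# that is the "lifetime" being reduced here. Project and subscription names
# are placeholders; 10 minutes is the minimum Pub/Sub allows.
from google.cloud import pubsub_v1
from google.protobuf import duration_pb2, field_mask_pb2

subscriber = pubsub_v1.SubscriberClient()
sub_path = subscriber.subscription_path("my-project", "my-dataflow-subscription")

subscription = pubsub_v1.types.Subscription(
    name=sub_path,
    message_retention_duration=duration_pb2.Duration(seconds=10 * 60),
)
update_mask = field_mask_pb2.FieldMask(paths=["message_retention_duration"])

with subscriber:
    updated = subscriber.update_subscription(
        request={"subscription": subscription, "update_mask": update_mask}
    )
    print(updated.message_retention_duration)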

Hello, I'm trying to follow this thread to debug my own issue with a GCP Dataflow job. I get the same error message from Dataflow as shown above, but I can't find the actual logs under Observability - Logs. How can I find the log messages showing my data being ingested?

I've also tried streaming my logs while running a successful Dataflow job that inserts data, but I don't see any log entries show up as a result of the job.


Welcome to our community! :smiley:

It's better if you can please start your own topic on this, to save confusion :slight_smile:

Hi gustafkisi. How did you locate the logs that show your ingested documents?

Thanks warkolm. I've created a separate thread (link below).


This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.