400 malformed error while inserting into Elasticsearch using Java Apache Beam

Hi,

We are trying to insert data into an Elasticsearch index using our Apache Beam Java code, but whenever we try to write the data we get a 400 malformed error. This is the code I am using to insert data into Elasticsearch. Can someone help?

PCollection<String> jsonRecords = pipeline
        .apply(READ_PARQUET_FILES,
                ParquetIO.read(helpJoinedRecordSchema)
                        .from(helpDailyFolder))
        .setCoder(AvroCoder.of(GenericRecord.class, helpJoinedRecordSchema))
        .apply(CONVERT_TO_JSON,
                ParDo.of(new HelpDailyGenericRecordToJsonFn(indexName)));

PCollection<String> json = jsonRecords
        .apply("Create Bulk Requests", ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(@Element String json, OutputReceiver<String> out) {
                String documentId = UUID.randomUUID().toString();
                try {
                    String indexActionJson = "{ \"index\" : { \"_index\" : \"" + indexName + "\", \"_id\" : \"" + documentId + "\" } }";
                    String bulkRequestJson = indexActionJson + "\n" + json + "\n";
                    //log.info("bulk " + bulkRequestJson);
                    out.output(bulkRequestJson);
                } catch (Exception e) {
                    log.error("Error creating Bulk API request JSON: ", e);
                }
            }
        }));

json.apply("Write to Elasticsearch", ElasticsearchIO.write()
        .withConnectionConfiguration(connectElasticSearchIndex(indexName))
        .withMaxBatchSize(BATCH_SIZE_BYTES)
        .withMaxBatchSizeBytes(5000));

Caused by: org.elasticsearch.client.ResponseException: method [POST], host [xxxxxx:9243], URI [/aws_live/_doc/_bulk], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Malformed content, found extra data after parsing: START_OBJECT"}],"type":"illegal_argument_exception","reason":"Malformed content, found extra data after parsing: START_OBJECT"},"status":400}

I suspect something is off with the data format, because the error "Malformed content, found extra data after parsing: START_OBJECT" typically indicates an issue with the format of the JSON being sent to Elasticsearch's bulk API. The bulk API expects newline-delimited JSON (NDJSON), where each line contains a separate JSON object and actions (such as "index") are specified on a separate line from the document.
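
For reference, this is roughly what a well-formed bulk call looks like outside Beam, using the low-level Elasticsearch REST client (a minimal sketch; the host, index name, and document are placeholders):

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

public class BulkFormatExample {
    public static void main(String[] args) throws Exception {
        try (RestClient client = RestClient.builder(
                new HttpHost("localhost", 9200, "http")).build()) {
            // One action line, then one document line, each on its own line,
            // with a trailing newline at the end of the body.
            String body =
                    "{ \"index\" : { \"_index\" : \"my-index\", \"_id\" : \"1\" } }\n"
                            + "{ \"id\" : \"1\", \"myfields\" : \"value\" }\n";

            Request request = new Request("POST", "/_bulk");
            // setJsonEntity sends Content-Type: application/json; Elasticsearch
            // also accepts application/x-ndjson for the bulk endpoint.
            request.setJsonEntity(body);
            Response response = client.performRequest(request);
            System.out.println(response.getStatusLine());
        }
    }
}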

Yes, I thought that could be the issue, so I tried to print my data before doing the Elasticsearch write operation. I can see the JSON formatted with the index action on one line and my actual data on the next line, separated by a newline, as shown below; this is how the values look when I print the JSON logs. I also tried using ElasticsearchIO.docToBulk() instead of ElasticsearchIO.write(). When I use docToBulk() I don't get this error, but I also don't see the data being written to my Elasticsearch index. So I am not sure what I am missing here?

{ "index" : { "_index" : "xxxx", "_id" : "9ee466d5-a72d-4fa1-bde6-96dad3e4d503" } }
{"id":null,"myfields":null}

{ "index" : { "_index" : "xxxx", "_id" : "9ee466d5-a72d-4fa1-bde6-96dad3e4d504" } }
{"id":null,"myfields":null}
