Hi,
We are trying to write data to an Elasticsearch index from our Apache Beam Java pipeline, but every write fails with an HTTP 400 "Malformed content" error. The code we use to insert the data and the full error are below. Can someone help?
PCollection<String> jsonRecords = pipeline
    .apply(READ_PARQUET_FILES,
        ParquetIO.read(helpJoinedRecordSchema)
            .from(helpDailyFolder))
    .setCoder(AvroCoder.of(GenericRecord.class, helpJoinedRecordSchema))
    .apply(CONVERT_TO_JSON,
        ParDo.of(new HelpDailyGenericRecordToJsonFn(indexName)));
PCollection<String> json = jsonRecords
    .apply("Create Bulk Requests", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(@Element String json, OutputReceiver<String> out) {
            String documentId = UUID.randomUUID().toString();
            try {
                String indexActionJson = "{ \"index\" : { \"_index\" : \"" + indexName + "\", \"_id\" : \"" + documentId + "\" } }";
                String bulkRequestJson = indexActionJson + "\n" + json + "\n";
                //log.info("bulk " + bulkRequestJson);
                out.output(bulkRequestJson);
            } catch (Exception e) {
                log.error("Error creating Bulk API request JSON: ", e);
            }
        }
    }));
json.apply("Write to Elasticsearch", ElasticsearchIO.write()
    .withConnectionConfiguration(connectElasticSearchIndex(indexName))
    .withMaxBatchSize(BATCH_SIZE_BYTES)
    .withMaxBatchSizeBytes(5000));
The pipeline fails with:
Caused by: org.elasticsearch.client.ResponseException: method [POST], host [xxxxxx:9243], URI [/aws_live/_doc/_bulk], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"Malformed content, found extra data after parsing: START_OBJECT"}],"type":"illegal_argument_exception","reason":"Malformed content, found extra data after parsing: START_OBJECT"},"status":400}
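One possible cause, sketched under assumptions rather than confirmed: as far as we understand, ElasticsearchIO.write() adds the bulk action line for each element itself and expects every element to be a single JSON document. If that is right, the "Create Bulk Requests" step puts a second action line and a second JSON object into each element. The plain-Java sketch below only imitates that assumed wrapping to show what the request body would look like. `BulkBodySketch`, `wrapAsSinkMight`, `preWrap`, and `countJsonLines` are hypothetical names for illustration and are not part of Beam or Elasticsearch.

```java
import java.util.UUID;

final class BulkBodySketch {

    // ASSUMPTION: imitates a sink that prepends its own bulk action line
    // to every element it receives. This is not the real ElasticsearchIO code.
    static String wrapAsSinkMight(String element) {
        return "{ \"index\" : {} }\n" + element + "\n";
    }

    // Same string building as the "Create Bulk Requests" step in the pipeline.
    static String preWrap(String indexName, String documentJson) {
        String documentId = UUID.randomUUID().toString();
        String action = "{ \"index\" : { \"_index\" : \"" + indexName
                + "\", \"_id\" : \"" + documentId + "\" } }";
        return action + "\n" + documentJson + "\n";
    }

    // Counts the non-empty lines of a newline-delimited body.
    static int countJsonLines(String body) {
        int count = 0;
        for (String line : body.split("\n")) {
            if (!line.trim().isEmpty()) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        String doc = "{\"field\":\"value\"}";

        // Raw document: one action line plus one source line.
        String raw = wrapAsSinkMight(doc);
        System.out.println(countJsonLines(raw));     // 2

        // Pre-wrapped element: action, action, source. The pairs no longer line up.
        String doubled = wrapAsSinkMight(preWrap("aws_live", doc));
        System.out.println(countJsonLines(doubled)); // 3
        System.out.print(doubled);
    }
}
```

If the assumption holds, the thing to try would be passing jsonRecords straight to the "Write to Elasticsearch" step without the intermediate DoFn. It is also worth checking that HelpDailyGenericRecordToJsonFn emits each record as single-line JSON, since the bulk format is newline-delimited.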