When creating a new IndexRequest, is there any way to set the document source from an input stream instead of the currently supported types such as String, byte array, BytesReference and so on?

For context, the code below runs as a plugin inside the Elasticsearch server. Because the application is likely to index documents concurrently, we are seeing OOM errors in ES, so we don't want to read many documents into memory as byte arrays, especially the large ones. The documents to index are a mixed bag: some are large (several MB), others small (a few KB).

Is InputStreamStreamInput the right API for this purpose? It doesn't seem so. Are there any options that avoid reading entire source documents into memory?

I tried building the IndexRequest as follows, but got an IOException while reading from the stream, even though the stream was created from a valid JSON file:
    BulkRequest bulkRequest = new BulkRequest();
    // repeat this for each document in the batch
    IndexRequest indexReq = new IndexRequest(indexName, "some_type", id);
    indexReq.readFrom(getStreamInput(doc)); // IOException is thrown here; replacing this line with indexReq.source(JSONHelper.toBytes(doc)) works fine
    bulkRequest.add(indexReq);
    // end repeat
    m_bulkAction.execute(bulkRequest, listener); // m_bulkAction is an instance of TransportBulkAction

    private StreamInput getStreamInput(AttachmentDocument doc) {
        // The AttachmentDocument POJO is written to a file in JSON format using Jackson
        File f = // details omitted
        return new InputStreamStreamInput(new FileInputStream(f));
        // error handling omitted
    }
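Independently of the stream question, one way to keep memory bounded when the documents are a mix of multi-MB and KB-sized files is to cap each bulk request by total payload size rather than by document count; Elasticsearch's BulkProcessor exposes a comparable setBulkSize option. The plain-Java sketch below is hypothetical: SizeCappedBatcher and batchBySize are illustrative names, not Elasticsearch API, and it assumes each document's size is known up front (for example via File.length() on the Jackson output). It only decides which documents travel together, so that at most one batch's worth of source bytes would need to be loaded at a time; it does not stream a single document.

```java
import java.util.ArrayList;
import java.util.List;

public class SizeCappedBatcher {
    /**
     * Greedily groups documents, in input order, into batches whose total size
     * stays at or under maxBatchBytes. A document larger than the cap ends up
     * in a batch of its own. Returns batches as lists of indices into docSizes.
     */
    public static List<List<Integer>> batchBySize(List<Long> docSizes, long maxBatchBytes) {
        List<List<Integer>> batches = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        long currentBytes = 0;
        for (int i = 0; i < docSizes.size(); i++) {
            long size = docSizes.get(i);
            // Close the current batch if adding this document would exceed the cap
            if (!current.isEmpty() && currentBytes + size > maxBatchBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(i);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Made-up sizes in bytes: a few KB-sized docs mixed with multi-MB ones
        List<Long> sizes = List.of(4_000L, 8_000L, 6_000_000L, 2_000L, 12_000_000L, 3_000L);
        long cap = 5_000_000L; // illustrative 5 MB cap per bulk request
        System.out.println(batchBySize(sizes, cap)); // prints [[0, 1], [2], [3], [4], [5]]
    }
}
```

Because the pass is greedy and keeps input order, a small document that follows a large one starts a new batch; sorting by size first would pack small documents more tightly, at the cost of reordering.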
The exception thrown by readFrom:

java.io.IOException: Can't read unknown type [34]
at org.elasticsearch.common.io.stream.StreamInput.readGenericValue(StreamInput.java:434)
at org.elasticsearch.common.io.stream.StreamInput.readMap(StreamInput.java:358)
at org.elasticsearch.transport.TransportMessage.readFrom(TransportMessage.java:68)
at org.elasticsearch.action.ActionRequest.readFrom(ActionRequest.java:69)
at org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.readFrom(ShardReplicationOperationRequest.java:205)
at org.elasticsearch.action.index.IndexRequest.readFrom(IndexRequest.java:643)
at com.cw.es.plugin.action.CustomIndexTransportAction.createIndexRequest(CustomIndexTransportAction.java:169)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction.lambda$null$237(CustomIndexAttachmentTransportAction.java:112)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction$$Lambda$5/1404010203.apply(Unknown Source)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction.lambda$null$238(CustomIndexAttachmentTransportAction.java:118)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction$$Lambda$3/1505198309.apply(Unknown Source)
at com.cw.common.util.StreamHelper.lambda$processParallel$304(StreamHelper.java:59)
at com.cw.common.util.StreamHelper$$Lambda$4/1383835929.call(Unknown Source)
at java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1424)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
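A plausible reading of this trace: IndexRequest.readFrom(StreamInput) is Elasticsearch's deserializer for its internal binary transport format (how requests travel between nodes), not a JSON parser. If, as the TransportMessage.readFrom, StreamInput.readMap and readGenericValue frames suggest, the first byte is read as a boolean header flag and the next as a type marker, then a compact JSON file starting with {" would yield type 34, which is the ASCII code of the double quote. The stdlib-only snippet below (hypothetical class name, made-up sample document) just prints those leading byte values; it does not touch Elasticsearch.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class JsonLeadingBytes {
    // Returns the unsigned values of the first n bytes of the UTF-8 encoded text
    static int[] leadingBytes(String text, int n) {
        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
        int[] out = new int[Math.min(n, bytes.length)];
        for (int i = 0; i < out.length; i++) {
            out[i] = bytes[i] & 0xFF;
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up compact JSON, shaped like Jackson's default (not pretty-printed) output
        String json = "{\"title\":\"report.pdf\",\"size\":1024}";
        int[] first = leadingBytes(json, 2);
        System.out.println(Arrays.toString(first)); // prints [123, 34]
        for (int b : first) {
            // 123 is '{' and 34 is '"'
            System.out.println(b + " = '" + (char) b + "'");
        }
    }
}
```

If that reading is right, InputStreamStreamInput cannot help here: the stream passed to readFrom would have to contain a serialized IndexRequest, not the raw document JSON.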