How to set source from input stream during creation of new indexes


(pmukh) #1

While creating new index is there any way to set the source from an input stream instead of currently supported types like string/byte array/ByteReference and so on. I tried building IndexRequest as follows but ended up getting IOException while reading from stream. The stream was created from a valid JSON file. Since the application is likely to index documents concurrently we are seeing OOM errors in ES. Just to give you a context, the following piece of code is running as a plugin with ElasticSearch server. We don't want to read a lot of documents as a byte array into memory. Especially large ones. However the documents to index is a mixed bad (large(several MB large) + small(KB)). Is InputStreamStreamInput is the right API for this purpose? It doesn't seem so, are there any options without reading the entire source documents into memory?

BulkRequest request = new BulkRequest();
// repeat this for batch of documents
IndexRequest request = new IndexRequest(indexName, "some_type", id);
indexReq.readFrom(getStreamInput(doc)) // IOException is thrown here. If I replace the line with indexReq.source(JSONHelper.toBytes(doc) seems to work fine
buildRequest.add(request);
// end
m_bulkAction.execute(bulkRequest, listener); // instance of TransportBulkAction

private StreamInput getStreamInput(AttachmentDocument doc) {
//AttachmentDoc POJO will get converted to file in JSON format using Jackson Parser
File f = // details omitted.
FileInputStream fis = new FileInputStream(f);
inputStream = new InputStreamStreamInput(new FileInputStream(f));
return inputStream;
// error handling omitted..
}
java.io.IOException: Can't read unknown type [34]
at org.elasticsearch.common.io.stream.StreamInput.readGenericValue(StreamInput.java:434)
at org.elasticsearch.common.io.stream.StreamInput.readMap(StreamInput.java:358)
at org.elasticsearch.transport.TransportMessage.readFrom(TransportMessage.java:68)
at org.elasticsearch.action.ActionRequest.readFrom(ActionRequest.java:69)
at org.elasticsearch.action.support.replication.ShardReplicationOperationRequest.readFrom(ShardReplicationOperationRequest.java:205)
at org.elasticsearch.action.index.IndexRequest.readFrom(IndexRequest.java:643)
at com.cw.es.plugin.action.CustomIndexTransportAction.createIndexRequest(CustomIndexTransportAction.java:169)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction.lambda$null$237(CustomIndexAttachmentTransportAction.java:112)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction$$Lambda$5/1404010203.apply(Unknown Source)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:401)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction.lambda$null$238(CustomIndexAttachmentTransportAction.java:118)
at com.cw.es.plugin.action.CustomIndexAttachmentTransportAction$$Lambda$3/1505198309.apply(Unknown Source)
at com.cw.common.util.StreamHelper.lambda$processParallel$304(StreamHelper.java:59)
at com.cw.common.util.StreamHelper$$Lambda$4/1383835929.call(Unknown Source)
at java.util.concurrent.ForkJoinTask$AdaptedCallable.exec(ForkJoinTask.java:1424)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)


(David Pilato) #2

readFrom and writeTo are internal methods to serialize objects between 2 nodes. Don't use them.

I'm afraid you have to load your JSON document in memory before sending it to elasticsearch. I don't recall from the top of my head another way to do it but may be I'm wrong...


(pmukh) #3

Much thanks for your quick response.
Could BulkProcessor be an option? I believe in BulkProcessor caller has the ability to set the size for the number of requests, say 1G. But what happens if a single request exceeds that limit? What is the expected behavior? Is there a way to chunk such large document into some configurable bytes and index them as a single document? Sorry too many questions here, but I am stuck on this issue for a few days without a viable solution.


(David Pilato) #4

What? You have a single doc which is > 1gb?

I think that the bulk processor will execute that correctly.


(pmukh) #5

It would be very rare to have a single large file of that size. Our application ingests document (emails/attachments/files) from enterprise's active directory and we might encounter them.
Our most common use case would be to index a collection of files in batches and if could restrict the size in GB/MB. Thats why it would be worth exploring BulkProcessor.


(David Pilato) #6

Be careful. You need to have enough memory on the client side and on each node to manage this huge document.

The BulkProcessor won't split your index request in multiparts. It will execute the bulk when the bulk size is above the limit you set.

If you set a limit to 100mb for example, it will send the bulk over the wire only when you reach 100mb of requests. If you have docs with 1mb size, it means that you will execute the bulk after 100 docs.
If you add to the same bulk a doc with 1gb size, this one will be sent immediately over the wire but your node needs to be able to manage it.

And you probably need to increase the default http.max_content_length which is by default 100mb. See https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-http.html. But to be honest, I would prefer breaking the source in smaller documents on the client side instead of sending super big documents.

My 2 cents.


(pmukh) #7

David, much thanks for your very useful comments. We are not planning to send such large doc over the wire. in fact this is how it works. Crawler on the compute node uploads the attachments and loose files(all sizes) on S3/AWS and send a URL to ES. There is a custom plugin created by us that runs on every node of ES. The plugin downloads file from S3, reads it in the memory, converts to byte array and executes a bulk request call to create index.
It is in the custom plugin we want to make sure that we don't read too much in the memory so as to not starve ElasticSearch node of memory. I am thinking BulkProcessor will be useful here, since we can limit the size of request in terms of MB/GB and read only finite number of bytes from the downloaded files.

For single large file, you have suggested a very interesting solution. Is it possible that by breaking the source into smaller document, can it still be indexed as a single document or as separate document. Does it impact our search queries if we index a large document as separate document? Or is there a way to manage association between different parts?


(system) #8