How can I use ES Java API in Storm bolt?


#1

I have some questions about Elasticsearch and Apache Storm.

First, I insert some data into Elasticsearch from a Storm bolt, tuple by tuple.
After that, I want to read the count of the data I inserted with an Elasticsearch query (like "_count?q=body:error...").
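
For reference, the same count can be requested over REST, which is how the elasticsearch-hadoop connector talks to the cluster. The sketch below only builds the URI-search `_count` request and pulls the `count` field out of a response body. The base URL, the index name `logs`, the class and method names, and the sample response are placeholders rather than values from this thread, and the regex is a shortcut, not a real JSON parser.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CountOverRest {
    // Matches the first "count" field, e.g. {"count":3,...}. Not a JSON parser.
    private static final Pattern COUNT_FIELD = Pattern.compile("\"count\"\\s*:\\s*(\\d+)");

    /** Builds e.g. http://localhost:9200/logs/_count?q=body%3Aerror (placeholder host and index). */
    static String countUrl(String baseUrl, String index, String luceneQuery) {
        // The q parameter carries Lucene query-string syntax, so it must be URL-encoded.
        return baseUrl + "/" + index + "/_count?q="
                + URLEncoder.encode(luceneQuery, StandardCharsets.UTF_8);
    }

    /** Extracts the number from a _count response body. */
    static long parseCount(String responseBody) {
        Matcher m = COUNT_FIELD.matcher(responseBody);
        if (!m.find()) {
            throw new IllegalArgumentException("no count field in: " + responseBody);
        }
        return Long.parseLong(m.group(1));
    }

    public static void main(String[] args) {
        System.out.println(countUrl("http://localhost:9200", "logs", "body:error"));
        // Made-up response body, for illustration only.
        String sample = "{\"count\":3,\"_shards\":{\"total\":5,\"successful\":5,\"failed\":0}}";
        System.out.println(parseCount(sample));
    }
}
```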

However, when I use the Java API (CountResponse or MultiSearchResponse) in a Storm bolt:

  1. It sometimes succeeds and sometimes fails.
  2. Elasticsearch cannot read the data right after I insert it.
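
Point 2 is expected behaviour: search in Elasticsearch (including `_count`) is near real-time, so newly indexed documents only become visible after the index is refreshed, which by default happens about once per second. One option is to refresh the index explicitly before counting (the refresh API; in the 1.x Java client, `client.admin().indices().prepareRefresh(index)`), although refreshing per tuple hurts indexing throughput. Another is to poll until the count catches up. Below is a minimal plain-Java sketch of polling, assuming the real count request is wrapped in a `LongSupplier`; `awaitCount` and its timeout parameters are hypothetical.

```java
import java.util.function.LongSupplier;

public class AwaitCount {
    /**
     * Re-runs countQuery until it reports at least `expected` documents or
     * timeoutMillis elapses, and returns the last count seen.
     * countQuery is a hypothetical stand-in for the real count request.
     */
    static long awaitCount(LongSupplier countQuery, long expected,
                           long timeoutMillis, long pollMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        long count = countQuery.getAsLong();
        // Overflow-safe deadline check.
        while (count < expected && System.nanoTime() - deadline < 0) {
            try {
                Thread.sleep(pollMillis); // give the index time to refresh
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                break;
            }
            count = countQuery.getAsLong();
        }
        return count;
    }

    public static void main(String[] args) {
        // Fake count query: the 5 documents only become "visible" on the third call,
        // imitating a refresh that happens between polls.
        int[] calls = {0};
        LongSupplier fakeCount = () -> ++calls[0] >= 3 ? 5 : 0;
        System.out.println(awaitCount(fakeCount, 5, 1_000, 10)); // prints 5
    }
}
```

Polling avoids forcing refreshes, but it ties up the bolt's executor thread while it waits, so the timeout should stay short.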

When it fails (I implemented the Java API calls in ZEsCountBolt.java), one of two errors occurs:

  1. Error at `node = nodeBuilder().node();`

     ```console
     org.elasticsearch.ElasticsearchTimeoutException: Interrupted while waiting for initial discovery state
         at org.elasticsearch.discovery.DiscoveryService.waitForInitialState(DiscoveryService.java:95)
         at org.elasticsearch.node.internal.InternalNode.start(InternalNode.java:265)
         at org.elasticsearch.node.NodeBuilder.node(NodeBuilder.java:166)
         at storm.starter.bolt.ZEsCountBolt.prepare(ZEsCountBolt.java:51)
         at backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43)
         at backtype.storm.daemon.executor$fn__3439$fn__3451.invoke(executor.clj:692)
         at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
         at clojure.lang.AFn.run(AFn.java:24)
         at java.lang.Thread.run(Unknown Source)
     ```

  2. Error at `.execute().actionGet();`

     ```console
     org.elasticsearch.ElasticsearchIllegalStateException: Future got interrupted
         at org.elasticsearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:47)
         at storm.starter.bolt.ZEsCountBolt.prepare(ZEsCountBolt.java:109)
         at backtype.storm.topology.BasicBoltExecutor.prepare(BasicBoltExecutor.java:43)
         at backtype.storm.daemon.executor$fn__3439$fn__3451.invoke(executor.clj:692)
         at backtype.storm.util$async_loop$fn__460.invoke(util.clj:461)
         at clojure.lang.AFn.run(AFn.java:24)
         at java.lang.Thread.run(Unknown Source)
     Caused by: java.lang.InterruptedException
         at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(Unknown Source)
         at org.elasticsearch.common.util.concurrent.BaseFuture$Sync.get(BaseFuture.java:278)
         at org.elasticsearch.common.util.concurrent.BaseFuture.get(BaseFuture.java:117)
         at org.elasticsearch.action.support.AdapterActionFuture.actionGet(AdapterActionFuture.java:45)
         ... 6 more
     ```
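
Both traces end in a thread that was blocked waiting (for the initial discovery state in the first, inside `actionGet()` in the second, with `Caused by: java.lang.InterruptedException`) and was interrupted by another thread; in both cases this happens inside `prepare()`. The plain-Java demo below reproduces that shape with a `CompletableFuture` that never completes: the waiting thread gets an `InterruptedException` from `get()`, which is the exception the Elasticsearch client wrapped as `Future got interrupted`. It uses neither Storm nor Elasticsearch, and `InterruptedWait`/`blockThenInterrupt` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

public class InterruptedWait {
    /** Blocks a worker thread on a future that never completes, then interrupts it. */
    static String blockThenInterrupt() throws InterruptedException {
        // Stands in for a cluster response that has not arrived yet.
        CompletableFuture<Long> pending = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        String[] outcome = new String[1];

        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                pending.get(); // blocks, like actionGet() in the second trace
                outcome[0] = "completed";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag instead of swallowing it
                outcome[0] = "interrupted";
            } catch (ExecutionException e) {
                outcome[0] = "failed";
            }
        });
        worker.start();
        started.await();    // worker is running and about to block
        worker.interrupt(); // another thread interrupts the waiting one
        worker.join();      // join() makes outcome[0] visible here
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(blockThenInterrupt()); // prints interrupted
    }
}
```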


(Costin Leau) #2

This forum is focused on the elasticsearch-hadoop connector, which does integrate with Storm. However, it does so over REST (not the Java client), for several reasons explained in its reference documentation.
It looks like you are using your own bolts, in which case there isn't much this forum can help with; the Storm mailing list is a better place to ask. Based on the exception, the cause might be that the Java client is asynchronous: if a call is non-blocking, Storm is likely to assume there is no work to be done and interrupt the bolt, putting it to sleep.
Spend some time looking at the lifecycle of a bolt and how it reacts to threading, and take it from there.
It can be tricky and sometimes quite frustrating, since the threaded nature can lead to unpredictable, out-of-order execution depending on how the threads start and stop.
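
A minimal sketch of that lifecycle advice, using plain-Java stand-ins rather than the real Storm and Elasticsearch types: `CountClient`, `CountingBoltSketch` and their members are hypothetical, and only the method names `prepare`/`execute`/`cleanup` mirror Storm's bolt interface (the real methods take Storm-specific arguments). The pattern shown is to create the client once in `prepare()`, make each asynchronous call block with a bounded wait inside `execute()` so the work is finished before `execute()` returns, restore the interrupt flag rather than swallow it, and release the client in `cleanup()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class CountingBoltSketch {
    /** Hypothetical async client; a stand-in for the Elasticsearch Java client. */
    interface CountClient extends AutoCloseable {
        CompletableFuture<Long> count(String query);

        @Override
        void close(); // narrowed: no checked exception
    }

    private final Supplier<CountClient> clientFactory;
    private CountClient client;
    final List<Long> emitted = new ArrayList<>(); // stands in for collector.emit(...)

    CountingBoltSketch(Supplier<CountClient> clientFactory) {
        this.clientFactory = clientFactory;
    }

    /** Like a bolt's prepare(): build the expensive client once per bolt instance. */
    void prepare() {
        client = clientFactory.get();
    }

    /** Like a bolt's execute(): wait (bounded) for the async result before returning. */
    void execute(String query) {
        try {
            emitted.add(client.count(query).get(5, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // let code further up the stack see the interrupt
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("count failed for: " + query, e);
        }
    }

    /** Like a bolt's cleanup(): release the client. */
    void cleanup() {
        if (client != null) {
            client.close();
            client = null;
        }
    }

    /** Runs the lifecycle once against a fake client that always reports 42. */
    static String demo() {
        boolean[] closed = {false};
        CountClient fake = new CountClient() {
            @Override
            public CompletableFuture<Long> count(String query) {
                return CompletableFuture.completedFuture(42L);
            }

            @Override
            public void close() {
                closed[0] = true;
            }
        };
        CountingBoltSketch bolt = new CountingBoltSketch(() -> fake);
        bolt.prepare();
        bolt.execute("body:error");
        bolt.cleanup();
        return bolt.emitted + " closed=" + closed[0];
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [42] closed=true
    }
}
```

The five-second bound is arbitrary. Also note that Storm does not guarantee `cleanup()` is called on a cluster (workers can be killed outright), so it is best treated as a best-effort hook.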

