How to use 'Esbolt' in Storm Trident


#1

I used EsBolt at storm topology like this:
builder.setBolt(topic + "WriteEsbolt", new EsBolt("{index}/{type}")

but I cannot use that at storm Trident..
Trident use not bolt but operator..
How can I insert tuple data in Elastic search?


(eliasah) #2

Can you try to give more details about you are doing? It's not very clear.


#3

I implemented storm topology like this(using spout, bolt)

builder.setSpout("logSpout", createKafkaSpout(topicNm), kafkaSpoutNum);
builder.setBolt("PreProcessing", new LogPreprocessing(), 3).shuffleGrouping("logSpout");
builder.setBolt("WriteEsbolt", new EsBolt("{indexname}/{typename}"), writeEsBoltNum).shuffleGrouping("PreProcessing", "esTuple")					.addConfiguration(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickFreqSecs);

But, I changed this topology using Trident.... I cannot use EsBolt()

  TridentTopology topology = new TridentTopology();
  Stream spoutStreamAccess = topology.newStream("str", createKafkaSpout(topicNm));
  Stream parsedStream = spoutStreamAccess.each(new Fields("str"), new LogPreprocessing(), new Fields("fileInfo","fileDtm", "fileCnts", "fileName", "esTuple"));
  parsedStream.each(new Fields("esTuple"), new EsBolt("{indexname}/{typename}"))

(system) #4