Multiple Elasticsearch outputs for two different Azure Event Hubs as input in Logstash

I have to configure Logstash to read from two different event hubs and write their events to two different Elasticsearch indices. My current configuration is shown below:

# One azure_event_hubs input in advanced mode that reads two event hubs.
# Each hub entry carries its own connection string and consumer group.
input {
   azure_event_hubs {
     config_mode => "advanced"
     threads => 8
     # Per the discussion in this thread, this makes the source hub name
     # available at [@metadata][azure_event_hubs][name].
     decorate_events => true
     initial_position => "beginning"
     # NOTE(review): empty here, presumably redacted -- confirm a real
     # storage connection string is supplied.
     storage_connection => ""
     # Both hubs share this single storage container.
    storage_container => "event-hub-logs"
     event_hubs => [
        {"Firsteventhub" => {
         event_hub_connection => "***"
         consumer_group => "logstash"
         }
        },
        {"Secondeventhub" => {
         event_hub_connection => "***"
        consumer_group => "logstash2"
            }
        }
     ]
 }
}
# Applies the json filter to the "message" field of every event; there is no
# conditional, so events from both hubs go through it.
filter {
   json {
      source => "message"
   }
}

# Two elasticsearch outputs with no conditional around either of them, so as
# written every event from both hubs is sent to both indices.
# NOTE(review): the index names contain a space and look like redacted
# placeholders -- confirm the real names are valid Elasticsearch index names.
output {
   elasticsearch {
        hosts => ["host"]
        ssl => false
        ssl_certificate_verification => false
        user => "**"
        password => "***"
        index => "test index1"
        ilm_enabled => false
   }
elasticsearch {
        hosts => ["host"]
        ssl => false
        ssl_certificate_verification => false
        user => "**"
        password => "***"
        index => "test index2"
        ilm_enabled => false
   }
}

  1. How do I use filters to send the Firsteventhub logs to "test index1" and the Secondeventhub logs to "test index2"?

  2. Also, is one storage container enough, or should I use a different storage container for each event hub?
    Could you please help me out here, as I am stuck on this? Thanks in advance!

There is an open issue requesting that add_field be a per-event_hub option, but it has not been implemented.

Instead of using a single plugin in advanced mode, you could use two plugins in basic mode, and have them each add a unique tag to the events, then use if "uniqueTag1" in [tags] { to route them to different outputs. No filters needed for this functionality.

Since you are using decorate_events => true, you have the name of the event hub in the @metadata field.

The name will be in the field [@metadata][azure_event_hubs][name], so you can use it in a conditional in your output.

# Route each event by the name of the event hub it came from. The hub name is
# only present when the input sets decorate_events => true. Events whose hub
# name matches neither branch are not sent to any output.
output {
    if [@metadata][azure_event_hubs][name] == "Firsteventhub" {
        elasticsearch { output for index 1 }
    } else if [@metadata][azure_event_hubs][name] == "Secondeventhub" {
        elasticsearch { output index 2 }
    }
}

I do not see any issue about using the same storage container, but I prefer to use one per eventhub.

Getting the below error after making the change

[2022-10-20T18:07:43,745][WARN ][com.microsoft.azure.eventprocessorhost.PartitionPump][main][1400938c3650184d2dbddb078307173436a8f6aa021b8c4bf7dcc69bd618b03a] host logstash-553d9839-6f42-47b8-9096-452681992ff2: 1:Got exception from onEvents
org.logstash.MissingConverterException: Missing Converter handling for full class name=org.apache.qpid.proton.amqp.Binary, simple name=Binary
        at org.logstash.Valuefier.fallbackConvert(Valuefier.java:118) ~[logstash-core.jar:?]
        at org.logstash.Valuefier.convert(Valuefier.java:96) ~[logstash-core.jar:?]
        at org.logstash.ConvertedMap.newFromMap(ConvertedMap.java:74) ~[logstash-core.jar:?]
        at org.logstash.Valuefier.lambda$initConverters$15(Valuefier.java:178) ~[logstash-core.jar:?]
        at org.logstash.Valuefier.convert(Valuefier.java:94) ~[logstash-core.jar:?]
        at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.safeValueifierConvert(JrubyEventExtLibrary.java:367) ~[logstash-core.jar:?]
        at org.logstash.ext.JrubyEventExtLibrary$RubyEvent.ruby_set_field(JrubyEventExtLibrary.java:126) ~[logstash-core.jar:?]
        at org.logstash.ext.JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.call(JrubyEventExtLibrary$RubyEvent$INVOKER$i$2$0$ruby_set_field.gen) ~[jruby.jar:?]
        at org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:835) ~[jruby.jar:?]
        at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:207) ~[jruby.jar:?]
        at home.devl.logstash_minus_8_dot_1_dot_0.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_azure_event_hubs_minus_1_dot_4_dot_3.lib.logstash.inputs.processor.RUBY$block$onEvents$2(/home/devl/logstash-8.1.0/vendor/bundle/jruby/2.5.0/gems/logstash-input-azure_event_hubs-1.4.3/lib/logstash/inputs/processor.rb:51) ~[?:?]
        at org.jruby.ir.targets.YieldSite.yield(YieldSite.java:112) ~[jruby.jar:?]
        at home.devl.logstash_minus_8_dot_1_dot_0.logstash_minus_core.lib.logstash.codecs.delegator.RUBY$block$decode$2(/home/devl/logstash-8.1.0/logstash-core/lib/logstash/codecs/delegator.rb:64) ~[?:?]
        at org.jruby.ir.targets.YieldSite.yield(YieldSite.java:112) ~[jruby.jar:?]
        at home.devl.logstash_minus_8_dot_1_dot_0.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_plain_minus_3_dot_1_dot_0.lib.logstash.codecs.plain.RUBY$method$decode$0(/home/devl/logstash-8.1.0/vendor/bundle/jruby/2.5.0/gems/logstash-codec-plain-3.1.0/lib/logstash/codecs/plain.rb:54) ~[?:?]
        at home.devl.logstash_minus_8_dot_1_dot_0.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_codec_minus_plain_minus_3_dot_1_dot_0.lib.logstash.codecs.plain.RUBY$method$decode$0$__VARARGS__(/home/devl/logstash-8.1.0/vendor/bundle/jruby/2.5.0/gems/logstash-codec-plain-3.1.0/lib/logstash/codecs/plain.rb:49) ~[?:?]
        at org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:80) ~[jruby.jar:?]
        at org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:70) ~[jruby.jar:?]
        at org.jruby.ir.targets.InvokeSite.invoke(InvokeSite.java:197) ~[jruby.jar:?]
        at home.devl.logstash_minus_8_dot_1_dot_0.logstash_minus_core.lib.logstash.codecs.delegator.RUBY$block$decode$1(/home/devl/logstash-8.1.0/logstash-core/lib/logstash/codecs/delegator.rb:62) ~[?:?]
        at org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:138) ~[jruby.jar:?]
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58) ~[jruby.jar:?]
        at org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:32) ~[jruby.jar:?]
        at org.jruby.runtime.Block.call(Block.java:147) ~[jruby.jar:?]
        at org.logstash.instrument.metrics.MetricExt.doTime(MetricExt.java:160) ~[logstash-core.jar

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.