Azure Event Hub input

I have an Azure Event Hub and I want to use the azure_event_hubs input plugin to pull the data from it. I've gotten Logstash to connect to the hub, but I'm getting a strange error when the config runs. Hoping someone has an idea about this.

Apr 27 16:04:17 ip-10-0-6-44 logstash[269082]: [2021-04-27T16:04:17,779][INFO ][logstash.inputs.azureeventhubs][my-app][xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx] Event Hub registration complete.  {:event_hub_name=>"my-app"}
Apr 27 16:04:17 ip-10-0-6-44 logstash[269082]: [2021-04-27T16:04:17,785][ERROR][logstash.inputs.azureeventhubs][my-app][xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx] Event Hub failure while registering. {:event_hub_name=>"my-app", :exception=>java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.IllegalEntityException: Failure getting partition ids for event hub, :backtrace=>["com.microsoft.azure.eventprocessorhost.PartitionManager.lambda$cachePartitionIds$4(PartitionManager.java:80)", "java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)", "java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)", "java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)", "java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)", "java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)", "java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)", "java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)", "java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)", "java.base/java.lang.Thread.run(Thread.java:834)"]}
Apr 27 16:04:17 ip-10-0-6-44 logstash[269082]: [2021-04-27T16:04:17,792][ERROR][logstash.inputs.azureeventhubs][my-app][xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx] Event Hub encountered an error. {:event_hub_name=>"my-app", :exception=>java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.IllegalEntityException: Failure getting partition ids for event hub, :backtrace=>["com.microsoft.azure.eventprocessorhost.PartitionManager.lambda$cachePartitionIds$4(PartitionManager.java:80)", "java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)", "java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)", "java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)", "java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)", "java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)", "java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)", "java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)", "java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)", "java.base/java.lang.Thread.run(Thread.java:834)"]}
Apr 27 16:04:17 ip-10-0-6-44 logstash[269082]: [2021-04-27T16:04:17,796][INFO ][logstash.inputs.azureeventhubs][my-app][xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx] Event Hub my-app is closed.

It is opening the event hub and attempting to retrieve, but there's a "Failure getting partition ids for event hub" message that I think relates to the process not being able to navigate down from the root into the sub-folders. When I check, there are folders there, but they are empty. These folders are named 1-4; would that be a way of partitioning the data, maybe?
After it errors it closes the connection to the event hub, so part of this is working, but I'm not sure about the rest of it, or how to tell it to start at the root and work its way down.

My config is very simple to start with; I've opted not to use filters at this stage, until I see the sort of data coming into the index.

input {
   azure_event_hubs {
      event_hub_connections => ["Endpoint=sb://my-app-hub.servicebus.windows.net/;SharedAccessKeyName=logstash;SharedAccessKey=accesskeyredacted=;EntityPath=my-app"]
      threads => 8
      decorate_events => true
      consumer_group => "$Default"
      storage_connection => "DefaultEndpointsProtocol=https;AccountName=eventhublogstash;AccountKey= accountkeyredacted ==;EndpointSuffix=core.windows.net"
   }
}
filter {
}
output {
  elasticsearch {
      hosts => ["https://localhost:9243"]
      user => "redacted"
      password => "redacted"
      action => "create"
      index => "my-app"
      id => "my-app"
    }
}

So I'm going by the guide here (Azure Event Hubs plugin | Logstash Reference [7.12] | Elastic), but apart from the advanced mode for multiple event hubs, there isn't much describing how you can direct the connection in terms of how it is to process the hub topology.
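
For reference, the plugin enumerates the hub's partitions itself once it has connected; there is no setting to point it at a path or folder, and the numbered folders in the storage account are most likely its own per-partition checkpoint blobs rather than source data. What you can steer, per the plugin documentation, is the consumer group, the initial position in the stream, and where checkpoints are written. A minimal basic-mode sketch with placeholder connection strings (the storage_container name is just an example):

input {
   azure_event_hubs {
      event_hub_connections => ["<connection string including EntityPath>"]
      consumer_group => "$Default"                # must exist on the hub
      initial_position => "beginning"             # or "end" / "look_back"
      storage_connection => "<blob storage connection string>"
      storage_container => "my-app-checkpoints"   # optional; defaults to the event hub name
      decorate_events => true
      threads => 8
   }
}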

Any ideas?


So I'm still getting this error and haven't yet found a solution. I've rebuilt the Event Hub environment and I have data in the event hub blob, but I'm still getting this "Failure getting partition ids for event hub" error.

The only mention of permissions is the SAS policy being set to Manage, which it is; to be sure, I unset it, saved, then reset it and saved again, and I still get the error. It's like it connects but doesn't know where to go. Is there anything in the input plugin that lets me specify where the plugin should start in order to navigate to the partitions? Or at least a way to see where the plugin actually fails during the connection process?
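
On the "where to start" part: the only starting point the plugin exposes is a position in the event stream, not a location in the namespace. The documented options are initial_position (beginning, end or look_back) and initial_position_look_back (seconds, used with look_back); once checkpoints exist in the storage account they generally take precedence. A hedged fragment with the connection details elided:

input {
   azure_event_hubs {
      event_hub_connections => ["<connection string including EntityPath>"]
      consumer_group => "$Default"
      initial_position => "look_back"       # start from the recent past on first run
      initial_position_look_back => 3600    # seconds to look back when no checkpoint exists
      storage_connection => "<blob storage connection string>"
   }
}

As for watching where the connection fails, running Logstash with --log.level=debug should surface more detail from the plugin and the underlying Java client.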

Event Hub failure while registering. {:event_hub_name=>"evnthublogstashtest", :exception=>java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.IllegalEntityException: Failure getting partition ids for event hub, :backtrace=>["com.microsoft.azure.eventprocessorhost.PartitionManager.lambda$cachePartitionIds$4(PartitionManager.java:80)", "java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)", "java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907)", "java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478)", "java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)", "java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)", "java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)", "java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)", "java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)", "java.base/java.lang.Thread.run(Thread.java:834)"]}

My current config:

     input {
       azure_event_hubs {
         config_mode => "advanced"
         threads => 8
         decorate_events => true
         storage_connection => "DefaultEndpointsProtocol=redacted"
         event_hubs => [
            {"evnthublogstashtest" => {
             event_hub_connection => "Endpoint=sb://redacted"
             initial_position => "beginning"
             consumer_group => "$Default"
            }}
         ]
       }
    }
    filter {
    }
    output {
      elasticsearch {
          hosts => ["localhost:9243"]
          user => "redacted"
          password => "redacted"
          action => "create"
          index => "my-app"
          id => "my-app"
        }
    }

So I figured out what the issue is here.

My Logstash sits in my AWS environment and is heavily locked down.
While the connection strings etc. are all HTTPS, what the Logstash documentation DOES NOT TELL YOU is that the plugin actually requires outbound tcp/5671 and tcp/5672 in order to work.

It's a two-stage operation:
1st stage uses HTTPS to authenticate and get access, using the connection strings and the shared access key.
2nd stage uses a Java component inside the plugin, which uses the AMQP protocol to traverse the partitions and pull the data.

Solution:
Your Logstash system must have outbound tcp/443, tcp/5671 and tcp/5672 allowed in order for this plugin to work.

