Sending azure_event_hub messages to elasticsearch


(N) #1

I recently installed the Elastic Stack for the first time using the steps at DigitalOcean. I then followed the steps on the Azure Event Hubs plugin page to connect Logstash to an Azure event hub. This seemed to go okay, but nothing is showing up in Kibana, or even in Elasticsearch as far as I can tell.

My config looks like this:

input {
  azure_event_hubs {
    config_mode => "advanced"
    event_hubs => [
      { "myentitypath" => {
        event_hub_connection => "Endpoint=sb://myendpoint.servicebus.windows.net/;SharedAccessKeyName=logstash;SharedAccessKey=alongrandomkey;EntityPath=myentitypath"
      }}
    ]
    threads => 8
    decorate_events => true
    consumer_group => "$Default"
    storage_connection => "DefaultEndpointsProtocol=https;AccountName=mystorageacct;AccountKey=alongrandomkey;EndpointSuffix=core.windows.net"
  }
}

I feel like I probably need to specify some output details? The only output config that I have is what I set up for Filebeat:

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    manage_template => false
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
  }
}
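As far as I can tell, the %{...} parts in that index setting are Logstash sprintf references: [@metadata][beat] and [@metadata][version] come from the event's metadata, and %{+YYYY.MM.dd} is the event's @timestamp formatted as a date. Here's a rough Ruby sketch of how I understand the index name being built, with made-up field values:

```ruby
require 'time'

# Hypothetical event metadata, as Filebeat would set it on each event.
metadata = { "beat" => "filebeat", "version" => "6.6.1" }
timestamp = Time.utc(2019, 2, 8)

# Resolve "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}" by hand.
index = "#{metadata["beat"]}-#{metadata["version"]}-#{timestamp.strftime('%Y.%m.%d')}"
puts index  # => filebeat-6.6.1-2019.02.08
```

So the index name changes per event and per day, which I gather is why nothing from the event hub would land anywhere sensible with this output.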

Is there an output format that I need to specify for Azure Event Hub? Do I need to somehow create the indexes in ES?

Any help is much appreciated. As you can tell I don't really understand how all this works. :)


(Nachiket) #2

Hi Nick,

Could you verify whether you are receiving the events on stdout?

Try using the following config:

input {
  .... Your Input Settings .....
}
output {
  stdout { }
}

This will validate that the input configuration is working; then we can look for errors in the output. A typical elasticsearch output looks as follows:

output {
    elasticsearch {
        action => "index"
        hosts => ["http://x.x.x.x:9200"]
        index => "indice"
    }
}

I believe you are missing the action part in the elasticsearch config.


(N) #3

Thanks for the reply and the very helpful stdout config! Using that I am able to see that the Azure event hub data is definitely making it to Logstash:

"@timestamp" => 2019-02-08T14:04:32.149Z,
   "message" => "{\"records\": [{ \"LogicalServerName\": \"myservername\", \"SubscriptionId\": \"mysubuuid\", \"ResourceGroup\": \"myRG\", \"time\": \"2019-02-08T14:00:21.7540000Z\"...

So next I tried adding your output block verbatim and nothing happens. I assume that's because "indice" needs to be an actual index name and not just that literal word, but how do I know which index to use? And how do I create the indices that azure_event_hub expects? Filebeat has a handy tool that does this, but I don't see anything similar for this plugin.


(N) #4

Does anyone have any solutions to this? It seems odd that the documentation tells me how to ingest the data, but not how to display it. Are there any other docs someone can point me to that might help?


(N) #5

For the benefit of any future viewers: yes, just naming an index in the output will cause Logstash to create and use that index. Here's the file I ended up with:

input {
  azure_event_hubs {
    config_mode => "advanced"
    event_hubs => [
      { "eventhubentityname" => {
        event_hub_connection => "Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=logstash;SharedAccessKey=myaccesskey;EntityPath=eventhubentityname"
      }}
    ]
    threads => 8
    decorate_events => true
    consumer_group => "logstash"
    storage_connection => "DefaultEndpointsProtocol=https;AccountName=mystorageacct;AccountKey=mystorageacctkey;EndpointSuffix=core.windows.net"
    type => "azure_event_hub"
  }
}

# since I have multiple inputs going to ES, I use the type parameter to make sure only the event hub messages go to this index
output {
    if [type] == "azure_event_hub" {
        #stdout { }
        elasticsearch {
            action => "index"
            hosts => ["http://localhost:9200"]
            index => "azure_event_hub"
        }
    }
}

Unfortunately this dumps all the useful info into a single 'message' field, so I still need to figure out how to make it do something practical.


#6

Start with

filter { json { source => "message" } }

If that does not produce what you want then show us the output from

output { stdout { codec => rubydebug } }

and explain what you do not like about it.


(N) #7

Thanks for the info, Badger. The JSON is nested: it's a set of events under message. Using your filter above I get a records field containing several very long JSON objects that could be called event[0], event[1], etc. It looks basically like this:

{"records": [
{ "LogicalServerName": "myservername", "SubscriptionId": "mysubid-uuid", "ResourceGroup": "Prod", "time": "2019-02-15T17:44:23.3870000Z", "resourceId": "/SUBSCRIPTIONS/mysubid-uuid/RESOURCEGROUPS/PROD/PROVIDERS/MICROSOFT.SQL/SERVERS/myservername/DATABASES/MASTER", "category": "SQLSecurityAuditEvents", "operationName": "AuditEvent"},
{ "LogicalServerName": "myservername", "SubscriptionId": "mysubid-uuid", "ResourceGroup": "Prod", "time": "2019-02-15T17:44:23.3870000Z", "resourceId": "/SUBSCRIPTIONS/mysubid-uuid/RESOURCEGROUPS/PROD/PROVIDERS/MICROSOFT.SQL/SERVERS/myservername/DATABASES/MASTER", "category": "SQLSecurityAuditEvents", "operationName": "AuditEvent"}
]}

That led me to the split filter in Logstash, and I ended up with this, which does basically what I want:

filter {
    if [type] == "azure_event_hub" {
        json {
            source => "message"
        }
        split {
            field => "records"
        }
    }
}
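To spell out what those two filters are doing, here's a rough Ruby sketch (the payload is a trimmed-down, made-up version of my records):

```ruby
require 'json'

# One Event Hub message as it arrives in Logstash: the whole payload
# is a JSON string sitting in the "message" field.
event = {
  "message" => '{"records": [' \
               '{"category": "SQLSecurityAuditEvents", "operationName": "AuditEvent"},' \
               '{"category": "SQLSecurityAuditEvents", "operationName": "AuditEvent"}]}'
}

# json filter: parse "message" and merge the parsed keys into the event,
# which gives us a "records" field holding an array of hashes.
event = event.merge(JSON.parse(event["message"]))

# split filter: clone the event once per array element, so each record
# becomes its own event (and eventually its own Elasticsearch document).
events = event["records"].map { |record| event.merge("records" => record) }

puts events.length                        # 2
puts events.first["records"]["category"]  # SQLSecurityAuditEvents
```

So instead of one document with everything buried in message, I get one document per audit record, with the fields broken out under records.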

Thanks everyone who helped!


(system) closed #8

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.