Problems with WIP Event Hubs output plugin for logstash

I am currently developing an Azure Event Hubs output plugin for Logstash, and I have run into a problem: after running for around 30 minutes, Logstash stops accepting flushes from Telegraf for a period of time (3-10 minutes). It looks like it is hung up processing and sending events it has already received.

My technology stack:

Telegraf sends to Logstash over TCP.

Logstash is configured like so:

    input {
      tcp {
        port => 5046
        codec => json
      }
    }
    filter {
      mutate {
        # "tags" is a keyword in Logstash, which causes mutate to auto-format the tags values when parsing. Renaming the field prevents that.
        rename => {"tags" => "tag"}
        # Removes logstash auto generated fields
        # remove_field => ["@timestamp","host","port","@version"]
      }
    }
    output {
      eventhubs {
        service_namespace => "test"
        event_hub => "test1"
        sas_key_name => "WriteAnyHub"
        sas_key => "*redacted*"
        partition_key => ["[tag][host]", "[tag][sql_instance]", "[tag][instance]", "[tag][scope]", "[tag][objectname]", "[tag][what]"]
      }
    }

Logstash is using default pipeline settings.

The output plugin is structured like this (pseudocode; the real plugin includes all the necessary boilerplate):

    def register
      @connection = create connection
    end

    def receive(event)
      format event into Event Hubs data
      @connection.send data to Event Hubs synchronously
    end

My first question is: how does Logstash run output plugins? Does it call register once and then call receive for each event as it comes in, keeping the initial connection and reusing it?

Should I be using multi_receive? I am sending time-order-sensitive data, so it needs to be sent as synchronously as possible.
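The lifecycle being asked about can be sketched in plain Ruby as follows. This is an illustration under assumptions, not Logstash's actual implementation: `SketchOutput`, `FakeConnection`, and `send_sync` are hypothetical names, and a real plugin would subclass `LogStash::Outputs::Base` rather than stand alone. It shows `register` running once, the connection being reused, and a batch-style `multi_receive` that walks the batch in order.

```ruby
# Hypothetical stand-in for an Event Hubs client; it only records what is sent.
class FakeConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Pretend synchronous send: returns only after the data is "delivered".
  def send_sync(data)
    @sent << data
  end
end

# Plain-Ruby sketch of the output plugin shape (no Logstash dependency).
class SketchOutput
  attr_reader :connection

  # Assumed to run once at pipeline startup: open the connection and keep it.
  def register
    @connection = FakeConnection.new
  end

  # Receives a batch of events; iterating in order preserves the batch order.
  def multi_receive(events)
    events.each { |event| receive(event) }
  end

  # Handles a single event using the connection created in register.
  def receive(event)
    @connection.send_sync(event.to_s)
  end
end

output = SketchOutput.new
output.register
output.multi_receive(%w[a b c])
output.multi_receive(%w[d])
p output.connection.sent # prints ["a", "b", "c", "d"]
```

Within one batch the order is preserved here because the sends are sequential. Ordering across batches would additionally depend on how many pipeline workers are running, which this sketch does not model.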

Do you think there would be any performance problems sending over AMQP?

The only code that may be inefficient is the code I use to get the partition key for sending to event hubs.

    private
      def get_partition_key(event)
        temp_string = ""
        if @partition_key == nil
          return SecureRandom.uuid
        end
        for i in 0..(@partition_key.length - 1) do
          temp = (event.get(@partition_key[i]))
          if temp != nil
            if (i == 0)
              temp_string += (temp)
            else
              temp_string += ("." + temp)
            end
          end
        end
        return temp_string
      end # def get_partition_key

Is there anything wrong with this code?
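For comparison, a more compact way to build the same kind of key might look like the sketch below. It is not the plugin's code: `FakeEvent` and `build_partition_key` are hypothetical names, and `FakeEvent` only imitates the `event.get(field_reference)` call used above. Two behaviours differ from the loop in the question: a missing first field no longer leaves a leading "." (the loop skips the separator only when `i == 0`), and non-string values are converted with `to_s` instead of raising a `TypeError` on string concatenation.

```ruby
require "securerandom"

# Hypothetical stand-in for a Logstash event; only `get` is imitated.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def get(ref)
    @fields[ref]
  end
end

# Joins the non-nil values of the configured fields with ".".
# Falls back to a random UUID when no partition fields are configured.
def build_partition_key(event, partition_fields)
  return SecureRandom.uuid if partition_fields.nil?

  partition_fields
    .map { |ref| event.get(ref) } # look up each configured field
    .compact                      # drop fields the event does not have
    .map(&:to_s)                  # tolerate numeric or other non-string values
    .join(".")
end

event = FakeEvent.new("[tag][host]" => "db01", "[tag][instance]" => 3)
fields = ["[tag][host]", "[tag][sql_instance]", "[tag][instance]"]
puts build_partition_key(event, fields) # prints db01.3
```

Note that the UUID fallback gives every event a different key, so events without a configured `partition_key` would not be kept together on one partition.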

If you need more context or code, just ask and I will post it below.
