I am currently developing an Azure Event Hubs output plugin for Logstash and have run into a problem: after running for around 30 minutes, Logstash stops accepting flushes from Telegraf for a period of time (3-10 minutes). It looks like it is hung up processing and sending events it has already received.
My technology stack:
Telegraf sending to Logstash over TCP.
Logstash is configured like so:
input {
  tcp {
    port => 5046
    codec => json
  }
}
filter {
  mutate {
    # Tags is a keyword in logstash, causes mutate to autoformat tags values when parsing. This prevents that.
    rename => {"tags" => "tag"}
    # Removes logstash auto generated fields
    # remove_field => ["@timestamp","host","port","@version"]
  }
}
output {
  eventhubs {
    service_namespace => "test"
    event_hub => "test1"
    sas_key_name => "WriteAnyHub"
    sas_key => "*redacted*"
    partition_key => ["[tag][host]", "[tag][sql_instance]", "[tag][instance]", "[tag][scope]", "[tag][objectname]", "[tag][what]"]
  }
}
Logstash is using default pipeline settings.
The output plugin is structured roughly like this (pseudocode), along with the usual plugin boilerplate:
def register
  @connection = create connection
end

def receive(event)
  format event into event hub data
  @connection.send data to event hubs synchronously
end
My first question is: how does Logstash run output plugins? Does it call register once and then call receive as each event comes in, reusing the initial connection?
Should I be using multi_receive? I am sending time-order-sensitive data, so it needs to be as synchronous as possible.
Do you think there would be any performance problems sending over AMQP?
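To make the lifecycle questions above concrete, here is a minimal plain-Ruby model of the behavior being asked about. It assumes that register runs once at startup and that events arrive in batches through multi_receive; that assumption is exactly what the question is trying to confirm. `SketchOutput`, `FakeConnection`, and `send_sync` are hypothetical names for illustration, and nothing here uses the real Logstash or Event Hubs APIs.

```ruby
# Hypothetical stand-in for the Event Hubs connection; it only records what was sent.
class FakeConnection
  attr_reader :sent

  def initialize
    @sent = []
  end

  # Pretend synchronous send: returns only after the payload is "delivered".
  def send_sync(payload)
    @sent << payload
  end
end

# Plain-Ruby model of an output plugin. A real plugin would subclass the
# Logstash output base class instead; this class is an assumption-laden sketch.
class SketchOutput
  attr_reader :connection

  # Assumed to be called once: open the connection and keep it for later sends.
  def register
    @connection = FakeConnection.new
  end

  # Assumed to be called with a batch of events. Iterating in order and
  # sending synchronously preserves the order within the batch.
  def multi_receive(events)
    events.each { |event| receive(event) }
  end

  # Sends a single event over the connection created in register.
  def receive(event)
    @connection.send_sync(event.to_s)
  end
end
```

In this model, two consecutive batches sent through the same `SketchOutput` instance reuse one connection and arrive in the order they were handed over. Whether the real pipeline behaves this way, particularly with more than one worker, is the open question.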
The only code that may be inefficient is the code I use to get the partition key for sending to event hubs.
private

def get_partition_key(event)
  temp_string = ""
  if @partition_key == nil
    return SecureRandom.uuid
  end
  for i in 0..(@partition_key.length - 1) do
    temp = (event.get(@partition_key[i]))
    if temp != nil
      if (i == 0)
        temp_string += (temp)
      else
        temp_string += ("." + temp)
      end
    end
  end
  return temp_string
end # def get_partition_key
Is there anything wrong with this code?
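For comparison, a more compact version of the same lookup might look like the sketch below. It is not a drop-in replacement: `FakeEvent` is a hypothetical stand-in that models only `get` with a flat hash lookup, and `build_partition_key` takes the field list as an argument instead of reading `@partition_key`. It also differs from the code above in two ways. First, it never emits a leading "." when the first field is missing (the loop above produces ".value" in that case). Second, it converts values with `to_s`, because `String#+` raises a TypeError on non-string values such as integers.

```ruby
require "securerandom"

# Hypothetical stand-in for a Logstash event; only #get is modeled, using a
# flat hash lookup rather than real field-reference parsing.
class FakeEvent
  def initialize(fields)
    @fields = fields
  end

  def get(field_reference)
    @fields[field_reference]
  end
end

# Joins the values of the configured fields with ".".
# Falls back to a random UUID when no field list is configured.
def build_partition_key(event, partition_key_fields)
  return SecureRandom.uuid if partition_key_fields.nil?

  partition_key_fields
    .map { |field| event.get(field) } # look up each configured field
    .compact                          # skip fields missing from the event
    .map(&:to_s)                      # tolerate numeric or other non-string values
    .join(".")
end
```

With a single pass and one join, this avoids building many intermediate strings, though for six short fields the difference is unlikely to explain a multi-minute stall.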
If you need more context or code just ask and I will post below.