Hey there,
I'm trying to merge the "message" field of multiline syslog events using the aggregate filter, and I've been beating my head against a wall trying to get this scenario to work. I'm on the verge of giving up and implementing this with Redis and Ruby filters instead. Maybe someone can point out where I'm going off the rails.
I'm ingesting logs using rsyslogd and its omkafka module, sending log events to Kafka using message keys based on the source machine's hostname.
The Kafka topic has exactly as many partitions as I've allocated worker threads, and I've verified that each Kafka consumer thread on my Logstash machines is assigned to one and only one partition.
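For reference, the input side looks roughly like this (broker, topic, and group names are placeholders, and the partition count here is just an example):

```
input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092"   # placeholder brokers
    topics            => ["syslog"]                  # placeholder topic
    group_id          => "logstash-syslog"
    # One consumer thread per partition, so each thread owns exactly one
    # partition and per-host ordering is preserved (events are keyed by
    # hostname on the rsyslog/omkafka side).
    consumer_threads  => 4
  }
}
```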
Not every log I'm processing comes from a program that can generate multiple lines of output. The programs that do are tracked in a Redis set (via a Ruby filter), and I've added logic so that the aggregate filters can't even run unless the event comes from a program that does in fact generate multiple log lines.
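The Redis check is roughly this (a sketch — it assumes the redis gem is available on Logstash's Ruby load path, and "multiline_programs" is a placeholder name for the set):

```
filter {
  ruby {
    # Assumption: the redis gem is installed where Logstash can require it.
    init => '
      require "redis"
      @redis = Redis.new(host: "localhost")
    '
    # Tag events from programs known to emit multiline logs; the aggregate
    # filters later run only inside: if "multiline" in [tags] { ... }
    code => '
      if @redis.sismember("multiline_programs", event.get("program").to_s)
        event.tag("multiline")
      end
    '
  }
}
```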
In my case, it's easy to tell "start" events apart from what I'm calling "continuation" events for a given process: the "message" field of a continuation event begins with "-->", and a start event's doesn't.
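So the classification is just a conditional:

```
filter {
  if [message] =~ /^-->/ {
    mutate { add_tag => ["continuation"] }
  } else {
    mutate { add_tag => ["start"] }
  }
}
```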
Here's my problem:
- There is no "end log" event; the same process on the same host, logging to the same Kafka partition, just sends a new "start" event.
- I can't find a way to explicitly and asynchronously flush the aggregate filter's map to Elasticsearch when a new start event is detected. It seems the map is just overwritten by the next event without ever completing the pipeline and reaching the Elasticsearch output.
- I'm not concerned about timeouts, since all the messages of any given multiline log arrive within milliseconds of each other. I am, however, concerned about the existing aggregated map being lost when a new start event is detected, which seems to be what's happening.
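In case it helps, here's roughly what I've been attempting. The per-run counter is my own experiment (not from the docs) to give each multiline log a unique task_id, so that push_previous_map_as_event pushes the finished map when the next start event shows up — field names here match my pipeline, but treat the whole thing as a sketch:

```
filter {
  # Experiment: bump a per-(host, program) counter on each start event so
  # every multiline log run gets its own task_id. Continuation lines reuse
  # the current counter value.
  ruby {
    init => '@runs = Hash.new(0)'
    code => '
      key = event.get("host").to_s + "/" + event.get("program").to_s
      @runs[key] += 1 unless event.get("message").to_s.start_with?("-->")
      event.set("[@metadata][run]", @runs[key])
    '
  }
  aggregate {
    task_id => "%{host}/%{program}/%{[@metadata][run]}"
    code => '
      map["host"]    ||= event.get("host")
      map["program"] ||= event.get("program")
      map["message"] ||= ""
      map["message"]  += event.get("message").to_s + "\n"
      event.cancel
    '
    # When a new task_id is seen, push the previous (now complete) map
    # down the pipeline as its own event.
    push_previous_map_as_event => true
    # Safety net for the last log of a program that goes quiet.
    timeout => 10
  }
}
```

One wrinkle I haven't resolved: the aggregate filter's docs say it should run with a single pipeline worker to keep events in order, and I'm not sure how that interacts with my one-consumer-thread-per-partition setup.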
Frankly I find the examples in the documentation not especially helpful for this kind of log. It may be that I'm trying to do something this filter just can't do, in which case I will build an asynchronous method of trapping and aggregating these events in Redis and running another process periodically to flush the completed ones. But if someone in the know could tell me whether what I'm attempting with the aggregate filter is even achievable, I would very much appreciate it.