How to manage multiline events based on a random field

Hello all,

I've been facing a problem related to multiline events lately, and I am needing a little bit of your help for this. My syslog server is sending multi-line events. One single event gathers several lines, and the indicator that proves a particular event line is part of a multi-line event is a random number that defines a user connection session. Here is a custom generated log file:

Feb 16 17:29:04 slot1/APM-LTM notice apd[5515]: 01490010:5: 1ec2b273: Username 'cjones'
Feb 16 17:29:04 slot1/APM-LTM warning apd[5515]: 01490106:4: 1ec2b273: AD module: authentication with 'cjones' failed: Preauthentication failed, principal name: cjones@GEEKO.COM. Invalid user credentials.  (-1765328360)
Feb 16 17:10:04 slot1/APM-LTM notice apd[5515]: 01490010:5: d8b5a591: Username 'gbridget'
Feb 16 17:10:04 slot1/APM-LTM err apd[5515]: 01490107:3: d8b5a591: AD module: authentication with 'gbridget' failed: Clients credentials have been revoked, principal name: gbridget@GEEKO.COM. User account is locked  (-1765328366)
Feb 16 17:29:04 slot1/APM-LTM notice apd[5515]: 01490005:5: 1ec2b273: Following rule 'fallback' from item 'AD Auth' to ending 'Deny'
Feb 16 17:29:04 slot1/APM-LTM notice apd[5515]: 01490102:5: 1ec2b273: Access policy result: Logon_Deny

Above are the lines related to two different connections defined by the following user sessions: d8b5a591(user gbridget) and 1ec2b273(user cjones). user sessions are the only indicators that connect those lines to two different events. not to mention that the line events are interlaced.

The problem is that I am at loss as how to explain the above to grok filter with a multiline plugin, knowing that the latter offers too few options. In fact , the notion of "previous" and "next" line cannot be applied here for instance, so the grok options "pattern" and "what" cannot be used, since the events are not necessarily consecutive.

I would really appreciate it if someone could shed some light on this and tell me if at least it is feasable or not.

Do you really need to join these lines into a single event? To me they're separate but related events and I wouldn't attempt to merge them. If you insist, have a look at the collate and aggregate filters.

Are there other alternatives? I actually have no idea what else I could do to perform what I've got in mind. I'd like to gather the lines related to a particular connection into one single event, hence, one single document, so that I could query the latter to fetch all I need about the related connection, and visualize it via KIBANA.

It would be something like this:, but rather than dealing with consecutive multiline events, one'd deal with interlaced events.

I have set the number of threads to multiple and can't measure the amount of time an event can last so neither of the filters you mentioned can work in my environment :frowning: And I've been told that ES documents cannot be joined in order to be queried, I am really stuck. Any clue for this? :smiley:

Requesting help again :slight_smile:

I've got exactly the same question. Can't believe this isn't a common issue.
For me I got exception messages within my log file including multiline
stack traces. So it's not suitable to have one event for each line of my stack
The solution would be to set a field, e.g. @timestamp, as a delimiter since
this is the real separator for the log entries but this seems not to work at

My delimiter is the dynamic id. @timestamp can't be a delimiter since each line of log has its own timestamp. The last log line of a particular event may come 2 hours after the first one, and meanwhile other events may occur.

The dynamic id? Could you be so kind to post your .conf file, please?

If I had to configure a multiline, it would be something like this:

stream_identity => "%{dynamic_id}"
pattern  => "."
what => "previous"
add_tag => ["multiline"]

But I have a number of threads => 3. Multiline filter cannot be applied since the number of threads is superior to 1. In the other hand, Multiline codec doesn't provide with the stream_identity field. What should I do?

Up to me, aggregate filter answers precisely to your need.
It allows to aggregate multiple lines into one event, using a correlation id. In your case, the correlation id is your field "dynamic_id".
And aggregate manages interlaced lines, contrary to multiline plugin.

That said :

  • to work fine, you need a "end session" marker in your logs. Without that, you can't know when the session is finished and aggregated event should be sent to elasticsearch.
  • technically, aggregate filter could be used with mulitple threads. It would work fine if your end line is several seconds after intermediate lines. If not, with multiple threads, you risk that end line could be processed before an intermediate line and then loose this intermediate line.

First of all, thanks for answering. It helps.

It is ok if there is only one second between first line and end line? and meanwhile other lines belonging to other events would come between them?

I'll try that out. If it works, you would've made my day.

If there is a about one second, to me, it's OK.
And aggregate plugin is designed to process multiple tasks (or sessions in your case) that come in the same time.
So I think it is the plugin which answers your need.

Finally, if you give me line example for the "end event" and event json structure you expect at end, I can give you logstash configuration to do that (if you are interested).

Sure any help is welcome. An end line would look like this:

<141>May  2 15:04:01 slot1/LTM notice tmm1[11112]: 01490505:5: 00000000: PPP tunnel 0x000000000000 started.

And its JSON equivalent:

"_index": "logstash-XXXX.XX.XX",
"_type": "logstash",
  "_score": null,
  "_source": {
    "message": "<141>May  2 15:04:01 slot1/LTM notice tmm1[11112]: 01490505:5: 00000000: PPP tunnel 0x000000000000 started.\n",
    "@version": "1",
    "@timestamp": "2016-05-02T13:04:01.055Z",
    "type": "logstash",  
    "host": "",
    "tags": [
    "priority": 0,
    "severity": 0,
    "facility": 0, 
    "facility_label": "kernel",
    "severity_label": [
    "received_at": "2016-05-02T13:04:01.055Z",
    "syslog_severity_code": 5,
    "syslog_facility_code": 1,
    "syslog_facility": "user-level",
    "syslog_severity": "notice",
    "syslog_pri": "141",
    "logsource": "LTM",
    "program": "tmm1",
    "pid": "11112",
    "sequence": "01490505:5",
    "seqid": "00000000",
    "info": "PPP tunnel 0x000000000000 started.\n",
    "tunnel_id": "0x000000000000"
  "fields": {
    "@timestamp": [
    "received_at": [
  "highlight": {
    "message": [
      "<141>May  2 15:04:01 slot1/LTM notice tmm1[11112]: 01490505:5: @kibana-highlighted-field@00000000@/kibana-highlighted-field@: PPP tunnel 0x000000000000 started.\n"
    "seqid.raw": [
    "seqid": [
     "sort": [

OK, so :

  • the pattern for the end event is "tunnel .+ started.$" ?
  • how do you want lines are aggregated in final event ? message field as an array ? message field as a string with "\n" between each line ?
  • which is the correlation id field ? apm_seqid ?

Yes let's say my pattern is "PPP tunnel.*started.$"
In final event, I'd want the lines having the same dynamic id "seqid" be tied together and combined into one string message with "\n" between each. I don't consider having an array.


aggregate {
    task_id => "%{apm_seqid}"
    code => "map['message'] ||= '' ; map['message'] += event['message'] + '\n'"

if [message] =~ "PPP tunnel .* started.$" {
    aggregate {
        task_id => "%{seqid}"
        code => "event['message'] = map['message']"
        end_of_task => true
1 Like

Ok, I'll try out the configuration above, and give you my feedback as soon as possible. Thanks anyway.

Either I didn't understand how to use it or my configuration is wrong, I've added the following code:

    aggregate {
            task_id => "%{apm_seqid}"
            code => "map['message'] ||= '' ; map['message'] += event['message'] + '\n'"
            add_tag => ['start']

    if [message] =~ "^.*PPP tunnel*+started.$" {
    aggregate {
            task_id => "%{apm_seqid}"
            code => "event['message'] = map['message']"
            end_of_task => true
            add_tag => ['end']

All my messages are tagged with "start" as wanted in the configuration but the end seems impossible to be reached out, although the end exists; the "PPP.." log line isn't tagged with "end".

How am I supposed to see my tied up events in the end?

Another question, how to make sure all the log lines don't belong to the "map". Some events are very different and don't necessarily end with the "PPP tunnel" stuff. How to create an aggregate rule for each type of event?

Also, the line log: "PPP tunnel .* started.$" is grokked in the configuration that come before aggregate filter. Why not just representing it with its grok configuration instead of a regex expression?

Thanks in advance

I think your pattern in "if" is bad.
To be sure it works (and especially when your message finishes by \n, I advice you :
if [message] =~ "PPP tunnel .* started\." {

Thus, in "if" conditions, you can only use regex patterns, not grok patterns.

To reply to this question, aggregate will only process log lines with "apm_seqid" field. It is the correlation id used to identify aggregate map.
So only lines with "apm_seqid" field are processed, and one aggregate map is associated to one and only one "apm_seqid" value.

Thank you, it worked eventually with the configuration:

if [message] =~ ".* PPP tunnel .* started\." 

The expression "PPP..." is preceded by syslog details.

Anyway, it works fine for some lines, those are tied together, but I've noticed something; the events that occur at the same time (same second) are not gathered, and only the first or last event of the group occuring at the same time is selected to be included in the multiline event.

Is there a way to overcome this?