Sending multiple events as a single event using the multiline codec for email output?

I am using logstash 2.4.0

My config is like this:

        input {
      file {
        path => "F:\logstash-2.4.0\logstash-2.4.0\bin\slowlog.txt"
        start_position => "beginning"
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^%{TIMESTAMP_ISO8601} "
          what => previous
        }
      }
    }
    
    filter {
        grok {
           match => [ "message", "\[%{TIMESTAMP_ISO8601:TIMESTAMP}\]\[%{LOGLEVEL:LEVEL}%{SPACE}\]\[%{DATA:QUERY}\]%{SPACE}\[%{DATA:QUERY1}\]%{SPACE}\[%{DATA:INDEX-NAME}\]\[%{DATA:SHARD}\]%{SPACE}took\[%{DATA:TOOK}\],%{SPACE}took_millis\[%{DATA:TOOKM}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\],"]
        }
    
        # ==> add this filter to convert TOOKM to integer
        mutate {
            convert => { "TOOKM" => "integer" }
        }
    
        # ==> use TOOKM field instead
        if [TOOKM] > 30 {
            
        } else {
            drop { }
        }
    }
    output {
       stdout { codec => rubydebug }
    }

My output is like this:

{
      "@timestamp" => "2017-05-10T18:14:47.269Z",
         "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][3] took[50ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,591",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "3",
            "TOOK" => "50ms",
           "TOOKM" => 50,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}"
}
{
      "@timestamp" => "2017-05-10T18:14:47.270Z",
         "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,591",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "2",
            "TOOK" => "50.2ms",
           "TOOKM" => 50,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}"
}

But what i want is like this

{
          "@timestamp" => "2017-05-10T18:14:47.269Z",
             "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][3] took[50ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r"
            "@version" => "1",
                "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
                "host" => "yaswanth",
           "TIMESTAMP" => "2017-01-14 10:59:58,591",
               "LEVEL" => "WARN",
               "QUERY" => "index.search.slowlog.query",
              "QUERY1" => "yaswanth",
          "INDEX-NAME" => "bank",
               "SHARD" => "3",
                "TOOK" => "50ms",
               "TOOKM" => 50,
               "types" => "details",
         "search_type" => "QUERY_THEN_FETCH",
        "total_shards" => "5",
        "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}"
    }

I want to send all the message fields from multiple events to a single event for sending email .

Is there anything wrong in the above config ?

Thanks

Hello @Yaswanth:

I have realized that using filter plugin aggregate may serve to your intentions.

https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html

The aim of this filter is to aggregate information available among several events (typically log lines) belonging to a same task, and finally push aggregated information into final task event.

Thanks @Xavy. Based on your suggestion I updated my config like this:

input {
  file {
    path => "F:\logstash-2.4.0\logstash-2.4.0\bin\picaso.txt"
    start_position => "beginning"
  }
}

filter {
    grok {
       match => [ "message", "\[%{TIMESTAMP_ISO8601:TIMESTAMP}\]\[%{LOGLEVEL:LEVEL}%{SPACE}\]\[%{DATA:QUERY}\]%{SPACE}\[%{DATA:QUERY1}\]%{SPACE}\[%{DATA:INDEX-NAME}\]\[%{DATA:SHARD}\]%{SPACE}took\[%{DATA:TOOK}\],%{SPACE}took_millis\[%{DATA:TOOKM}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\],"]
    }

    # ==> add this filter to convert TOOKM to integer
    mutate {
        convert => { "TOOKM" => "integer" }
    }

    # ==> use TOOKM field instead
    if [TOOKM] > 30 {
     aggregate {
      task_id => "%{message}"
      code => "event.set('message')"
      end_of_task => true
       timeout => 120
      }
        
    } else {
        drop { }
    }
}
output {
   stdout { codec => rubydebug }
}

My output in the screen is like this:

  "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
      "@timestamp" => "2017-05-11T03:13:53.563Z",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,591",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "2",
            "TOOK" => "50.2ms",
           "TOOKM" => 50,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}",
            "tags" => [
        [0] "_aggregateexception"
    ]
}
{
         "message" => "[2017-01-14 10:59:58,593][WARN ][index.search.slowlog.query] [yaswanth] [bank][1] took[52.2ms], took_millis[52], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
      "@timestamp" => "2017-05-11T03:13:53.564Z",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,593",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "1",
            "TOOK" => "52.2ms",
           "TOOKM" => 52,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}",
            "tags" => [
        [0] "_aggregateexception"
    ]

What i want in my final event should have all the message fields in the above logs like

{

 "message" => [2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",[2017-01-14 10:59:58,593][WARN ][index.search.slowlog.query] [yaswanth] [bank][1] took[52.2ms], took_millis[52], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r"

}

Is my approach to this scenario correct? I don't know exactly what to put in `task_id` and `code` in the aggregate filter to produce the desired result.
Thanks

Hello:

Your new config is not working as it is triggering an aggregate exception:

if code execution raises an exception, the error is logged and event is tagged _aggregateexception

Unfortunately I don't have here a 2.3 scenario where I can test your use case, but I think that something similar to the example in doc should work:

code => "map['aggr_message'] += event.get('message')"

This should create a new field aggr_message

Regarding the task_id, it may be whatever you want ("%{QUERY}" sounds OK to me)

Actually, if you look at the doc, "Example #4" looks quite similar to your needs (it aggregates a text field from several events into just one), with the only difference that the example aggregates country_name field, while you want to aggregate the message one.

Hope this brings some light on how you may achieve your needs :wink:

Thanks so much @Xavy

I even tried giving this:

if [TOOKM] > 15 {
    aggregate {
      task_id => "%{QUERY}"
      code => "map['aggr_message'] += event.get('message')"
      end_of_task => true
       timeout => 120
      }

    } else {
        drop { }
    }

It is throwing error:

Aggregate exception occurred {:error=>#<NoMethodError: undefined method `+' for nil:NilClass>, :code=>"map['aggr_message'] += event.get('message')", :map=>{}, :event_data=>{"message"=>"[2017-04-25 04:40:05,240][TRACE][index.search.slowlog.query] [data-0] [data-apr-2017][4] took[20.6ms], took_millis[20], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"query\":{\"bool\":{\"must\":[{\"terms\":{\"articleId\":[316249486]}}]}}}], extra_source[],", "@version"=>"1", "@timestamp"=>"2017-05-11T06:54:50.932Z", "path"=>"/home/itadmin/logstash/logstash-2.4.1/slow.txt", "host"=>"kibana", "TIMESTAMP"=>"2017-04-25 04:40:05,240", "LEVEL"=>"TRACE", "QUERY"=>"index.search.slowlog.query", "QUERY1"=>"data-0", "INDEX-NAME"=>"data-apr-2017", "SHARD"=>"4", "TOOK"=>"20.6ms", "TOOKM"=>20, "types"=>"publishedarticle", "search_type"=>"QUERY_THEN_FETCH", "total_shards"=>"5", "source_query"=>"{\"query\":{\"bool\":{\"must\":[{\"terms\":{\"articleId\":[316249486]}}]}}}", "@metadata"=>{"path"=>"/home/itadmin/logstash/logstash-2.4.1/slow.txt"}}, :level=>:error}
Aggregate exception occurred {:error=>#<NoMethodError: undefined method `+' for nil:NilClass>, :code=>"map['aggr_message'] += event.get('message')", :map=>{}, :event_data=>{"message"=>"[2017-04-25 05:47:02,335][TRACE][index.search.slowlog.query] [data-0] [data-apr-2017][4] took[20.8ms], took_millis[20], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"query\":{\"bool\":{\"must\":[{\"terms\":{\"articleId\":[316252085]}}]}}}], extra_source[],", 

Thanks

Hello:

It looks like the code does not create the field by itself, so I would try using `message` to store the final message field, something like this:

  code => "map['message'] += event.get('message')"

After trying that too, it is showing the same error.

Thanks

Thanks @Xavy

You were very helpful. As you suggested, I adapted Example #4 from the documentation, which is similar to my scenario, and used it.

  aggregate {
        task_id => "%{LEVEL}"
        code => "
          map['LEVEL'] = event.get('LEVEL')
          map['messages'] ||= []
          map['messages'] << {'message' => event.get('message')}
          event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 3
      }

It worked fine . I used against the LEVEL field.

Thanks

You're welcome :wink: If any of my posts particularly helped you in finding the solution, I would be thankful if you could mark it as the "Solution".

Glad of being of any help

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.