Sending multiple events as a single event using the multiline codec for email output?


(Yaswanth ) #1

I am using Logstash 2.4.0.

My config is like this:

        input {
      file {
        path => "F:\logstash-2.4.0\logstash-2.4.0\bin\slowlog.txt"
        start_position => "beginning"
        codec => multiline {
          # Grok pattern names are valid! :)
          pattern => "^%{TIMESTAMP_ISO8601} "
          what => previous
        }
      }
    }
    
    filter {
        grok {
           match => [ "message", "\[%{TIMESTAMP_ISO8601:TIMESTAMP}\]\[%{LOGLEVEL:LEVEL}%{SPACE}\]\[%{DATA:QUERY}\]%{SPACE}\[%{DATA:QUERY1}\]%{SPACE}\[%{DATA:INDEX-NAME}\]\[%{DATA:SHARD}\]%{SPACE}took\[%{DATA:TOOK}\],%{SPACE}took_millis\[%{DATA:TOOKM}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\],"]
        }
    
        # ==> add this filter to convert TOOKM to integer
        mutate {
            convert => { "TOOKM" => "integer" }
        }
    
        # ==> use TOOKM field instead
        if [TOOKM] > 30 {
            
        } else {
            drop { }
        }
    }
    output {
       stdout { codec => rubydebug }
    }

My output is like this:

{
      "@timestamp" => "2017-05-10T18:14:47.269Z",
         "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][3] took[50ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,591",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "3",
            "TOOK" => "50ms",
           "TOOKM" => 50,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}"
}
{
      "@timestamp" => "2017-05-10T18:14:47.270Z",
         "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,591",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "2",
            "TOOK" => "50.2ms",
           "TOOKM" => 50,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}"
}

But what I want is like this:

{
          "@timestamp" => "2017-05-10T18:14:47.269Z",
             "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][3] took[50ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r"
            "@version" => "1",
                "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
                "host" => "yaswanth",
           "TIMESTAMP" => "2017-01-14 10:59:58,591",
               "LEVEL" => "WARN",
               "QUERY" => "index.search.slowlog.query",
              "QUERY1" => "yaswanth",
          "INDEX-NAME" => "bank",
               "SHARD" => "3",
                "TOOK" => "50ms",
               "TOOKM" => 50,
               "types" => "details",
         "search_type" => "QUERY_THEN_FETCH",
        "total_shards" => "5",
        "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}"
    }

I want to combine the message fields from multiple events into a single event so that I can send them in one email.

Is there anything wrong with the above config?

Thanks


(Javier) #2

Hello @Yaswanth:

I think the aggregate filter plugin may serve your purpose:

https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html

The aim of this filter is to aggregate information available among several events (typically log lines) belonging to the same task, and finally push the aggregated information into a final task event.
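Its general shape is an aggregate block keyed by a task_id, with a bit of Ruby in code that builds up a map across events. A minimal sketch only; the task_id choice and the map key here are illustrative, not taken from your config:

```
filter {
  aggregate {
    task_id => "%{QUERY}"          # events sharing this value belong to one task
    code => "
      map['messages'] ||= []             # create the array on the first event
      map['messages'] << event.get('message')
    "
    push_previous_map_as_event => true   # emit the accumulated map as one event
    timeout => 120
  }
}
```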


(Yaswanth ) #3

Thanks @Xavy. Based on your suggestion I updated my config like this:

    input {
      file {
        path => "F:\logstash-2.4.0\logstash-2.4.0\bin\picaso.txt"
        start_position => "beginning"
      }
    }

    filter {
        grok {
           match => [ "message", "\[%{TIMESTAMP_ISO8601:TIMESTAMP}\]\[%{LOGLEVEL:LEVEL}%{SPACE}\]\[%{DATA:QUERY}\]%{SPACE}\[%{DATA:QUERY1}\]%{SPACE}\[%{DATA:INDEX-NAME}\]\[%{DATA:SHARD}\]%{SPACE}took\[%{DATA:TOOK}\],%{SPACE}took_millis\[%{DATA:TOOKM}\], types\[%{DATA:types}\], stats\[%{DATA:stats}\], search_type\[%{DATA:search_type}\], total_shards\[%{NUMBER:total_shards}\], source\[%{DATA:source_query}\], extra_source\[%{DATA:extra_source}\],"]
        }

        # ==> add this filter to convert TOOKM to integer
        mutate {
            convert => { "TOOKM" => "integer" }
        }

        # ==> use TOOKM field instead
        if [TOOKM] > 30 {
            aggregate {
                task_id => "%{message}"
                code => "event.set('message')"
                end_of_task => true
                timeout => 120
            }
        } else {
            drop { }
        }
    }
    output {
       stdout { codec => rubydebug }
    }

My output on the screen is like this:

{
         "message" => "[2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
      "@timestamp" => "2017-05-11T03:13:53.563Z",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,591",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "2",
            "TOOK" => "50.2ms",
           "TOOKM" => 50,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}",
            "tags" => [
        [0] "_aggregateexception"
    ]
}
{
         "message" => "[2017-01-14 10:59:58,593][WARN ][index.search.slowlog.query] [yaswanth] [bank][1] took[52.2ms], took_millis[52], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",
        "@version" => "1",
      "@timestamp" => "2017-05-11T03:13:53.564Z",
            "path" => "F:\\logstash-2.4.0\\logstash-2.4.0\\bin\\picaso.txt",
            "host" => "yaswanth",
       "TIMESTAMP" => "2017-01-14 10:59:58,593",
           "LEVEL" => "WARN",
           "QUERY" => "index.search.slowlog.query",
          "QUERY1" => "yaswanth",
      "INDEX-NAME" => "bank",
           "SHARD" => "1",
            "TOOK" => "52.2ms",
           "TOOKM" => 52,
           "types" => "details",
     "search_type" => "QUERY_THEN_FETCH",
    "total_shards" => "5",
    "source_query" => "{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}",
            "tags" => [
        [0] "_aggregateexception"
    ]
}

What I want is a final event that contains all the message fields from the above logs, like:

{

 "message" => [2017-01-14 10:59:58,591][WARN ][index.search.slowlog.query] [yaswanth] [bank][2] took[50.2ms], took_millis[50], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r",[2017-01-14 10:59:58,593][WARN ][index.search.slowlog.query] [yaswanth] [bank][1] took[52.2ms], took_millis[52], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"sort\":[{\"balance\":{\"order\":\"asc\"}}]}], extra_source[], \r"

}

Is my approach to this scenario correct? I don't know exactly what to put in task_id and code in the aggregate filter to produce the desired result.
Thanks


(Javier) #4

Hello:

Your new config is not working as it is triggering an aggregate exception:

if code execution raises an exception, the error is logged and the event is tagged _aggregateexception

Unfortunately I don't have a 2.x setup here where I can test your use case, but I think that something similar to the example in the docs should work:

code => "map['aggr_message'] += event.get('message')"

This should create a new field, aggr_message.

Regarding the task_id, it may be whatever you want ("%{QUERY}" sounds OK to me)

Actually, if you look at the doc, "Example #4" looks quite similar to your needs (it aggregates a text field from several events into just one); the only difference is that the example aggregates the country_name field, while you want to aggregate the message field.

Hope this sheds some light on how you can achieve this :wink:


(Yaswanth ) #5

Thanks so much @Xavy

I even tried giving this:

    if [TOOKM] > 15 {
        aggregate {
            task_id => "%{QUERY}"
            code => "map['aggr_message'] += event.get('message')"
            end_of_task => true
            timeout => 120
        }
    } else {
        drop { }
    }

It is throwing this error:

Aggregate exception occurred {:error=>#<NoMethodError: undefined method `+' for nil:NilClass>, :code=>"map['aggr_message'] += event.get('message')", :map=>{}, :event_data=>{"message"=>"[2017-04-25 04:40:05,240][TRACE][index.search.slowlog.query] [data-0] [data-apr-2017][4] took[20.6ms], took_millis[20], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"query\":{\"bool\":{\"must\":[{\"terms\":{\"articleId\":[316249486]}}]}}}], extra_source[],", "@version"=>"1", "@timestamp"=>"2017-05-11T06:54:50.932Z", "path"=>"/home/itadmin/logstash/logstash-2.4.1/slow.txt", "host"=>"kibana", "TIMESTAMP"=>"2017-04-25 04:40:05,240", "LEVEL"=>"TRACE", "QUERY"=>"index.search.slowlog.query", "QUERY1"=>"data-0", "INDEX-NAME"=>"data-apr-2017", "SHARD"=>"4", "TOOK"=>"20.6ms", "TOOKM"=>20, "types"=>"publishedarticle", "search_type"=>"QUERY_THEN_FETCH", "total_shards"=>"5", "source_query"=>"{\"query\":{\"bool\":{\"must\":[{\"terms\":{\"articleId\":[316249486]}}]}}}", "@metadata"=>{"path"=>"/home/itadmin/logstash/logstash-2.4.1/slow.txt"}}, :level=>:error}
Aggregate exception occurred {:error=>#<NoMethodError: undefined method `+' for nil:NilClass>, :code=>"map['aggr_message'] += event.get('message')", :map=>{}, :event_data=>{"message"=>"[2017-04-25 05:47:02,335][TRACE][index.search.slowlog.query] [data-0] [data-apr-2017][4] took[20.8ms], took_millis[20], types[details], stats[], search_type[QUERY_THEN_FETCH], total_shards[5], source[{\"query\":{\"bool\":{\"must\":[{\"terms\":{\"articleId\":[316252085]}}]}}}], extra_source[],", 
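For what it's worth, this NoMethodError can be reproduced in plain Ruby outside Logstash: on the first event of a task the map has no 'aggr_message' entry yet, so += becomes a + call on nil. A minimal sketch, where the map hash simply stands in for the aggregate filter's map:

```ruby
map = {}  # a fresh aggregate map: no 'aggr_message' key yet

# What the suggested code does on the first event of a task:
begin
  map['aggr_message'] += 'first message'  # expands to nil + 'first message'
rescue NoMethodError
  # this is the "undefined method `+' for nil:NilClass" seen in the log
end

# Initializing the entry before appending avoids the exception:
map['aggr_message'] ||= ''
map['aggr_message'] += 'first message'
map['aggr_message'] += ' second message'
puts map['aggr_message']  # => "first message second message"
```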

Thanks


(Javier) #6

Hello:

It looks like the code does not create the field by itself, so I would try using message to store the final message field, something like this:

  code => "map['message'] += event.get('message')"

(Yaswanth ) #7

After using that too, it shows the same error.

Thanks


(Yaswanth ) #8

Thanks @Xavy

You were very helpful. As you suggested, I followed Example #4 in the documentation, which is similar to my scenario, and adapted it:

  aggregate {
        task_id => "%{LEVEL}"
        code => "
          map['LEVEL'] = event.get('LEVEL')
          map['messages'] ||= []
          map['messages'] << {'message' => event.get('message')}
          event.cancel()
        "
        push_previous_map_as_event => true
        timeout => 3
      }

It worked fine. I used the LEVEL field as the task_id.
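For anyone landing here later, the map logic above can be simulated in plain Ruby to see what the pushed event contains. A sketch under stated assumptions: the events array is a stand-in for the parsed slowlog events, with fields abbreviated:

```ruby
# Stand-ins for two parsed slowlog events (fields abbreviated).
events = [
  { 'LEVEL' => 'WARN', 'message' => '[bank][2] took[50.2ms] ...' },
  { 'LEVEL' => 'WARN', 'message' => '[bank][1] took[52.2ms] ...' },
]

map = {}  # the aggregate filter keeps one map per task_id (here, per LEVEL)
events.each do |event|
  map['LEVEL'] = event['LEVEL']
  map['messages'] ||= []                                # create the array once
  map['messages'] << { 'message' => event['message'] }  # append each message
end

# With push_previous_map_as_event => true, Logstash emits this map as a
# single new event; map['messages'] now holds both original messages.
puts map['messages'].length  # => 2
```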

Thanks


(Javier) #9

You're welcome :wink: If any of my posts particularly helped you in finding the solution, I would be grateful if you could mark it as "Solution".

Glad to be of help


(system) #10

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.