Logstash: how to output the source log path in my message

I'm trying to count the number of times an ERROR|Error|error or Warn log level shows up in my log file and, depending on how many times it repeats, send an email/Slack message to the service owner with a summary like this:

CDP-error: 5 Errors on file XXXX.log in the last 3 hours
CDP-Warn: 10 Warnings on file XXXX.log in the last 3 hours

Right now I'm just testing, but for some reason it doesn't seem to work properly: I'm able to get the count when an error or warning appears, but I'm not able to add the path of the file where the error is happening.

I'm generating my test error messages with echo, manually appending them to the log file:

echo "18/10/31 12:01:54 ERROR netty.Dispatcher: Message RemoteProcessDisconnected(10.192.64.33:56936) dropped. Could not find BlockManagerEndpoint1." >> OptusCatPush_test_async_6.log

Here is my output:

Settings: Default pipeline workers: 80
Pipeline main started
{
      "@version" => "1",
    "@timestamp" => "2019-02-12T00:15:15.242Z",
       "message" => "dm0010.hkg1.turn.com",
       "w_level" => {
           "count" => 1,
         "rate_1m" => 0.0,
         "rate_5m" => 0.0,
        "rate_15m" => 0.0
    },
          "tags" => [
        [0] "metric_a"
    ]
}
{
      "@version" => "1",
    "@timestamp" => "2019-02-12T00:15:30.240Z",
       "message" => "dm0010.hkg1.turn.com",
       "w_level" => {
           "count" => 1,
         "rate_1m" => 0.0,
         "rate_5m" => 0.0,
        "rate_15m" => 0.0
    },
          "tags" => [
        [0] "metric_a"
    ]
}

or

Pipeline main started
{
      "@version" => "1",
    "@timestamp" => "2019-02-12T00:08:35.885Z",
       "message" => "dm0010.hkg1.turn.com",
       "w_level" => {
           "count" => 1,
         "rate_1m" => 0.0,
         "rate_5m" => 0.0,
        "rate_15m" => 0.0
    },
          "tags" => [
        [0] "metric_a"
    ]
}
CDP-ERROR: {"count":1,"rate_1m":0.0,"rate_5m":0.0,"rate_15m":0.0}
{
      "@version" => "1",
    "@timestamp" => "2019-02-12T00:08:50.886Z",
       "message" => "dm0010.hkg1.turn.com",
       "w_level" => {
           "count" => 1,
         "rate_1m" => 0.0,
         "rate_5m" => 0.0,
        "rate_15m" => 0.0
    },
          "tags" => [
        [0] "metric_a"
    ]
}
CDP-ERROR: {"count":1,"rate_1m":0.0,"rate_5m":0.0,"rate_15m":0.0}
{
      "@version" => "1",
    "@timestamp" => "2019-02-12T00:09:05.885Z",
       "message" => "dm0010.hkg1.turn.com",
       "w_level" => {
           "count" => 6,
         "rate_1m" => 0.0,
         "rate_5m" => 0.0,
        "rate_15m" => 0.0
    },
          "tags" => [
        [0] "metric_a"
    ]
}

Here is my config:

input {
  file {
    path => [ '/home/fxea/OptusCatPush_test_async_6.log' ]
    type => "oozie"
  }
}

filter {
  if [type] == "oozie" {
    grok {
      match => [ "message", "%{YEAR:year}\/%{MONTHNUM:month}\/%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second} %{LOGLEVEL:loglevel} %{GREEDYDATA:message}" ]
    }
  }

  if [loglevel] =~ /WARN|Warn|warn/ {
    metrics {
      meter => "w_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => "metric_a"
    }
  }
  else if [loglevel] =~ /ERROR|Error|error/ {
    metrics {
      meter => "e_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => "metric_b"
    }
  }
}
output {
  if "metric_a" in [tags] and [w_level] {
    stdout {
      codec => line {
        format => "CDP-WARN: %{[w_level][count]} in file %{path}"
      }
    }
  }
  else if "metric_b" in [tags] and [e_level] {
    stdout {
      codec => line {
         format => "CDP-ERROR: %{[e_level][count]} in file %{path}"
      }
    }
  }
}

I also send the log file to Elasticsearch as the default output; that part is not included here yet.

A metric filter injects new events into the event stream. It is not going to have any fields from the events that it is measuring.

If you need it to, you could use ruby to add it. Before the metrics filter, stash the path in a Ruby class variable:

ruby { code => '@@path = event.get("path")' }

and after the last metrics filter, conditionally add it back:

if "metric_a" in [tags] or "metric_b" in [tags] {
    ruby { code => 'event.set("path", @@path)' }
}

This assumes that path is constant.
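Putting those pieces together, the filter section would look roughly like this (a sketch only, untested, using the event get/set API of recent Logstash versions; field and tag names taken from your config):

filter {
  # Stash the path of every real log event in a class variable.
  ruby { code => '@@path = event.get("path")' }

  if [loglevel] =~ /WARN|Warn|warn/ {
    metrics {
      meter => "w_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => "metric_a"
    }
  }

  # The metrics filter emits brand-new events that have no [path] field,
  # so copy the stashed value onto them.
  if "metric_a" in [tags] {
    ruby { code => 'event.set("path", @@path)' }
  }
}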

That doesn't seem to work properly. Here is the output I get:

Pipeline main started
Ruby exception occurred: undefined method `get' for #<LogStash::Event:0x49110163> {:level=>:error}
Ruby exception occurred: undefined method `get' for #<LogStash::Event:0xb00d33b> {:level=>:error}
Ruby exception occurred: undefined method `get' for #<LogStash::Event:0xc63cc37> {:level=>:error}
Ruby exception occurred: uninitialized class variable @@source_log in LogStash::Filters::Ruby {:level=>:error}
{
      "@version" => "1",
    "@timestamp" => "2019-02-22T23:04:21.558Z",
       "message" => "dm0010.hkg1.turn.com",
       "w_level" => {
           "count" => 3,
         "rate_1m" => 0.0,
         "rate_5m" => 0.0,
        "rate_15m" => 0.0
    },
          "tags" => [
        [0] "metric_a",
        [1] "_rubyexception"
    ]
}

Here is my code:

input {
  file {
    path => [ '/home/fxea/ferchis.log' ]
    type => "oozie"
  }
}

filter {
  grok {
    match => { "message" =>  "%{YEAR:year}\/%{MONTHNUM:month}\/%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second} %{LOGLEVEL:loglevel} %{GREEDYDATA:method}: %{GREEDYDATA:messages}" }
    add_tag => "loglevel_parsed"
  }

# Drop if there's a failure parsing the log
  if "_grokparsefailure" in [tags] {
    drop { }
  }

# In order to get the path,
  ruby {
    code => '@@path = event.get("path")'
  }

  if [loglevel] =~ /WARN|warn|Warn/ {
    metrics {
      meter => "w_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => ["metric_a"]
    }
  }
  else if [loglevel] =~ /ERROR|Error|error/ {
    metrics {
      meter => "e_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => ["metric_b"]
    }
  }
  if "metric_a" in [tags] or "metric_b" in [tags]{
    ruby {
     code => 'event.set("path", @@source_log)'
    }
  }
}

output {
  if "metric_a" in [tags] and [w_level] {
    stdout {
       codec => rubydebug
   #   codec => line {
     #   format => "CDP-WARN Count: %{[w_level][count]} in file: [@metadata][path]"
    #  }
    }
  }
}

Which version of logstash are you running?

Sorry, I forgot to mention: 2.3.1.

Wow, that's old.

I think in that case you can replace the get/set API with direct field references in the ruby code.

  event['path'] = @@path

and similarly for the get. But I am unable to test it, so I may be wrong.

OK, so this part works:

  ruby {
   code => "
     slogfile = event['path']
    "
  }

That's where I declare slogfile to hold the path, and I was able to print it:

[f@dm0010.hkg1.com bin]$ clear;./logstash -f /home/fxea/logstash.conf1
Settings: Default pipeline workers: 80
Pipeline main started
/home/fxea/ferchis.log
/home/fxea/ferchis.log
{
      "@version" => "1",
    "@timestamp" => "2019-02-23T01:49:15.295Z",
       "message" => "dm0010.hkg1.turn.com",
       "w_level" => {
           "count" => 2,
         "rate_1m" => 0.0,
         "rate_5m" => 0.0,
        "rate_15m" => 0.0
    },
          "tags" => [
        [0] "metric_a"
    ]
}

However, I don't know how to output that variable.

I think you would still need to use a class variable so that it is available to two different filters. To save the path, use

ruby {
  code => "
    @@slogfile = event['path']
  "
}

and to add it to the event, use

ruby {
  code => "
    event['path'] = @@slogfile
  "
}

I see, I got a little lost. Would you mind showing me how the final config should look?
I'm confused about where to add the ruby code that creates the new event, and about how to retrieve the event in the output.

filter {
  grok {
    match => { "message" =>  "%{YEAR:year}\/%{MONTHNUM:month}\/%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second} %{LOGLEVEL:loglevel} %{GREEDYDATA:method}: %{GREEDYDATA:messages}" }
    add_tag => "loglevel_parsed"
  }

# Drop if there's a failure parsing the log
  if "_grokparsefailure" in [tags] {
    drop { }
  }

#In order to get the path,
  ruby {
   code => "
     @@slogfile = event['path']
    "
  }

  if [loglevel] =~ /WARN|warn|Warn/ {
    metrics {
      meter => "w_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => ["metric_a"]
      add_field => { "logs" => "%{@@slogfile}" }
    }
  }
  else if [loglevel] =~ /ERROR|Error|error/ {
    metrics {
      meter => "e_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => ["metric_b"]
#       ignore_older_than => 3600
    }
  }
#  if "metric_a" in [tags] or "metric_b" in [tags]{
#    ruby {
#     code => "
#       files = event['path']
#     "
#    }
#  }
}

output {
  if "metric_a" in [tags] and [w_level] {
    stdout {
      codec => line {
        format => "CDP-WARN Count: %{[w_level][count]} in file: %{['@@slogfile']}"['slogfile'] 
      }
    }
  }
}

At the very end of the filter add

if ! [path] {
    ruby { code => 'event["path"] = @@slogfile' }
}
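Bear in mind that a class variable is shared across all pipeline workers, so with your default of 80 workers another thread can overwrite @@slogfile between the filter that sets it and the one that reads it. It is only really dependable with a single worker (-w 1) or while you are tailing a single file.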

Cool, thanks for your help! My config works now; here it is:

input {
  file {
    path => [ '/home/fxea/ferchis_2.log', '/home/fxea/ferchis.log' ]
    start_position => beginning
    exclude => "*.gz"
#    ignore_older => 0
    type => "oozie"
  }
}

filter {
  grok {
    match => { "message" =>  "%{YEAR:year}\/%{MONTHNUM:month}\/%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second} %{LOGLEVEL:loglevel} %{GREEDYDATA:method}: %{GREEDYDATA:messages}" }
    add_tag => "loglevel_parsed"
  }

# Drop if there's a failure parsing the log
  if "_grokparsefailure" in [tags] {
    drop { }
  }

  if [loglevel] =~ /WARN|warn|Warn/ {
    ruby {
      code => '@@slogfile = event["path"]'
    }
    metrics {
      meter => "w_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => ["metric_a"]
    }
  }
  if [loglevel] =~ /ERROR|Error|error/ {
    ruby {
      code => '@@slogfile = event["path"]'
    }
    metrics {
      meter => "e_level"
      flush_interval => 5
      clear_interval => 10
      add_tag => ["metric_b"]
#       ignore_older_than => 3600
    }
  }
  if "metric_a" in [tags] or "metric_b" in [tags] and ! [path] {
    ruby {
      code => 'event["path"] = @@slogfile'
    }
  }
}

output {
  if "metric_b" in [tags] and [e_level][count] > 6 {
    stdout {
      codec => line {
        format => "CDP-ERROR Count: %{[e_level][count]} reference log file: %{[path]}"
      }
    }
  }
}

This seems to work. The only issue I see is that the count seems to be wrong.
For example, say the word WARN is written about 30 times in /var/log/test.log and about 10 times in /var/log/test1.log; the count comes out at roughly 3x those numbers. I'm not sure if it's due to the way I'm filtering. I wonder whether the filter counts every WARN/ERROR pattern it finds across all the files and keeps accumulating events, or whether it's the flush/clear interval I set that clears the count.

That's not going to work if you have multiple files. You might do better with an aggregate filter.

    aggregate {
        task_id => "%{path}"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "path"
        timeout => 10
        code => '
            map["errors"] ||= 0
            map["warnings"] ||= 0
            if event.get("message") =~ /error/i
                map["errors"] = map["errors"] + 1
            end
            if event.get("message") =~ /warn/i
                map["warnings"] = map["warnings"] + 1
            end
        '
    }
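Note that the aggregate filter only works correctly with a single filter worker, because all events for a given task_id have to pass through the same filter instance. Your log shows the default of 80 pipeline workers, so start logstash with -w 1, for example:

./logstash -f /home/fxea/logstash.conf1 -w 1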

Thanks again. This actually works much better. The last thing is that I'm getting the following error when I try to compare values:

Settings: Default pipeline workers: 80
Pipeline main started
NoMethodError: undefined method `>=' for nil:NilClass
output_func at (eval):106
output_batch at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.1-java/lib/logstash/pipeline.rb:293
each at org/jruby/RubyArray.java:1613
inject at org/jruby/RubyEnumerable.java:852
output_batch at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.1-java/lib/logstash/pipeline.rb:287
worker_loop at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.1-java/lib/logstash/pipeline.rb:232
start_workers at /opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-2.3.1-java/lib/logstash/pipeline.rb:201

So I tried to mutate those fields to integer, but it seems I'm missing something:

input {
  file {
    path => [ '/home/fxea/ferchis_2.log', '/home/fxea/ferchis.log' ]
    start_position => beginning
    exclude => "*.gz"
#    ignore_older => 0
    type => "oozie"
  }
}

filter {
  grok {
    match => { "message" =>  "%{YEAR:year}\/%{MONTHNUM:month}\/%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second} %{LOGLEVEL:loglevel} %{GREEDYDATA:method}: %{GREEDYDATA:messages}" }
    add_tag => "loglevel_parsed"
  }

# Drop if there's a failure parsing the log
  if "_grokparsefailure" in [tags] {
    drop { }
  }

  aggregate {
    task_id => "%{path}"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "location"
    timeout => 5
    code => '
      map["errors"] ||= 0
      map["warnings"] ||= 0
      if event["message"] =~ /error/i
        map["errors"] = map["errors"] + 1
      end
      if event["message"] =~ /warn/i
        map["warnings"] = map["warnings"] + 1
      end
    '
  }
#  if [errors] or [warnings]{
#   mutate {
#     convert => {"errors" => "integer"
#                 "warnings" =>"integer"
#     }
#  }
# }
}

output {
if [event][message][errors] >= 2 {
    stdout {
      codec => line {
        format => "CDP-ERROR Count: %{[errors]} reference log file: %{[location]}"
      }
    }
    stdout {
      codec => rubydebug
    }
  }
#else {
#    stdout {
#     codec => rubydebug
#   }
#}
}

I have also changed "if [event][message][errors] >= 2 {" to "if [errors] >= 2 {", but I get the same error.

The filter is not dropping the events that it is aggregating (it could: you could add event.cancel as the last line of the code option). So those events are fed to the output, but they do not have an errors field.

You could change the test in output to be

if [errors] and [errors] >= 2 {
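If you would rather have only the timeout summaries reach the outputs, here is a sketch of the cancel approach against your config (same aggregate, one line added at the end; note it drops the raw events entirely, so do not use it if you also want to index them):

aggregate {
  task_id => "%{path}"
  push_map_as_event_on_timeout => true
  timeout_task_id_field => "location"
  timeout => 5
  code => '
    map["errors"] ||= 0
    map["warnings"] ||= 0
    map["errors"] = map["errors"] + 1 if event["message"] =~ /error/i
    map["warnings"] = map["warnings"] + 1 if event["message"] =~ /warn/i
    event.cancel  # drop the raw event; only the timeout events are emitted
  '
}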

Thanks now things look better.

One more thing. I'm not sure if this will work, but here is what I'm trying to do.

A- Count the number of ERROR|WARN exceptions found per log file in the last 12 hours and send a summary through email; the content should look something like:

List of exceptions in the last 12 hours:

Number of WARN exception in /var/log/oozie/file1.log: 45
Number of WARN exception in /var/log/oozie/file2.log: 72
Number of ERROR exception in /var/log/oozie/file1.log: 45
Number of ERROR exception in /var/log/oozie/file2.log: 72

B- When it detects a Java stack trace, send it over Slack immediately:
Java Exception found on /var/log/oozie/file2.log:

19/05/08 18:20:52 ERROR storage.BlockManagerMaster: Failed to remove broadcast 9 with removeFromMaster = true - Connection reset by peer ==========================================XXXXXX
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)

So far I'm able to send it, but it spams the channel like crazy. Any idea what I'm doing wrong, or what I can do to fix this?

input {
  file {
    path => [ '/server/cdp/oozie/log/*/*.log', '/var/lib/oozie/log/*/*.log', '/var/lib/oozie/*.log' ]
    start_position => beginning
    exclude => [ "*.gz", "*.log.*" ]
    ignore_older => 43200
    type => "oozie"
    codec => multiline {
      pattern => "^%{YEAR}\/%{MONTHNUM}\/%{MONTHDAY} %{HOUR}:%{MINUTE}:%{SECOND} %{LOGLEVEL}"
      negate  => true
      what   => previous
    }
  }
}

filter {
  mutate {
    gsub => ["message", "r", ""]
  }
  grok {
    match => [ "message", "(?m)^%{YEAR:year}\/%{MONTHNUM:month}\/%{MONTHDAY:day} %{HOUR:hour}:%{MINUTE:minute}:%{SECOND:second} %{LOGLEVEL:loglevel} %{JAVACLASS:java_class}: %{GREEDYDATA:messages}" ]
    overwrite => ["message"]
    add_tag => "loglevel_parsed"
  }

# Drop if there's a failure parsing the log
  if "_grokparsefailure" in [tags] {
    drop {}
  }

# Counts and aggregates the matches per file.
  if "multiline" not in [tags] and "loglevel_parsed" in [tags] {
    aggregate {
      task_id => "%{path}"
      push_map_as_event_on_timeout => true
      timeout_task_id_field => "location"
      timeout => 7200
      code => '
        map["errors"] ||= 0
        map["warnings"] ||= 0
        map["cdperrors"] ||= 0
        map["cdpwarnings"] ||= 0

        if event["messages"] =~ /cdperror/i
          map["cdperrors"] = map["cdperrors"] + 1
        end
        if event["messages"] =~ /cdpwarning/i
          map["cdpwarnings"] = map["cdpwarnings"] + 1
        end

        if event["loglevel"] =~ /error/i and event["messages"] !~ /cdperror/i
          map["errors"] = map["errors"] + 1
        end
        if event["loglevel"] =~ /warn/i and event["messages"] !~ /cdpwarning/i
          map["warnings"] = map["warnings"] + 1
        end
      '
    }
  }
  if "multiline" in [tags] and "loglevel_parsed" in [tags] {
    aggregate {
      task_id => "%{path}"
      push_map_as_event_on_timeout => false
      code => 'map["message"] = event["message"]'
#      code => 'event["message"] = map["message"]'
      map_action => "create_or_update"
    }
  }
}

output {
  elasticsearch {
    manage_template     => true
    template_name       => "oozie"
    template            => "/etc/logstash/templates/oozie.json"
    template_overwrite  => true
    index               => "cdp-%{+YYYY.MM.dd}"
    hosts               => ["dwh-edge001.atl2.turn.com:9200","dwh-edge002.atl2.turn.com:9200"]
    workers             => 1
  }

  if "multiline" in [tags] and "loglevel_parsed" in [tags] {
    slack {
      url => "https://hooks.slack.com/services"
      channel => "#elk_integration_test"
      format => "Java Exception found: `%{[path]}`: %{message}"
    }
  }

  if "loglevel_parsed" in [tags] and "multiline" not in [tags] {
    if [errors] and [errors] >= 10 {
      email {
        to => "WTF@logsforever.com"
        subject => "ERROR Events Detected %{[location]}"
        body => "Number of ERRORS: %{[errors]} in log file: %{[location]}"
      }
    }

    if [warnings] and [warnings]  >= 10 {
      email {
        to => "WTF@logsforever.com"
        subject => "WARNING Events Detected %{[location]}"
        body => "Number of WARNINGS: %{[warnings]} in log file: %{[location]}"
      }
    }

    if [cdperrors] and [cdperrors]  >= 10 {
      email {
        to => "WTF@logsforever.com"
        subject => "CDPERRORS Events Detected %{[location]}"
        body => "Number of CDPERRORS: %{[cdperrors]} in log file: %{[location]}"
      }
    }

    if [cdpwarnings] and [cdpwarnings]  >= 10 {
      email {
        to => "WTF@logsforever.com"
        subject => "CDPWARNINGS Events Detected %{[location]}"
        body => "Number of CDPWARNINGS: %{[cdpwarnings]} in log file: %{[location]}"
      }
    }
  }
}

Thanks!
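For the Slack spam: every multiline stack-trace event triggers its own notification. One way to rate-limit that is the throttle filter; a minimal sketch, assuming the logstash-filter-throttle plugin is installed and with a placeholder 10-minute period:

filter {
  if "multiline" in [tags] and "loglevel_parsed" in [tags] {
    # Tag every stack trace after the first one per file in each
    # 10-minute window, so only one notification per file gets out.
    throttle {
      key => "%{path}"
      after_count => 1
      period => 600
      add_tag => "throttled"
    }
  }
}

and then only notify on events that were not throttled:

if "multiline" in [tags] and "loglevel_parsed" in [tags] and "throttled" not in [tags] {
  slack {
    url => "https://hooks.slack.com/services"
    channel => "#elk_integration_test"
    format => "Java Exception found: `%{[path]}`: %{message}"
  }
}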

