Aggregation Filter 2 logs same index

Dear Community

I need your help to fix my issue.
I have 2 logs in the same index that should be aggregated so that all of the information can be viewed in a Data Table visualization.

input {
udp {
port => 1514
type => "alcatel"
}
}

filter {
if [type] == "alcatel" {
grok {

match => { "message" => [ "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:(%{POSINT:syslog_pid}))\s+\w+:\s+[\w+](?<status>\s+Authentication\s+failed):\s+\w+ %{DATA:job_id},",
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:(%{POSINT:syslog_pid}))\s+\w+:\s+[\w+](?<status>\s+Session)%{DATA:job_id},\s+\w+\s+\w+\s+%{SYSLOGHOST:srcip}",
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:(%{POSINT:syslog_pid}))\s+\w+:\s+[\w+](?<status>\s+Authentication\s+succeed):\s+\w+ %{DATA:job_id}," ] }
add_field => [ "received_at", "%{@timestamp}" ]
add_tag => "Alcatel"
}

if "_grokparsefailure" in [tags] {
drop {}
}

aggregate {
task_id => "%{job_id}"
code => "
map['syslog_timestamp'] ||= event.get('syslog_timestamp')
map['syslog_hostname'] ||= event.get('syslog_hostname')
map['syslog_program'] ||= event.get('syslog_program')
map['syslog_pid'] ||= event.get('syslog_pid')
map['status'] ||= event.get('status')
map['srcip'] ||= event.get('srcip')
map['job_id'] ||= event.get('job_id')
"
timeout => 2
timeout_tags => ['aggregated']
map_action => 'create_or_update'
push_map_as_event_on_timeout => true
}
mutate { gsub => [ "status", "^ ", "" ] }
}
}

I would like to generate only one log based on job_id, because the first log is missing the source IP address and the second log is missing the authentication response.

For example I received these log:

Log 1
<78>Aug 03 19:04:41 mi-sg12-sw-01 SESSION(71) Data: [SES] Authentication failed: Session 13, access type Telnet
Log 2
Aug 03 22:25:19 mi-sg12-sw-01 SESSION(71) Data: [SES] Session 13, IP address 10.3.1.4

Log finally:
Aug 03 19:04:41 mi-sg12-sw-01 Authentication failed 13 IP Address 10.3.1.4

What is your question?

I updated the first post.
Thanks.

logstash will not even start with those grok patterns...

:exception=>#<RegexpError: undefined group option: /(?<SYSLOGTIMESTAMP:syslog_timestamp>(?:\b(?:[Jj]an(?:uary|uar)?|[Ff]eb(?:ruary|ruar)?|[Mm](?:a|ä)?r(?:ch|z)?|[Aa]pr(?:il)?|[Mm]a(?:y|i)?|[Jj]un(?:e|i)?|[Jj]ul(?:y)?|[Aa]ug(?:ust)?|[Ss]ep(?:tember)?|[Oo](?:c|k)?t(?:ober)?|[Nn]ov(?:ember)?|[Dd]e(?:c|z)(?:ember)?)\b) +(?:(?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])) (?:(?!<[0-9])(?:(?:2[0123]|[01]?[0-9])):(?:(?:[0-5][0-9]))(?::(?:(?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)))(?![0-9]))) (?<SYSLOGHOST:syslog_hostname>(?:(?:(?:(?:(?:((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?)|(?:(?<![0-9])(?:(?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5])[.](?:[0-1]?[0-9]{1,2}|2[0-4][0-9]|25[0-5]))(?![0-9]))))|(?:\b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b))))) 
(?<DATA:syslog_program>.*?)(?:\((?<POSINT:syslog_pid>\b(?:[1-9][0-9]*)\b)\))\s+\w+:\s+[\w+](?\s+Authentication\s+failed):\s+\w+ (?<DATA:job_id>.*?),/m>,

You will need to fix those before you worry about the aggregate. Try

            "message" => [
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(%{POSINT:syslog_pid}\)\s+\w+:\s+\[\w+\]\s+Authentication\s+failed:\s+\w+ %{DATA:job_id},",
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(%{POSINT:syslog_pid}\)\s+\w+:\s+\[\w+\]\s+Session%{DATA:job_id},\s+\w+\s+\w+\s+%{SYSLOGHOST:srcip}",
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(%{POSINT:syslog_pid}\)\s+\w+:\s+\[\w+\]\s+Authentication\s+succeed:\s+\w+ %{DATA:job_id},"
            ]

Unfortunately it did not work; the logs are not aggregated.

And now the status field with the authentication response is missing.

the full configuration now is:

filter {
  if [type] == "alcatel" {
    grok {
      match => { "message" => [
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(%{POSINT:syslog_pid}\)\s+\w+:\s+\[\w+\]\s+Authentication\s+failed:\s+\w+ %{DATA:job_id},",
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(%{POSINT:syslog_pid}\)\s+\w+:\s+\[\w+\]\s+Session%{DATA:job_id},\s+\w+\s+\w+\s+%{SYSLOGHOST:srcip}",
"%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}\(%{POSINT:syslog_pid}\)\s+\w+:\s+\[\w+\]\s+Authentication\s+succeed:\s+\w+ %{DATA:job_id}," ]
               }
      add_field => [ "received_at", "%{@timestamp}" ]
      add_tag => "Alcatel"
     }
      mutate { gsub => [ "status", "^ ", "" ] } 

    aggregate {
      task_id => "%{job_id}"
       code => "
        map['syslog_timestamp'] ||= event.get('syslog_timestamp')
        map['syslog_hostname'] ||= event.get('syslog_hostname')
        map['syslog_program'] ||= event.get('syslog_program')
        map['syslog_pid'] ||= event.get('syslog_pid')
        map['status'] ||= event.get('status')
        map['srcip'] ||= event.get('srcip')
        map['job_id'] ||= event.get('job_id')
        "
      timeout => 2
      timeout_tags => ['aggregated']
      map_action => 'create_or_update'
      push_map_as_event_on_timeout => true
    }
  if "_grokparsefailure" in [tags] {
    drop {}
  }
 }
}

I do not know what to say. With the grok changes I suggested and your original configuration I get the two parsed log messages and a couple of seconds later I get

{
"syslog_timestamp" => "Aug 03 22:25:19",
      "syslog_pid" => "71",
          "status" => nil,
        "@version" => "1",
            "tags" => [
    [0] "aggregated"
],
           "srcip" => "10.3.1.4",
 "syslog_hostname" => "mi-sg12-sw-01",
  "syslog_program" => "SESSION",
          "job_id" => " 13",
      "@timestamp" => 2020-08-03T21:39:32.389Z
}

Could you give me the command that you use to generate it?
BTW, your output is also missing the field with "Authentication failed"; please see the logs below.

Log 1
<78>Aug 03 19:04:41 mi-sg12-sw-01 SESSION(71) Data: [SES] Authentication failed: Session 13, access type Telnet
Log 2
Aug 03 22:25:19 mi-sg12-sw-01 SESSION(71) Data: [SES] Session 13, IP address 10.3.1.4

Log finally:
Aug 03 19:04:41 mi-sg12-sw-01 Authentication failed 13 IP Address 10.3.1.4

That is probably because you never grok a field called [status]

See the first post; to generate the status field I use "(?<status>\s+Authentication\s+failed)"
BTW could you send me the command that you use to generate your output?

Except that does not appear in your first post.

The data I posted was generated by

output { stdout { codec => rubydebug } }

My output is:

          "syslog_pid" => "71",
                "type" => "alcatel",
      "syslog_program" => "SESSION",
              "status" => "Authentication succeed",
            "@version" => "1",
                "host" => "10.12.100.1",
             "message" => "<78>Aug 04 02:38:03 mi-sg12-sw-01 SESSION(71) Data: [SES] Authentication succeed: Session 11, access type Telnet\n\u0000",
    "syslog_timestamp" => "Aug 04 02:38:03",
          "@timestamp" => 2020-08-04T00:38:03.106Z,
     "syslog_hostname" => "mi-sg12-sw-01",
              "job_id" => "11",
         "received_at" => "2020-08-04T00:38:03.106Z"
}
{
          "syslog_pid" => "71",
                "type" => "alcatel",
      "syslog_program" => "SESSION",
            "@version" => "1",
                "host" => "10.12.100.1",
             "message" => "<78>Aug 04 02:38:03 mi-sg12-sw-01 SESSION(71) Data: [SES] Session 11, IP address 10.3.1.4\n\u0000",
    "syslog_timestamp" => "Aug 04 02:38:03",
               "srcip" => "10.3.1.4",
          "@timestamp" => 2020-08-04T00:38:03.107Z,
     "syslog_hostname" => "mi-sg12-sw-01",
              "job_id" => "11",
         "received_at" => "2020-08-04T00:38:03.107Z"
}
{
                "tags" => [
        [0] "Alcatel"
    ],
          "syslog_pid" => "71",
      "syslog_program" => "SESSION",
              "status" => "Authentication succeed",
                "host" => "10.12.100.1",
            "@version" => "1",
    "syslog_timestamp" => "Aug 04 02:38:03",
               "srcip" => "10.3.1.4",
          "@timestamp" => 2020-08-04T00:38:09.878Z,
     "syslog_hostname" => "mi-sg12-sw-01",
              "job_id" => "11"

I don't understand why, each time I restart Logstash, the output changes:

        [0] "Alcatel"
    ],
          "syslog_pid" => "71",
      "syslog_program" => "SESSION",
              "status" => "Authentication succeed",
                "host" => "10.12.100.1",
            "@version" => "1",
    "syslog_timestamp" => "Aug 04 02:38:03",
               "srcip" => "10.3.1.4",
          "@timestamp" => 2020-08-04T00:38:09.878Z,
     "syslog_hostname" => "mi-sg12-sw-01",
              "job_id" => "11"

Reboot 1

               "srcip" => "10.3.1.4",
              "status" => "Authentication failed",
            "@version" => "1",
     "syslog_hostname" => "mi-sg12-sw-01",
              "job_id" => "12",
          "@timestamp" => 2020-08-04T00:39:51.135Z,
          "syslog_pid" => "71",
      "syslog_program" => "SESSION",
                "tags" => [
        [0] "Alcatel"
    ],
    "syslog_timestamp" => "Aug 04 02:39:44",
                "host" => "10.12.100.1"

Reboot 2:

     "syslog_hostname" => "mi-sg12-sw-01",
                "tags" => [
        [0] "Alcatel"
    ],
            "@version" => "1",
          "@timestamp" => 2020-08-04T00:42:27.724Z,
    "syslog_timestamp" => "Aug 04 02:42:21",
          "syslog_pid" => "71",
               "srcip" => "10.3.1.4",
              "job_id" => "13",
              "status" => "Authentication succeed",
                "host" => "10.12.100.1",
      "syslog_program" => "SESSION"

Reboot 3

                "host" => "10.12.100.1",
      "syslog_program" => "SESSION",
          "syslog_pid" => "71",
               "srcip" => "10.3.1.4",
    "syslog_timestamp" => "Aug 04 02:43:34",
              "job_id" => "14",
              "status" => "Authentication succeed",
                "tags" => [
        [0] "Alcatel"
    ],
          "@timestamp" => 2020-08-04T00:43:38.797Z,
     "syslog_hostname" => "mi-sg12-sw-01",
            "@version" => "1"

I think that is not normal. BTW, in Kibana under Discover I see only the individual logs, not the aggregated one.

I fixed the issue; below is the correct configuration. I hope it can be useful for other members of the community.

input {
  udp {
    port => 1514
    type => "alcatel"
  }
}

filter {
  if [type] == "alcatel" {
    grok {

      match => { "message" => [ "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\(%{POSINT:syslog_pid}\))\s+\w+:\s+\[\w+\](?\<status\>\s+Authentication\s+failed):\s+\w+ %{DATA:job_id},",
                                "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\(%{POSINT:syslog_pid}\))\s+\w+:\s+\[\w+\](?\<status\>\s+Authentication\s+succeed):\s+\w+ %{DATA:job_id},",
                                "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\(%{POSINT:syslog_pid}\))\s+\w+:\s+\[\w+\]\s+\w+%{DATA:job_id},\s+\w+\s+\w+\s+%{SYSLOGHOST:srcip}" ] }                 
      add_field => [ "received_at", "%{@timestamp}" ]
      #add_tag => "Alcatel"
     }
      mutate { gsub => [ "status", "^ ", "" ] } 
      mutate { gsub => [ "job_id", "^ ", "" ] }
     if "_grokparsefailure" in [tags] {
       drop {}
     }
    aggregate {
      task_id => "%{job_id}"
       code => "
        map['received_at'] ||= event.get('received_at') 
        map['type'] ||= event.get('type')
        map['syslog_timestamp'] ||= event.get('syslog_timestamp')
        map['syslog_hostname'] ||= event.get('syslog_hostname')
        map['syslog_program'] ||= event.get('syslog_program')
        map['syslog_pid'] ||= event.get('syslog_pid')
        map['status'] ||= event.get('status')
        map['srcip'] ||= event.get('srcip')
        map['host'] ||= event.get('host')
        map['job_id'] ||= event.get('job_id')
        "
      timeout => 2
      timeout_tags => ['Alcatel']
      map_action => 'create_or_update'
      push_map_as_event_on_timeout => true
    }
  }
}

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.