Is grouping and ordering log lines possible?

I have a log like this example:
(Case 1)
CHAN1 : 23:57:05:89 |Message Start
CHAN1 : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN99i : 23:57:05:89 |Message Start
CHAN99i : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Sit
CHAN1 : 23:57:05:89 |Message End
CHAN99i : 23:57:05:89 |Message End

Expected:
CHAN1 : 23:57:05:89 |Message Start
CHAN1 : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN1 : 23:57:05:89 |Sit
CHAN1 : 23:57:05:89 |Message End
CHAN99i : 23:57:05:89 |Message Start
CHAN99i : 23:57:05:89 |Lorem
CHAN99i : 23:57:05:89 |Message End

That way I can parse the data using the start and end messages. I guess this can be done with the aggregate filter plugin in Logstash, and the result would look like this:

{
 "CHAN" : "CHAN1" ,
 "Message" : "Message Start Lorem Ipsum Dolor Sit Message End"
},
{
 "CHAN" : "CHAN99i " ,
 "Message" : "Message Start Lorem  Message End"
}

Unfortunately, in the real case the log looks like the one below. If I can find a way to handle the first case, I can probably work out how to handle this second one :frowning:

Case 2
CHAN1 : 23:57:05:89 |Message Start
CHAN1 : 23:57:05:89 |Lorem:008
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN99i : 23:57:05:89 |Message Start
CHAN99i : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:90 |Sit
CHAN1 : 23:57:05:90 |Lorem:008
CHAN1 : 23:57:05:90 |Message End

{
 "CHAN" : "CHAN1" ,
 "Message1" : "Message Start Lorem:008 Ipsum Dolor",
"Message2" : " Sit Lorem:008 Message End"
}

You will want something similar to example 1 for the aggregate filter. Something like this:

    grok { match => { "message" => "^%{WORD:[@metadata][task]} : %{TIME}:\d{2} \|%{GREEDYDATA:[@metadata][restOfLine]}" } }

    if "Message Start" in [message] {
        aggregate {
            task_id => "%{[@metadata][task]}"
            code => '
                map["message"] = ""
                event.cancel
            '
            map_action => "create"
        }
    } else if "Message End" in [message] {
        aggregate {
            task_id => "%{[@metadata][task]}"
            code => '
                event.set("message", map["message"].rstrip)
                event.set("channel", event.get("[@metadata][task]"))
            '
            map_action => "update"
            end_of_task => true
        }
    } else {
        aggregate {
            task_id => "%{[@metadata][task]}"
            code => '
                map["message"] += event.get("[@metadata][restOfLine]") + " "
                event.cancel
            '
            map_action => "update"
        }
    }

which will produce

{
       "message" => "Lorem Ipsum Dolor Sit",
    "@timestamp" => 2023-06-01T20:12:51.901061412Z,
      "@version" => "1",
       "channel" => "CHAN1"
}
{
       "message" => "Lorem",
    "@timestamp" => 2023-06-01T20:12:51.901249877Z,
      "@version" => "1",
       "channel" => "CHAN99i"
}

As always with aggregate, you will need pipeline.workers set to 1 and pipeline.ordered to evaluate to true (the default value, auto, does so when there is a single worker).
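
For Case 2, one possible variation of the config above is to collect the text per timestamp inside the map and write the pieces out as Message1, Message2, ... when "Message End" arrives. This is only a sketch and has not been run against the real logs. It assumes the timestamp never contains spaces (so %{NOTSPACE} can capture it, including the trailing :89), that the field names CHAN and MessageN from the question are wanted, and that "Message Start" and "Message End" should stay in the text as in the expected output.

```logstash
filter {
    # Same line layout as above, but the timestamp is captured too.
    grok { match => { "message" => "^%{WORD:[@metadata][task]} : %{NOTSPACE:[@metadata][time]} \|%{GREEDYDATA:[@metadata][restOfLine]}" } }

    if "Message End" in [message] {
        aggregate {
            task_id => "%{[@metadata][task]}"
            code => '
                # map["parts"] is a hash keyed by timestamp; Ruby hashes keep insertion order.
                map["parts"] ||= {}
                t = event.get("[@metadata][time]")
                map["parts"][t] = [map["parts"][t], event.get("[@metadata][restOfLine]")].compact.join(" ")
                # One MessageN field per distinct timestamp, in order of first appearance.
                map["parts"].values.each_with_index { |text, i| event.set("Message" + (i + 1).to_s, text) }
                event.set("CHAN", event.get("[@metadata][task]"))
            '
            end_of_task => true
        }
    } else {
        aggregate {
            task_id => "%{[@metadata][task]}"
            code => '
                map["parts"] ||= {}
                t = event.get("[@metadata][time]")
                map["parts"][t] = [map["parts"][t], event.get("[@metadata][restOfLine]")].compact.join(" ")
                event.cancel
            '
        }
    }
}
```

A channel that never logs "Message End" (CHAN99i in Case 2) will not be emitted by this sketch. Its map just stays in memory until the aggregate timeout expires, so a timeout with push_map_as_event_on_timeout would be needed if those partial messages matter.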


Thank you in advance. I tried to build two pipelines: one that groups the lines and saves them to a file, and one that merges them. The grouping pipeline failed because the order of the data became random, so I gave up (hehe) and did the grouping with a Linux command instead. Now the problem is the merge: I want to merge the lines based on their time, but it always fails.
I think it is the same as this example:

CHAN1 : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN1 : 23:58:05:90 |Lorem
CHAN1 : 23:58:05:90 |Ipsum
CHAN1 : 23:58:05:90 |And Random Text

[
{
 "channel" : "CHAN1",
"msg":"Lorem Ipsum Dolor"
},
{
 "channel" : "CHAN1",
"msg":"Lorem Ipsum And Random Text"
}
]

My config:

input {
  # file {
  #   path => "/logstash-8.8.0/nelogs/logs1*.log"
  #   tags => ["ordering"]
  # }
  file {
    path => "/logstash-8.8.0/nelogs/new*.log"
  }
}
filter {
  # if "ordering" in [tags]{
    grok {
      match => { "message" => "%{GREEDYDATA:channel}: %{TIME:time} %{GREEDYDATA:msg}" }
    }
  #   prune {
  #       # whitelist_names => [ "channel","time","errmsg" ]
  #       blacklist_names => [ "@timestamp", "@source","message","@version","host","log","event" ]
  #   }
  # }else{
    #  filter {
      # json {
      #   source => "message"
      #   # target => "message"
      # }
    # }
     prune {
        whitelist_names => [ "channel","time","msg" ]
        # blacklist_names => [ "@timestamp", "@source","message","@version","host","log","event" ,"tags"]
    }
    # grok { match => { "message" => "^%{WORD:[@metadata][task]} : %{TIME}:\d{2} \|%{GREEDYDATA:[@metadata][restOfLine]}" } }
   
  aggregate {
    task_id => "%{time}"
    code => "map['msg'] += map['msg'];"
    push_map_as_event_on_timeout => true
    timeout_task_id_field => "time"
  }
    #   if "Lorem" == [msg] {
    #     aggregate {
    #         task_id => "%{[time]}"
    #         code => '
    #             map["msg"] = ""
    #             event.cancel
    #         '
    #         map_action => "create"
    #     }
    # } else if " Meesage End" in [msg] {
    #     aggregate {
    #         task_id => "%{[time]}"
    #         code => '
    #             event.set("msg", map["msg"].rstrip)
    #             event.set("time", event.get("[time]"))
    #         '
    #         map_action => "update"
    #         end_of_task => true
    #     }
    # } else {
    #     aggregate {
    #         task_id => "%{[time]}"
    #         code => '
    #             map["msg"] += event.get("[msg]") + " "
    #             event.cancel
    #         '
    #         map_action => "update"
    #     }
    # }
  }
# }  
output {
if "ordering" in [tags]{
   if "CHAN1" in [channel] {
    	file {
        	path => "/logstash-8.8.0/nelogs/output.log"
          codec => line { format => "%{channel}:%{time} %{msg}"}
          flush_interval => 0
    	}
   }

}else{
  file {
        	path => "/logstash-8.8.0/nelogs/new.txt"
    	}
}
  stdout { codec => rubydebug { metadata => true } }
}
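
For the time-based merge in the last example, a sketch along the lines of the "no end event" examples in the aggregate filter documentation might look like the filter below. It is untested against these files and rests on a few assumptions: the lines for one channel and timestamp are contiguous (which should hold after the grouping done with the Linux command), the timestamp contains no spaces, and a 5 second timeout is acceptable for flushing the last group. The task id combines channel and time, the text is appended to the map from the event (not from the map itself), and push_previous_map_as_event emits the finished group as soon as a line with a different task id arrives.

```logstash
filter {
  grok {
    match => { "message" => "^%{WORD:channel} : %{NOTSPACE:time} \|%{GREEDYDATA:msg}" }
  }

  # Skip lines that did not parse, otherwise they would share one literal task id.
  if "_grokparsefailure" not in [tags] {
    aggregate {
      task_id => "%{channel} %{time}"
      code => '
        map["channel"] ||= event.get("channel")
        map["time"]    ||= event.get("time")
        # Append the text of the current event, separated by single spaces.
        map["msg"] = [map["msg"], event.get("msg")].compact.join(" ")
        # Drop the original line; only the merged event is kept.
        event.cancel
      '
      push_previous_map_as_event => true
      timeout => 5
    }
  }
}
```

With the six sample lines this should give one event with msg "Lorem Ipsum Dolor" and one with msg "Lorem Ipsum And Random Text", both with channel "CHAN1". The same pipeline.workers and pipeline.ordered requirements apply. Because the merged events are created by the aggregate filter, any tag-based condition in the output section would need to account for them not carrying the tags of the original lines.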
