aidensV
(aidensV)
June 1, 2023, 4:42pm
1
I have a log, for example:
(Case 1)
CHAN1 : 23:57:05:89 |Message Start
CHAN1 : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN99i : 23:57:05:89 |Message Start
CHAN99i : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Sit
CHAN1 : 23:57:05:89 |Message End
CHAN99i : 23:57:05:89 |Message End
Expected:
CHAN1 : 23:57:05:89 |Message Start
CHAN1 : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN1 : 23:57:05:89 |Sit
CHAN1 : 23:57:05:89 |Message End
CHAN99i : 23:57:05:89 |Message Start
CHAN99i : 23:57:05:89 |Lorem
CHAN99i : 23:57:05:89 |Message End
so I can parse the data by using the start and end messages. I guess this can be done with the aggregate filter plugin in Logstash, producing something like this:
{
  "CHAN" : "CHAN1",
  "Message" : "Message Start Lorem Ipsum Dolor Sit Message End"
},
{
  "CHAN" : "CHAN99i",
  "Message" : "Message Start Lorem Message End"
}
Unfortunately, in the real case the log looks like the example below. I think if I can find a way to handle the first case, then I will think about how to handle this second one.
(Case 2)
CHAN1 : 23:57:05:89 |Message Start
CHAN1 : 23:57:05:89 |Lorem:008
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN99i : 23:57:05:89 |Message Start
CHAN99i : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:90 |Sit
CHAN1 : 23:57:05:90 |Lorem:008
CHAN1 : 23:57:05:90 |Message End
{
  "CHAN" : "CHAN1",
  "Message1" : "Message Start Lorem:008 Ipsum Dolor",
  "Message2" : " Sit Lorem:008 Message End"
}
You will want something similar to example 1 for the aggregate filter. Something like this:
grok { match => { "message" => "^%{WORD:[@metadata][task]} : %{TIME}:\d{2} \|%{GREEDYDATA:[@metadata][restOfLine]}" } }
if "Message Start" in [message] {
    aggregate {
        task_id => "%{[@metadata][task]}"
        code => '
            map["message"] = ""
            event.cancel
        '
        map_action => "create"
    }
} else if "Message End" in [message] {
    aggregate {
        task_id => "%{[@metadata][task]}"
        code => '
            event.set("message", map["message"].rstrip)
            event.set("channel", event.get("[@metadata][task]"))
        '
        map_action => "update"
        end_of_task => true
    }
} else {
    aggregate {
        task_id => "%{[@metadata][task]}"
        code => '
            map["message"] += event.get("[@metadata][restOfLine]") + " "
            event.cancel
        '
        map_action => "update"
    }
}
which will produce
{
       "message" => "Lorem Ipsum Dolor Sit",
    "@timestamp" => 2023-06-01T20:12:51.901061412Z,
      "@version" => "1",
       "channel" => "CHAN1"
}
{
       "message" => "Lorem",
    "@timestamp" => 2023-06-01T20:12:51.901249877Z,
      "@version" => "1",
       "channel" => "CHAN99i"
}
As always with aggregate, you will need pipeline.workers set to 1 and pipeline.ordered to evaluate to true.
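For reference, those two settings can be set in logstash.yml (or per pipeline in pipelines.yml); this is a minimal sketch, assuming the default settings-file locations:

```yaml
# logstash.yml -- a single worker plus ordered execution, so the
# aggregate filter sees events in the same order they were read
pipeline.workers: 1
pipeline.ordered: true
```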
aidensV
(aidensV)
June 3, 2023, 1:11pm
3
Thank you! I tried making two pipelines, one for grouping (saving the result to a file) and one for merging, but the grouping failed because the data came out in random order, so I gave up on that and did the grouping with a Linux command instead. Now the problem is that the merging, which is based on time, always fails.
I think it is like this example:
CHAN1 : 23:57:05:89 |Lorem
CHAN1 : 23:57:05:89 |Ipsum
CHAN1 : 23:57:05:89 |Dolor
CHAN1 : 23:58:05:90 |Lorem
CHAN1 : 23:58:05:90 |Ipsum
CHAN1 : 23:58:05:90 |And Random Text
[
  {
    "channel" : "CHAN1",
    "msg" : "Lorem Ipsum Dolor"
  },
  {
    "channel" : "CHAN1",
    "msg" : "Lorem Ipsum And Random Text"
  }
]
My config:
input {
    # file {
    #     path => "/logstash-8.8.0/nelogs/logs1*.log"
    #     tags => ["ordering"]
    # }
    file {
        path => "/logstash-8.8.0/nelogs/new*.log"
    }
}
filter {
    # if "ordering" in [tags] {
    grok {
        match => { "message" => "%{GREEDYDATA:channel}: %{TIME:time} %{GREEDYDATA:msg}" }
    }
    # prune {
    #     # whitelist_names => [ "channel", "time", "errmsg" ]
    #     blacklist_names => [ "@timestamp", "@source", "message", "@version", "host", "log", "event" ]
    # }
    # } else {
    #     json {
    #         source => "message"
    #         # target => "message"
    #     }
    prune {
        whitelist_names => [ "channel", "time", "msg" ]
        # blacklist_names => [ "@timestamp", "@source", "message", "@version", "host", "log", "event", "tags" ]
    }
    # grok { match => { "message" => "^%{WORD:[@metadata][task]} : %{TIME}:\d{2} \|%{GREEDYDATA:[@metadata][restOfLine]}" } }
    aggregate {
        task_id => "%{time}"
        # note: this appends map['msg'] to itself; event.get('msg') was presumably intended
        code => "map['msg'] += map['msg'];"
        push_map_as_event_on_timeout => true
        timeout_task_id_field => "time"
    }
    # if "Lorem" == [msg] {
    #     aggregate {
    #         task_id => "%{[time]}"
    #         code => '
    #             map["msg"] = ""
    #             event.cancel
    #         '
    #         map_action => "create"
    #     }
    # } else if "Message End" in [msg] {
    #     aggregate {
    #         task_id => "%{[time]}"
    #         code => '
    #             event.set("msg", map["msg"].rstrip)
    #             event.set("time", event.get("[time]"))
    #         '
    #         map_action => "update"
    #         end_of_task => true
    #     }
    # } else {
    #     aggregate {
    #         task_id => "%{[time]}"
    #         code => '
    #             map["msg"] += event.get("[msg]") + " "
    #             event.cancel
    #         '
    #         map_action => "update"
    #     }
    # }
}
# }
output {
    if "ordering" in [tags] {
        if "CHAN1" in [channel] {
            file {
                path => "/logstash-8.8.0/nelogs/output.log"
                codec => line { format => "%{channel}:%{time} %{msg}" }
                flush_interval => 0
            }
        }
    } else {
        file {
            path => "/logstash-8.8.0/nelogs/new.txt"
        }
    }
    stdout { codec => rubydebug { metadata => true } }
}
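For comparison, a minimal sketch of a timeout-based aggregate that could produce the grouping above. This is an assumption, not something confirmed in this thread: the task_id combines channel and the grok'd time, the map initialization uses ||=, and the 5-second timeout is an arbitrary value to tune:

```
filter {
    grok {
        match => { "message" => "^%{WORD:channel} : %{TIME:time}:\d{2} \|%{GREEDYDATA:msg}" }
    }
    aggregate {
        # group lines that share a channel and a timestamp (to the second)
        task_id => "%{channel}_%{time}"
        code => '
            map["channel"] ||= event.get("channel")
            map["msg"] ||= ""
            map["msg"] += event.get("msg") + " "
            event.cancel
        '
        # with no explicit end-of-task marker, flush each map after the timeout
        push_map_as_event_on_timeout => true
        timeout => 5    # assumed value; set it longer than a group can span
    }
}
```

This still requires pipeline.workers 1 and pipeline.ordered, as noted above.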
system
(system)
Closed
July 1, 2023, 1:11pm
4
This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.