Hi
I have a problem: sometimes during upload to Elasticsearch, data with the NUMBERs eventType gets a different date than the one in the source file. It looks like the metadata is being overwritten somewhere. My guess was that Logstash reads ahead into the next file (from the day ahead); what actually seems to happen is that it starts parsing the next file before it has read the last line of the previous one.
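One way to check which file a given document actually came from (just a sketch; it assumes the file input still populates the ECS-style [log][file][path] field at that point, i.e. before the "log" field is removed) would be to copy the source path into a temporary debug field:

mutate {
  # hypothetical debug field, only to see which source file each document came from
  add_field => { "src_file" => "%{[log][file][path]}" }
}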
This is my pipeline:
input {
  file {
    mode => "read"
    path => "/opt/data/input/*.txt"
    sincedb_path => "/dev/null"
    #sincedb_path => "/usr/share/logstash/data/sincedb/sincedb"
    start_position => "beginning"
    file_completed_action => "log_and_delete"
    file_completed_log_path => "/opt/data/logstash_files/fin_eir.log"
    #sincedb_write_interval => 3
    codec => multiline { pattern => "^#" negate => true max_lines => 50000 what => "previous" multiline_tag => "" }
  }
}
filter {
  mutate { remove_field => [ "[event]", "log" ] }

  if "# snapshot" in [message] {
    dissect {
      mapping => {
        "[message]" => "# %{activity},%{val},%{time}"
      }
      remove_field => ["[message]"]
    }
    date {
      match => ["time", "yyyyMMddHHmmss"]
      timezone => "Europe/Paris"
    }
    ruby { code => '@@metadata = event.get("@timestamp")' }
    # mutate { add_field => { "eventType" => "Header" } }
    drop {}
  } else if "# Network Entities" in [message] {
    mutate { add_field => { "eventType" => "Network Entities" } }
    split { field => "message" }
    if [message] !~ /^#/ {
      csv { columns => ["ID","Type","PCType","PC","GC","RI","SSN","CCGT","NTT","NNAI","NNP","DA","SRFIMSI"] }
    }
    ruby { code => 'event.set("@timestamp", @@metadata)' }
  } else if "# DNs" in [message] {
    mutate { add_field => { "eventType" => "DNs" } }
    split { field => "message" }
    if [message] !~ /^#/ {
      csv { columns => ["DN","IMSI","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"] }
    }
    ruby { code => 'event.set("@timestamp", @@metadata)' }
  } else if "# DN Blocks" in [message] {
    mutate { add_field => { "eventType" => "DN Blocks" } }
    split { field => "message" }
    if [message] !~ /^#/ {
      csv { columns => ["BDN","EDN","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"] }
    }
    ruby { code => 'event.set("@timestamp", @@metadata)' }
  } else if "# NUMBERs" in [message] {
    mutate { add_field => { "eventType" => "NUMBERs" } }
    split { field => "message" }
    if [message] !~ /^#/ {
      csv { columns => ["NUMBERs","SVN","WHITE","GRAY","BLACK"] }
    }
    ruby { code => 'event.set("@timestamp", @@metadata)' }
  } else {
    mutate { add_field => { "eventType" => "IMEI Blocks" } }
    ruby { code => 'event.set("@timestamp", @@metadata)' }
  }

  mutate {
    remove_field => [ "host", "count", "fields", "@version", "input_type", "source", "tags", "type", "time", "path", "activity", "val", "message" ]
  }
}
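As far as I understand, @@metadata is a single Ruby class variable shared by every worker thread and every file, so as soon as the "# snapshot" header of the next file is processed, any not-yet-processed rows of the previous file would pick up the new timestamp. A minimal sketch of a per-file cache instead (assuming the source path is still available in [log][file][path] when these filters run) would be:

# in the "# snapshot" branch, instead of the single @@metadata variable
ruby {
  init => '@@snapshot_by_file = {}'
  code => '@@snapshot_by_file[event.get("[log][file][path]")] = event.get("@timestamp")'
}

# in the data branches, instead of event.set("@timestamp", @@metadata)
ruby { code => 'event.set("@timestamp", @@snapshot_by_file[event.get("[log][file][path]")])' }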
And here is part of one of the files:
# snapshot,68813395,20221018044504
# NUMBERs
000008018,0,n,n,y
000010040,0,n,n,y
000060532,0,n,n,y
The large file was split into pieces of at most 50 000 lines each.
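If I read the multiline codec correctly, the lines above should reach the filter as two events, roughly:

message = "# snapshot,68813395,20221018044504"
message = "# NUMBERs\n000008018,0,n,n,y\n000010040,0,n,n,y\n000060532,0,n,n,y"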