My case: I have a separate pipeline for each file pattern. For the "NUMBERs*" files it does not work properly when putting data into Elasticsearch. On the input side the files are split into 50,000-line chunks, with a name that maps to the pipeline name. If I add files from different days, Logstash packs them into one bulk request and sends them with the wrong date. Can you point out where the error is?
Every file has a date in its snapshot line, so Logstash should close one file and pick up the next new one, but it does not work as expected. I have also tried the option "auto_flush_interval => 4"; it did not help.
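For reference, this is roughly how I added auto_flush_interval to the multiline codec (a sketch of the variant I tried; the other options were unchanged):

codec => multiline {
  pattern => "^#"
  negate => true
  what => previous
  multiline_tag => ""
  auto_flush_interval => 4   # flush a pending multiline event after 4 seconds; did not help
}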
Example file names:
NUMBERs_AutoExport_-a_20221029044502.txt
NUMBERs_AutoExport_-a_20221023044502.txt

Sample content of one file (the snapshot line carries the date):
# snapshot,68843601,20221023044502
# NUMBERs
a,b,c
d,e,f
# Type2
foo,1,2,3
bar,4,5,6
# DN Blocks
224135896,224135897,,,,,,,,,,,
224135896,224135897,,,,,,,,,,,
input {
  file {
    path => "/opt/data/input/NUMBERs_*.txt"
    sincedb_path => "/dev/null"
    start_position => "beginning"
    codec => multiline {
      pattern => "^#"
      negate => true
      what => previous
      multiline_tag => ""
    }
  }
}
filter {
  mutate { remove_field => [ "[event]", "log" ] }
  if [message] =~ /# snapshot/ {
    dissect {
      mapping => {
        "[message]" => "# %{activity},%{val},%{time}"
      }
      remove_field => [ "[message]" ]
    }
    date {
      match => [ "time", "yyyyMMddHHmmss" ]
      timezone => "Europe/Paris"
      target => "@timestamp"
    }
    # remember the snapshot time so the data rows of this file can reuse it
    ruby { code => '@@metadata = event.get("@timestamp")' }
    drop {}
  } else if "# NUMBERs" in [message] {
    mutate { add_field => { "eventType" => "NUMBERs" } }
    split { field => "message" }
    if [message] !~ /^#/ {
      csv { columns => [ "c1", "c2", "c3" ] }
    }
    # stamp each data row with the snapshot time remembered above
    ruby { code => 'event.set("@timestamp", @@metadata)' }
  } else if "# Type2" in [message] {
    mutate { add_field => { "eventType" => "Type2" } }
    split { field => "message" }
  } else {
    mutate { add_field => { "eventType" => "Unrecognized" } }
  }
}
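The output side is a standard elasticsearch output; a minimal sketch (host and index here are placeholders, not my real values):

output {
  elasticsearch {
    hosts => ["http://localhost:9200"]   # placeholder host
    index => "numbers-%{+YYYY.MM.dd}"    # placeholder index pattern
  }
}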
logstash.yml:
log.level: info
config.reload.automatic: true
config.reload.interval: 30s
pipeline.ecs_compatibility: disabled
pipeline.workers: 48
pipeline.batch.size: 2000
pipeline.batch.delay: 50
pipeline.ordered: auto
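And the pipelines are declared per file pattern in pipelines.yml, roughly like this (ids and paths are illustrative, not my real ones):

- pipeline.id: numbers
  path.config: "/etc/logstash/conf.d/numbers.conf"   # the pipeline shown above
- pipeline.id: other_files
  path.config: "/etc/logstash/conf.d/other_files.conf"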