Multiline codec: events from different files merged into one bulk with the wrong date

My case is that I have a separate pipeline for each file pattern. For the "Numbers*" files, the data is not indexed into Elasticsearch correctly. On the input side the files are split into 50,000-line chunks, with names that match the pipeline name. If I add files from different days, Logstash packs them into one bulk and sends them with the wrong date. Can you point out where the error is?
Every file has a date in its snapshot line, so Logstash should close one file and move on to the next new one, but it doesn't work as expected. I've also tried "auto_flush_interval => 4"; it didn't help.

NUMBERs_AutoExport_-a_20221029044502.txt
NUMBERs_AutoExport_-a_20221023044502.txt

# snapshot,68843601,20221023044502
# NUMBERs
a,b,c
d,e,f
# Type2
foo,1,2,3
bar,4,5,6
# DN Blocks
224135896,224135897,,,,,,,,,,,
224135896,224135897,,,,,,,,,,,
input {
    file {
        path => "/opt/data/input/Numbers_*.txt"
        sincedb_path => "/dev/null"
        start_position => "beginning"
        codec => multiline {
            pattern => "^#"
            negate => true
            what => "previous"
            multiline_tag => ""
        }
    }
}
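To make the grouping concrete: with `pattern => "^#"`, `negate => true`, and `what => "previous"`, every line that does not start with `#` is appended to the event opened by the previous `#` line. A toy sketch of that rule applied to the sample above (my own simplification for illustration, not the codec's actual implementation):

```ruby
# Toy re-implementation of the multiline grouping rule
# (pattern "^#", negate true, what "previous"): any line NOT matching
# the pattern is appended to the event begun by the previous matching line.
def group_multiline(lines)
  events = []
  lines.each do |line|
    if line.start_with?('#') || events.empty?
      events << line.dup          # a '#' line opens a new event
    else
      events.last << "\n" << line # anything else joins the previous event
    end
  end
  events
end

sample = [
  '# snapshot,68843601,20221023044502',
  '# NUMBERs',
  'a,b,c',
  'd,e,f',
  '# Type2',
  'foo,1,2,3',
  'bar,4,5,6',
]
puts group_multiline(sample).length   # => 3 (snapshot, NUMBERs block, Type2 block)
```

Note that the lines after the last `#` header of a file stay in the codec's buffer until something flushes them, which is why the tail of one file can wait around until the next file is read; `auto_flush_interval` is meant to force that buffered event out after a period of inactivity.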
filter {
    mutate { remove_field => [ "[event]", "log" ] }
    if [message] =~ /# snapshot/ {
        dissect {
            mapping => {
                "[message]" => "# %{activity},%{val},%{time}"
            }
            remove_field => ["[message]"]
        }
        date {
            match => ["time", "yyyyMMddHHmmss"]
            timezone => "Europe/Paris"
            target => "timestamp"
        }
        ruby { code => '@@metadata = event.get("@timestamp")' }
        drop {}
    } else if "# NUMBERs" in [message] {
        mutate { add_field => { "eventType" => "NUMBERs" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => [ "c1", "c2", "c3" ] }
        }
        ruby { code => 'event.set("@timestamp", @@metadata)' }
    } else if "# Type2" in [message] {
        mutate { add_field => { "eventType" => "Type2" } }
        split { field => "message" }
    } else {
        mutate { add_field => { "eventType" => "Unrecognized" } }
    }
}
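One thing that stands out: `@@metadata` is a Ruby class variable shared across all filter workers, so with parallel batches the timestamp written by one file's snapshot event can be read by another file's data events. One way to avoid cross-event state altogether is to derive the timestamp from the file name, which carries the same 14-digit value as the snapshot line. A minimal sketch (the helper name and the regex are my own, not Logstash API):

```ruby
require 'time'

# Hypothetical helper: extract the 14-digit timestamp that names like
# NUMBERs_AutoExport_-a_20221023044502.txt end with, and parse it.
def snapshot_time_from_path(path)
  m = path.match(/(\d{14})\.txt\z/)
  return nil unless m
  Time.strptime(m[1], '%Y%m%d%H%M%S')
end

t = snapshot_time_from_path('/opt/data/input/NUMBERs_AutoExport_-a_20221023044502.txt')
puts t.strftime('%Y-%m-%d %H:%M:%S')   # => 2022-10-23 04:45:02
```

Inside a `ruby` filter this could be applied per event via `event.get("path")` (with ECS compatibility disabled the file input stores the source path in `path`), so each event gets the date of its own file regardless of processing order.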

logstash.yml:

log.level: info
config.reload.automatic: true
config.reload.interval: 30s
pipeline.ecs_compatibility: disabled
pipeline.workers: 48
pipeline.batch.size: 2000
pipeline.batch.delay: 50
pipeline.ordered: auto
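For what it's worth, `pipeline.ordered: auto` only actually enables ordering when `pipeline.workers` is 1; at 48 workers, batches run in parallel and any filter that relies on event order (such as the `@@metadata` handoff between the snapshot event and the data events) can read a stale or wrong value. A sketch of settings that make ordering explicit, assuming throughput allows a single worker:

```yaml
pipeline.workers: 1
pipeline.batch.size: 2000
pipeline.ordered: true    # explicit; "auto" enables ordering only when workers == 1
```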

Can anyone look at this case? The same behavior was observed with "pipeline.workers: 1".

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.