Hi
I'm facing an issue with a large file (~800MB) when transferring it to Logstash.
In short, this is a case where the data doesn't get matched in order.
This case started in (Csv plugin cooperating with multiplying pattern - #11 by Badger). @Badger, could you keep an eye on it once again?
Anyway, I'm running the pipeline with the configuration below (this is where I've decided to use a PQ, a persistent queue). My logstash.yml:
http.host: "0.0.0.0"
pipeline.workers: 1
pipeline.batch.size: 2000
pipeline.batch.delay: 50
pipeline.ordered: true
config.reload.automatic: true
xpack.monitoring.enabled: false
#xpack.management.pipeline.id: ["main"]
pipeline.ecs_compatibility: disabled
log.level: info
My pipelines.yml defines the pipeline and the persistent queue:
- pipeline.id: npdb
path.config: "/usr/share/logstash/pipeline/pipe1.yml"
queue.type: persisted
path.queue: /usr/share/logstash/data/queue/
queue.max_bytes: 2000mb
As a test I'm uploading the file to the tcp input port (roughly as in the sketch below).
input {
tcp { port => 12367
codec => multiline { pattern => "^#" negate => true what => "previous" multiline_tag => "" }
}
}
filter {
if [message] =~ "# 20" { drop{ } }
if [message] =~ "table" { drop{ } }
if [message] =~ "# number of Blocks" { drop{ } }
mutate { remove_field => [ "[event]", "log" ] }
if "# snapshot" in [message] {
dissect {
mapping => {
"[message]" => "# %{activity},%{val},%{time}"
}
remove_field => ["[message]"]
}
date {
match => ["time", "yyyyMMddHHmmss"]
timezone => "Europe/Paris"
}
ruby { code => '@@metadata = event.get("@timestamp")' }
# mutate { add_field => { "eventType" => "Header" } }
drop {}
} else if "# Network Entities" in [message] {
mutate { add_field => { "eventType" => "Network Entities" } }
split { field => "message" }
if [message] !~ /^#/ {
csv { columns => ["ID","Type","PCType","PC","GC","RI","SSN","CCGT","NTT","NNAI","NNP","DA","SRFIMSI"]
}
}
ruby { code => 'event.set("@timestamp", @@metadata)' }
} else if "# DNs" in [message] {
mutate { add_field => { "eventType" => "DNs" } }
split { field => "message" }
if [message] !~ /^#/ {
csv { columns => ["DN","IMS","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"]
}
}
ruby { code => 'event.set("@timestamp", @@metadata)' }
} else if "# DN Blocks" in [message] {
mutate { add_field => { "eventType" => "DN Blocks" } }
split { field => "message" }
if [message] !~ /^#/ {
csv { columns => ["BDN","EDN","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"]
}
}
ruby { code => 'event.set("@timestamp", @@metadata)' }
}
else {
mutate { add_field => {"eventType" => "Blocs"}}
split { field => "message" }
if [message] !~ /^#/ {
csv { columns => ["IM","SVN","WHITE","GRAY","BLACK"]
}
}
ruby { code => 'event.set("@timestamp", @@metadata)' }
}
mutate {
remove_field => [ "host", "count", "fields", "@version", "input_type", "source", "tags", "type", "time", "path", "activity", "val", "message", "port" ]
}
}
What is strange is that the file consists of a huge number of data rows,
but as far as I observed, only 501 hits are processed for eventType -> DNs or Network Entities, whether I shorten the log or not. My conclusion is that this pipeline diverges from the data matching at some point, as if it loses track after a certain number of processed records.
Here is sample data: https://filetransfer.io/data-package/SkuqBzeX#link
Thanks for your insight.