Issue with provisioning a large file to Logstash

Hi,
I'm facing an issue with a large file (~800 MB) during transfer to Logstash.
Specifically, this is a case where the data does not get matched in order.
This case started in (Csv plugin cooperating with multiplying pattern - #11 by Badger). @Badger, can you keep an eye on it once again?

For now I'm using the following pipeline with the configuration below (I've decided to use a PQ, i.e. a persistent queue):

# logstash.yml
http.host: "0.0.0.0"
pipeline.workers: 1
pipeline.batch.size: 2000
pipeline.batch.delay: 50
pipeline.ordered: true
config.reload.automatic: true
xpack.monitoring.enabled: false
#xpack.management.pipeline.id: ["main"]
pipeline.ecs_compatibility: disabled
log.level: info

# pipelines.yml
- pipeline.id: npdb
  path.config: "/usr/share/logstash/pipeline/pipe1.yml"
  queue.type: persisted
  path.queue: /usr/share/logstash/data/queue/
  queue.max_bytes: 2000mb

As a test, I'm uploading the file to a TCP port:

input {
  tcp {
    port => 12367
    codec => multiline {
      pattern => "^#"
      negate => true
      what => "previous"
      multiline_tag => ""
    }
  }
}

filter {
    # Drop header/comment lines that are not needed.
    if [message] =~ "# 20" { drop { } }
    if [message] =~ "table" { drop { } }
    if [message] =~ "# number of Blocks" { drop { } }
    mutate { remove_field => [ "[event]", "log" ] }

    if "# snapshot" in [message] {
        dissect {
            mapping => {
                "[message]" => "# %{activity},%{val},%{time}"
            }
            remove_field => ["[message]"]
        }
        date {
            match => ["time", "yyyyMMddHHmmss"]
            timezone => "Europe/Paris"
        }
        # Stash the snapshot timestamp in a class variable so the data events below can reuse it.
        ruby { code => '@@metadata = event.get("@timestamp")' }
        # mutate { add_field => { "eventType" => "Header" } }
        drop {}
    } else if "# Network Entities" in [message] {
        mutate { add_field => { "eventType" => "Network Entities" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["ID","Type","PCType","PC","GC","RI","SSN","CCGT","NTT","NNAI","NNP","DA","SRFIMSI"] }
        }
        ruby { code => 'event.set("@timestamp", @@metadata)' }
    } else if "# DNs" in [message] {
        mutate { add_field => { "eventType" => "DNs" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["DN","IMS","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"] }
        }
        ruby { code => 'event.set("@timestamp", @@metadata)' }
    } else if "# DN Blocks" in [message] {
        mutate { add_field => { "eventType" => "DN Blocks" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["BDN","EDN","PT","SP","RN","VMS","GRN","ASD","ST","NSDN","CGBL","CDBL"] }
        }
        ruby { code => 'event.set("@timestamp", @@metadata)' }
    } else {
        mutate { add_field => { "eventType" => "Blocs" } }
        split { field => "message" }
        if [message] !~ /^#/ {
            csv { columns => ["IM","SVN","WHITE","GRAY","BLACK"] }
        }
        ruby { code => 'event.set("@timestamp", @@metadata)' }
    }

    mutate {
        remove_field => [ "host", "count", "fields", "@version", "input_type", "source", "tags", "type", "time", "path", "activity", "val", "message", "port" ]
    }
}

What is strange is that the file contains a huge number of data rows, but as far as I can observe only 501 hits are processed for eventType -> DNs or Network Entities, whether I shorten the log or not. At this point I've concluded that the pipeline diverges from the data somewhere along the way, as if it gets lost after a certain number of processed records.
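
To check whether the codec is actually cutting the sections, I will probably try tagging truncated events before my final mutate removes [tags]. A rough debug sketch, not tested; the multiline_truncated field name is just something I made up:

filter {
  # Rough debug check: mark events that the multiline codec flushed because it hit its line limit
  if "multiline_codec_max_lines_reached" in [tags] {
    mutate { add_field => { "multiline_truncated" => "true" } }
  }
}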
Here is the sample data: https://filetransfer.io/data-package/SkuqBzeX#link
Thanks for your insight.

I have a suspicion that multiline_codec_max_lines_reached is involved ... ?
I think the multiline codec is not the right solution for such a large number of input rows.
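
If I understand the defaults correctly, the codec would cut each section after 500 accumulated lines, and the remainder would start a new event without the "# DNs" / "# Network Entities" header line, which could explain why I only ever see about 500 rows per section. In case that's it, here is a sketch of the input with the codec limits raised (the values are arbitrary guesses, I haven't validated them against this file):

input {
  tcp {
    port => 12367
    codec => multiline {
      pattern => "^#"
      negate => true
      what => "previous"
      multiline_tag => ""
      max_lines => 200000        # default is 500 lines per combined event
      max_bytes => "100 MiB"     # default is 10 MiB per combined event
      auto_flush_interval => 5   # emit the last buffered section after 5s of no new data
    }
  }
}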
