Why doesn't the "file" input plugin read lines in sequence?

Thanks a lot for taking the time to read my topic.

When I use the "file" input plugin to read the following CSV file, I found it doesn't always read lines in sequence.
(My goal is to check whether the value of the "Sensornr" field already exists; I need to check both the currently read batch and the older data in ES.)
File Content

rec1,1,5,5
rec1,2,2,2
rec1,3,44,44
rec1,4,33,22
rec2,5,66,55
rec2,6,4,4

The fields are mapped in the following segment:

 csv {
    separator => ","
    skip_empty_rows => true
    columns => [
        "Sensornr",
        "serial_nr",
        "test2",
        "test3"
    ]
    convert => {
        "serial_nr" => "float"
        "test2" => "float"
        "test3" => "float"
    }
}

and the "file" input plugin is configured as follows:

file {
    path => ["D:/test.csv"]
    mode => "tail"
    start_position => "beginning"
    close_older => "24 hour"
    sincedb_path => "meta/test"
}

The process for checking whether the newly read field already exists in the current batch or in the ES history:

# check if the "Sensornr" exist in current batch
throttle {
    before_count => 0
    after_count => 1
    period => 10
    max_age => 20
    key => "%{Sensornr}"
    add_tag => "retest_in_this_batch"
}
if "retest_in_this_batch" in [tags] {
    mutate{
        add_field => {"FirstIn"=> "N"}
    }
}
# check if the "Sensornr" exist in ES old record
else{
    elasticsearch {
        hosts => "127.0.0.1:9200"
        index => "logstash-test2019"
        query_template => "esquery.json"
        result_size => 1000
        aggregation_fields=> { "types_count" => "es_count"}
    }
    if [es_count][value] != 0 {
        mutate {
            add_field => { "FirstIn" => "N" }
        }
    } else {
        mutate {
            add_field => { "FirstIn" => "Y" }
        }
    }
}

and the esquery.json file was configured like this:

{
    "query":{
        "match":{
            "Sensornr":"%{[Sensornr]}"
        }
    },
    "aggs" : {    
        "types_count" : {
            "value_count" : { "field" : "Sensornr.keyword" }
        }
    }
}
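
For reference, the value_count aggregation in this query should come back from Elasticsearch with roughly the following shape (a sketch; the value 5 is just an illustration). The filter's aggregation_fields option then copies the "types_count" object into the event's "es_count" field, which is why the events contain an "es_count":{"value":...} object:

```json
{
    "aggregations": {
        "types_count": {
            "value": 5
        }
    }
}
```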

But I got the following result:

Jan 1, 2020 @ 11:39:52.993	rec1	1	Jan 1, 2020 @ 11:39:52.993	0	Y	 - 
Jan 1, 2020 @ 11:39:52.994	rec1	2	Jan 1, 2020 @ 11:39:52.994	 - 	N	retest_in_this_batch
Jan 1, 2020 @ 11:39:52.994	rec1	3	Jan 1, 2020 @ 11:39:52.994	 - 	N	retest_in_this_batch
Jan 1, 2020 @ 11:39:52.995	rec1	4	Jan 1, 2020 @ 11:39:52.995	 - 	N	retest_in_this_batch
Jan 1, 2020 @ 11:40:29.116	rec2	5	Jan 1, 2020 @ 11:40:29.116	 - 	N	retest_in_this_batch
Jan 1, 2020 @ 11:40:29.117	rec2	6	Jan 1, 2020 @ 11:40:29.117	0	Y	 - 

For the 4 records with "Sensornr = rec1", the 1st record was read before the other 3, which meets my expectation.
But the 2 records with "Sensornr = rec2" have a problem: "serial_nr" = 6 was marked "Y", meaning it was not throttled by the throttle filter, which implies it was read earlier than "serial_nr" = 5.
Does this mean the Logstash "file" input plugin doesn't always read lines in sequence?

Thanks.

Have you tried setting before_count to -1? According to the documentation, that is the correct way to never throttle the first occurrence; setting it to 0 could have unexpected behaviour.

Also, what is the exact output of the esquery.json file and what is contained in [es_count][value]? Are you sure checking for !=0 will accurately tell you if there has been a previous event with the same Sensornr?

Logstash will re-order lines unless you have pipeline.workers set to 1. If pipeline.java_execution is enabled (which became the default in 7.0), Logstash will re-order events even with pipeline.workers set to 1, so you need to disable that too.
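
In concrete terms, that means setting both options in logstash.yml (or in the pipeline definition in pipelines.yml); a minimal sketch, assuming a 7.x release:

```yaml
# logstash.yml
# A single worker thread prevents a batch of events from being
# processed by several filter workers in parallel.
pipeline.workers: 1
# The Java execution engine (default since 7.0) may still re-order
# events within a batch, so disable it when strict ordering matters.
pipeline.java_execution: false
```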

Thanks for your suggestion. I tried setting "before_count" to -1, using the following throttle settings:

throttle {
    before_count => -1
    after_count => 1
    period => 1
    max_age => 20
    key => "%{Sensornr}"
    add_tag => "retest_in_this_batch"
}

but I got this result:

Time	serial_nr	Sensornr	FirstIn
Jan 2, 2020 @ 14:19:52.865	10	aa1	Y
Jan 2, 2020 @ 14:19:53.126	20	aa1	Y
Jan 2, 2020 @ 14:19:53.170	30	aa1	N
Jan 2, 2020 @ 14:19:53.171	40	aa1	N
Jan 2, 2020 @ 14:19:53.172	50	aa1	N
Jan 2, 2020 @ 14:19:53.173	60	bb1	Y
Jan 2, 2020 @ 14:19:53.174	70	bb1	N
Jan 2, 2020 @ 14:19:53.175	80	bb1	N
Jan 2, 2020 @ 14:19:53.176	90	bb1	N
Jan 2, 2020 @ 14:19:53.177	100	bb1	N
Jan 2, 2020 @ 14:19:53.178	110	cc1	Y
Jan 2, 2020 @ 14:19:53.179	120	cc1	N
Jan 2, 2020 @ 14:19:53.180	130	cc1	N
Jan 2, 2020 @ 14:19:53.182	140	cc1	N
Jan 2, 2020 @ 14:19:53.183	150	cc1	N
Jan 2, 2020 @ 14:19:53.183	160	dd1	Y
Jan 2, 2020 @ 14:19:53.184	170	dd1	N
Jan 2, 2020 @ 14:19:53.185	180	dd1	N
Jan 2, 2020 @ 14:19:53.186	190	dd1	N
Jan 2, 2020 @ 14:19:53.187	200	dd1	N	

It doesn't help with marking duplicated values.

For your question about the esquery.json output, here is the raw output from the console:

{"host":"CFL0494W","message":"dd1,190,6,6\r","@timestamp":"2020-01-02T06:19:53.186Z","serial_nr":190.0,"test2":6.0,"tags":[],"es_count":{"value":0},"FirstIn":"N","@version":"1","path":"D:/test.csv","Sensornr":"dd1","test3":6.0}

When I add a new line to the CSV file after the past records have been put into ES:

bb1,250,2,1

I get this raw output from the console:

{"path":"D:/test.csv","es_count":{"value":5},"Sensornr":"bb1","test2":2.0,"host":"CFL0494W","@timestamp":"2020-01-02T06:41:18.601Z","@version":"1","serial_nr":250.0,"test3":1.0,"FirstIn":"N","message":"bb1,250,2,1\r"}

As you can see above, [es_count][value] indeed contains the count from ES for that Sensornr.

PS: my csv file content:

aa1,10,1,1
aa1,20,1,1
aa1,30,1,1
aa1,40,2,2
aa1,50,2,2
bb1,60,2,2
bb1,70,2,2
bb1,80,2,3
bb1,90,2,2
bb1,100,2,2
cc1,110,3,3
cc1,120,4,4
cc1,130,4,4
cc1,140,5,6
cc1,150,6,6
dd1,160,6,6
dd1,170,6,6
dd1,180,6,6
dd1,190,6,6
dd1,200,6,6

Thanks a lot for the feedback.

I tried again and found what you mentioned:

throttle {
    before_count => 0
    after_count => 1
    period => 1
    max_age => 20
    key => "%{Sensornr}"
    add_tag => "retest_in_this_batch"
}

CSV file:

qq-1,10,35,55
qq-1,11,35,55
qq-1,12,35,55
qq-1,13,35,55
qd-2,14,35,55
qd-2,15,35,55
qd-2,16,35,55
qd-2,17,6,6
qd-3,18,6,6
qd-3,19,6,6
qd-3,20,8,8
qd-3,21,8,8
qd-4,22,8,8
qd-4,23,8,8
qd-4,24,2,1
qd-4,25,35,55
qd-5,26,35,55
qd-5,27,35,55
qd-5,28,35,66
qd-5,29,35,66	

Result:

Time 	Sensornr  	serial_nr   	FirstIn  
Jan 2, 2020 @ 14:54:57.613	qq-1	10	Y
Jan 2, 2020 @ 14:54:57.615	qq-1	11	N
Jan 2, 2020 @ 14:54:57.616	qq-1	12	N
Jan 2, 2020 @ 14:54:57.618	qq-1	13	N
Jan 2, 2020 @ 14:54:57.619	qd-2	14	Y
Jan 2, 2020 @ 14:54:57.619	qd-2	15	N
Jan 2, 2020 @ 14:54:57.621	qd-2	16	N
Jan 2, 2020 @ 14:54:57.622	qd-2	17	N
Jan 2, 2020 @ 14:54:57.622	qd-3	18	Y
Jan 2, 2020 @ 14:54:57.623	qd-3	19	N
Jan 2, 2020 @ 14:54:57.623	qd-3	20	N
Jan 2, 2020 @ 14:54:57.624	qd-3	21	N
Jan 2, 2020 @ 14:54:57.625	qd-4	22	N
Jan 2, 2020 @ 14:54:57.626	qd-4	23	N
Jan 2, 2020 @ 14:54:57.627	qd-4	24	N
Jan 2, 2020 @ 14:54:57.628	qd-4	25	N
Jan 2, 2020 @ 14:54:57.629	qd-5	26	N
Jan 2, 2020 @ 14:54:57.630	qd-5	27	N
Jan 2, 2020 @ 14:54:57.631	qd-5	28	N
Jan 2, 2020 @ 14:54:57.632	qd-5	29	N

Thanks a lot and I will try it.

I have updated the pipeline configuration according to your suggestion; so far I haven't found any re-ordered lines.
I will monitor it for a few days, thanks :grinning:

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.