Thanks a lot for taking the time to read my topic.
When I use the "file" input plugin to read the following CSV file, I found that the lines do not always seem to be read in sequence.
(My purpose is to check whether the value of the field "Sensornr" in the file already exists in the past; I need to check both the currently read batch and the old ES data.)
File Content
rec1,1,5,5
rec1,2,2,2
rec1,3,44,44
rec1,4,33,22
rec2,5,66,55
rec2,6,4,4
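To be clear about what I mean by "in sequence": with a minimal pipeline like the sketch below (illustrative only, not my real configuration), I would expect the six events to come out in file order, rec1/1 first and rec2/6 last.

# minimal sketch to illustrate the expected read order (not my real config)
input {
  file {
    path => ["D:/test.csv"]
    mode => "tail"
    start_position => "beginning"
    sincedb_path => "NUL"          # Windows equivalent of /dev/null, for a throwaway test
  }
}
output {
  stdout { codec => rubydebug }    # print events in the order they are emitted
}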
The fields are mapped in the following csv filter:
csv {
  separator => ","
  skip_empty_rows => true
  columns => [
    "Sensornr",
    "serial_nr",
    "test2",
    "test3"
  ]
  convert => {
    "serial_nr" => "float"
    "test2" => "float"
    "test3" => "float"
  }
}
and the "file" input plugin was configured like following:
file {
  path => ["D:/test.csv"]
  mode => "tail"
  start_position => "beginning"
  close_older => "24 hour"
  sincedb_path => "meta/test"
}
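One thing I am not sure about is whether the number of pipeline workers matters here, since as far as I understand the filter section can be run by several worker threads in parallel. Something like the following in logstash.yml (a sketch only, not my actual file; the values are just examples) is what I mean:

# logstash.yml (sketch only) - settings I understand to control how many
# threads run the filter/output stages in parallel
pipeline.workers: 1         # example value; the default is the number of CPU cores
pipeline.batch.size: 125    # default batch size, as far as I know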
The process of checking whether the newly fetched "Sensornr" already exists in the current batch or in the ES history:
# check if the "Sensornr" exists in the current batch
throttle {
  before_count => 0
  after_count => 1
  period => 10
  max_age => 20
  key => "%{Sensornr}"
  add_tag => "retest_in_this_batch"
}
if "retest_in_this_batch" in [tags] {
  mutate {
    add_field => { "FirstIn" => "N" }
  }
}
# check if the "Sensornr" exists in the old ES records
else {
  elasticsearch {
    hosts => "127.0.0.1:9200"
    index => "logstash-test2019"
    query_template => "esquery.json"
    result_size => 1000
    aggregation_fields => { "types_count" => "es_count" }
  }
  if [es_count][value] != 0 {
    mutate {
      add_field => { "FirstIn" => "N" }
    }
  } else {
    mutate {
      add_field => { "FirstIn" => "Y" }
    }
  }
}
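If my understanding of these filters is right, the 2nd and later events with the same "Sensornr" inside one batch should end up looking roughly like this (rubydebug-style, simplified; @timestamp, message and other metadata omitted):

{
     "Sensornr" => "rec1",
    "serial_nr" => 2.0,
        "test2" => 2.0,
        "test3" => 2.0,
      "FirstIn" => "N",
         "tags" => [ "retest_in_this_batch" ]
}

while the first event of each "Sensornr" should instead go through the elasticsearch filter and get "FirstIn" = "Y" or "N" depending on [es_count][value].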
and the esquery.json query template looks like this:
{
  "query": {
    "match": {
      "Sensornr": "%{[Sensornr]}"
    }
  },
  "aggs": {
    "types_count": {
      "value_count": { "field": "Sensornr.keyword" }
    }
  }
}
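For reference, my understanding is that the value_count aggregation comes back in the search response roughly like this (the value 3 is just an example), and that aggregation_fields copies the "types_count" object into the event field "es_count", which is why I check [es_count][value] above:

{
  "aggregations": {
    "types_count": {
      "value": 3
    }
  }
}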
But I got the following result (the columns are: Time, Sensornr, serial_nr, @timestamp, es_count.value, FirstIn, tags):
Jan 1, 2020 @ 11:39:52.993 rec1 1 Jan 1, 2020 @ 11:39:52.993 0 Y -
Jan 1, 2020 @ 11:39:52.994 rec1 2 Jan 1, 2020 @ 11:39:52.994 - N retest_in_this_batch
Jan 1, 2020 @ 11:39:52.994 rec1 3 Jan 1, 2020 @ 11:39:52.994 - N retest_in_this_batch
Jan 1, 2020 @ 11:39:52.995 rec1 4 Jan 1, 2020 @ 11:39:52.995 - N retest_in_this_batch
Jan 1, 2020 @ 11:40:29.116 rec2 5 Jan 1, 2020 @ 11:40:29.116 - N retest_in_this_batch
Jan 1, 2020 @ 11:40:29.117 rec2 6 Jan 1, 2020 @ 11:40:29.117 0 Y -
For the 4 records with "Sensornr" = "rec1", the 1st record was processed before the other 3, which meets my expectation.
But the 2 records with "Sensornr" = "rec2" are the problem: "serial_nr" = 6 was marked "Y", meaning it was not throttled by the throttle filter, which suggests it was read (or at least reached the throttle filter) earlier than "serial_nr" = 5.
Does this mean the Logstash "file" input plugin doesn't always read lines in sequence?
Thanks.