I have a CSV file, and what I'm trying to do is filter the data a bit and then output it to another CSV file, but I have to make sure I'm not writing the SAME info twice. So I have a Ruby script that checks whether a KEY has already been written; if it has, it drops the event, of course.
What I've found is that there may be a problem with how I handle the variables in the script, because I'm seeing some weird behaviour.
So here is the pipeline configuration file:
input {
  file {
    path => "/etc/logstash/in.csv"
    start_position => "beginning"
  }
}

filter {
  if [message] == "" {
    drop {}
  }

  csv {
    skip_header => "true"
    columns => ["id", "name"]
  }

  mutate {
    add_field => {
      "[@metadata][my_key]"   => "%{id}"
      "[@metadata][my_value]" => "%{name}"
    }
  }

  if [@metadata][my_key] {
    mutate {
      strip => ["[@metadata][my_key]"]
    }
    # lookup for the key in csv file
    ruby {
      path => "/etc/logstash/script.rb"
    }
  }
}

output {
  csv {
    path => "/etc/logstash/out.csv"
    csv_options => {"col_sep" => ","}
    fields => [ "[@metadata][my_key]", "[@metadata][my_value]" ]
    flush_interval => 0
  }
  stdout { codec => rubydebug }
}
And here is the Ruby script (/etc/logstash/script.rb):

require 'csv'

# def register(params)
#   @src_key = params["source_field"]
# end

def filter(event)
  my_key = event.get("[@metadata][my_key]").to_s
  puts "My_Key: " + my_key

  # Re-read out.csv on every event; column 0 holds the keys already written.
  table = CSV.parse(File.read("/etc/logstash/out.csv"), headers: true)

  puts "Already written keys:"
  table.by_col[0].each do |written_key|
    puts written_key
    # The key is already in out.csv, so drop the event.
    return [] if written_key.to_s == my_key
  end

  # Key not seen yet, keep the event.
  [event]
end
So the first problem I see is that when in.csv has duplicate entries from the start and out.csv is empty, it can't work, because Logstash doesn't save every event to the file separately but rather waits and saves them all together. (I sketch a possible workaround after the example below.)
So if my in.csv looks like this:
1,"name1"
2,"name2"
3,"name3"
4,"name4"
4,"name4"
3,"name3"
1,"name1"
1,"name1"
It will save all the entries into out.csv, not just the unique ones.
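One workaround I'm considering for this first problem is to stop relying on out.csv having been flushed and instead keep the already-written keys in memory inside the script. This is only a minimal sketch, assuming the deduplication just needs to survive a single Logstash run; @seen_keys and @mutex are names I made up, and I wrap the set in a mutex because, as far as I understand, filter() can be called from several pipeline worker threads:

require 'csv'
require 'set'

def register(params)
  @seen_keys = Set.new   # keys written (or about to be written) in this run
  @mutex = Mutex.new     # filter() may run on several worker threads
  # Seed the set from whatever is already in out.csv, if the file exists.
  path = "/etc/logstash/out.csv"
  if File.exist?(path)
    CSV.foreach(path) { |row| @seen_keys.add(row[0].to_s) }
  end
end

def filter(event)
  key = event.get("[@metadata][my_key]").to_s
  duplicate = @mutex.synchronize do
    seen = @seen_keys.include?(key)
    @seen_keys.add(key)   # remember the key even before it reaches out.csv
    seen
  end
  duplicate ? [] : [event]
end

That way duplicates inside the same batch would be caught even though nothing has been flushed to out.csv yet.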
Now, a second test that was a success: while Logstash was running, I appended a duplicate entry to in.csv, and it dropped it!
See photo:
Now, the third and biggest problem I see: if I stop Logstash, delete the sincedb file (sudo rm /var/lib/logstash/plugins/inputs/file/.sincedb_2f67fa4ed13e04f8dd3e5594584a1c9c),
and then run Logstash again with in.csv and out.csv as before (with the duplicate entries from the first try), it goes crazy!
Clearly there are some concurrency problems here, because the results in the output don't make sense. It also wrote a duplicate entry into out.csv, as you can see from the printed event at the bottom of the screenshot.
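If the concurrency theory is right, one thing I still want to try (just an idea, I haven't verified it helps) is forcing a single pipeline worker so the filter stage can't run in parallel, either with pipeline.workers: 1 in logstash.yml or on the command line (the .conf path here is just a placeholder for my pipeline file):

bin/logstash -w 1 -f /etc/logstash/pipeline.conf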
Any suggestions? Thanks a lot and sorry for the huge post.