Logstash Ruby Script - Access Fields While Avoiding Concurrency Issues

I have a CSV file, and what I'm trying to do is, after filtering the data a bit, output it into another CSV file, but I have to make sure I'm not writing the SAME info twice. So I have a Ruby script that checks whether a KEY has already been written; if it has, it drops the event, of course.

What I found out is that there may be a problem with how I handle the variables in the script, because I'm seeing some weird behaviour.

So here is the pipeline configuration file:

input {
    file {
        path => "/etc/logstash/in.csv"
        start_position => "beginning"
    }
}

filter {

    if [message] == "" {
        drop {}
    }

    csv {
        skip_header => "true"
        columns => ["id", "name"]
    }

    mutate {
        add_field => {
            "[@metadata][my_key]" => "%{id}"
            "[@metadata][my_value]" => "%{name}"
        }
    }

    if [@metadata][my_key] {
        mutate {
            strip => ["[@metadata][my_key]"]
        }

        # lookup for the key in csv file
        ruby {
            path => "/etc/logstash/script.rb"
        }
    }

}

output {

    csv {
        path => "/etc/logstash/out.csv"
        csv_options => {"col_sep" => ","}
        fields => [ "[@metadata][my_key]", "[@metadata][my_value]" ]
        flush_interval => 0
    }

    stdout { codec => rubydebug }

}
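
(Aside: by default the rubydebug codec does not print [@metadata] fields, so to actually see my_key/my_value in the console output I would have to enable its metadata option, something like the line below. This is not part of the config above, just a debugging variant.)

stdout { codec => rubydebug { metadata => true } }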

Here is the ruby script:

require 'csv'

#def register(params)
#    @src_key = params["source_field"]
#end

def filter(event)

    # Re-read the output file on every event and collect the keys already written.
    table = CSV.parse(File.read("/etc/logstash/out.csv"), headers: true)
    puts "My_Key: " + event.get("[@metadata][my_key]").to_s
    puts "Already written keys:"
    for _key in table.by_col[0] do
        puts _key
        # Drop the event if its key already exists in the first column of out.csv.
        if _key.to_s == event.get("[@metadata][my_key]").to_s
            return []        # an empty array cancels the event
        end
    end

    return [event]           # no match found, keep the event

end
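
As a side note, the commented-out register hook is how the ruby filter passes parameters from the pipeline into a script file. If I ever want to stop hard-coding the out.csv path, something like the sketch below should work; the parameter name csv_path is just a name I picked for illustration, it is not in the config above.

# In the pipeline configuration:
ruby {
    path => "/etc/logstash/script.rb"
    script_params => { "csv_path" => "/etc/logstash/out.csv" }
}

# In script.rb:
def register(params)
    @csv_path = params["csv_path"]   # read once when the pipeline starts
end

# ...and filter(event) would then use @csv_path instead of the literal path.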

So the first problem I see is that when the in.csv file has duplicate entries from the start and out.csv is empty, this cannot work, because it doesn't save every event to the file separately but rather waits and saves them all together.

So if my in.csv looks like this:

1,"name1"
2,"name2"
3,"name3"
4,"name4"
4,"name4"
3,"name3"
1,"name1"
1,"name1"

It will save all the entries, not just the unique ones, into out.csv.

Now, the second test, which was a success: while Logstash was running I added a duplicate entry to in.csv again, and it dropped it!
See photo:

Now the third and biggest problem I see: if I stop Logstash, delete the sincedb file (sudo rm /var/lib/logstash/plugins/inputs/file/.sincedb_2f67fa4ed13e04f8dd3e5594584a1c9c), and run Logstash again with in.csv and out.csv as before (with the duplicate entries from the first try), it goes crazy!


Clearly there are some concurrency problems here, because the results in the output don't make sense. It also wrote a duplicate entry into out.csv, as you can see from the printed event at the bottom of the screen.

Any suggestions? Thanks a lot and sorry for the huge post.

You will need pipeline.workers set to 1; otherwise, when you start, two events with the same id may be processed by separate threads, resulting in duplicate entries in the output.
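
If it helps, a single worker can be forced either on the command line or in logstash.yml (the paths below assume a standard package install, adjust as needed):

bin/logstash -w 1 -f /etc/logstash/conf.d/pipeline.conf

# or in /etc/logstash/logstash.yml
pipeline.workers: 1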

Also, you need to understand pipeline batches. By default, 125 events will go through the csv filter, then those 125 events go through the mutate, then through the ruby filter, then they get sent to the output. So none of the first 125 events will get de-duped. You can set pipeline.batch.size to 1 to try to avoid this, but it will not work 100% of the time because a second batch may get through the ruby filter before the output is flushed.
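
For completeness, the batch size is set the same way (again just a sketch; as noted, even a batch size of 1 does not fully close the race):

bin/logstash -w 1 -b 1 -f /etc/logstash/conf.d/pipeline.conf

# or in /etc/logstash/logstash.yml
pipeline.batch.size: 1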

You could use an aggregate filter to keep a record of which keys you have processed.

mutate { add_field => { "[@metadata][constant]" => 1 } }
aggregate {
    task_id => "%{[@metadata][constant]}"
    code => '
        key = event.get("[@metadata][my_key]")
        if map[key]
            event.cancel
        else
            map[key] = 1
        end
    '
}

This still requires pipeline.workers set to 1. Note that you can use aggregate_maps_path to persist the map across restarts. Note that entries in the map are never deleted, so this leaks memory. You could use the timeout option to flush them.
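
If you go that route, a rough sketch of the persisted/expiring version could look like this (the path and timeout values are placeholders, not something I have tested against your setup):

aggregate {
    task_id => "%{[@metadata][constant]}"
    code => '
        key = event.get("[@metadata][my_key]")
        if map[key]
            event.cancel
        else
            map[key] = 1
        end
    '
    # persist the map to disk on clean shutdown and reload it on startup
    aggregate_maps_path => "/etc/logstash/aggregate_maps"
    # expire map entries after an hour so the map does not grow forever
    timeout => 3600
}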

I will check out the aggregate filter you proposed, as I'm a bit unfamiliar with it. But be that as it may, the -w 1 setting worked almost the way I wanted.

I say almost because the result is nearly right, and your knowledge may help here. As you said, for the first batch of events I cannot guarantee uniqueness (I haven't tried the aggregate filter yet), but what I tried was to redo the third attempt from my post. With out.csv filled with these entries:

1,name1
2,name2
3,name3
4,name4
3,name3
4,name4
5,name5

I deleted the sincedb file again (sudo rm /var/lib/logstash/plugins/inputs) and ran Logstash with 1 worker, of course.

Now it did almost perfectly:

Check it out: for some reason it skips the first entry only. It checks everything else, in the order of out.csv, but it skips the first one.

Do you know why?
Thanks again

That is because you told CSV.parse that the first row is a header.
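
To illustrate what headers: true does to the lookup (the values here are just made up for the example):

require 'csv'

data = "1,name1\n2,name2\n3,name3\n"

# headers: true -- "1,name1" is consumed as the header row,
# so it never appears in the column values
CSV.parse(data, headers: true).by_col[0]   # => ["2", "3"]

# headers: false (the default) -- every row is data
CSV.parse(data).map { |row| row[0] }       # => ["1", "2", "3"]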

Thanks for the info. That was it.
I mostly fixed my issue and still haven't used the aggregate filter. I will mark your first response as the solution.
