Dynamic fields in CSV to be pushed to ES

input 
{
	file 
    {
		path => "C:/logstash-7.16.2/Outbound/sample_*.csv"
		start_position => "beginning"
		
	}
}
filter 
{ 
	csv 
	{ 
		separator => ","  
		skip_empty_rows=>true  
		skip_header=>true
		columns => ["project","director","no_of_hours_per_day","hourly_avg_rate_based_on_previous_month_actual","apr_fte_new","may_fte_new","jun_fte_new","jul_fte_new","aug_fte_new",
				"sep_fte_new","oct_fte_new","nov_fte_new","dec_fte_new","jan_fte_new","feb_fte_new","mar_fte_new","apr_fte_new","may_fte_new","jun_fte_new",
				"jul_fte_new","aug_fte_new","sep_fte_new","fy_total"]
		
		skip_empty_columns => "false"
	}
  	if ([project] == "Project")
	{  
		
		mutate 
		{ 
			add_field => 
			{ 
				"[@metadata][header]" => "header"
			}
		}
	}
 	if([project] =="")
	{
		drop{}
	}	

 
	ruby {	init => "@@currentfinanceyear='currentfinanceyear'"
           code => "@@currentfinanceyear=event.get('apr_fte_new')"
         }
		 
	ruby {code => "event.set('currentFY',@@currentfinanceyear)"}
	
	date {match => [ "currentFY" , "MMM-yy" ,"yy-MMM"] 
		 target => "mytime"}
		 
	ruby {code => "event.set('[year_epoch]', (1000*event.get('mytime').to_f).round(0))"} 
	mutate {remove_field => ['mytime']}
	
	ruby {	init => "@@nextfinanceyear='nextfinanceyear'"
           code => "@@nextfinanceyear=event.get('oct_fte_new')"
         }
		 
	ruby {code => "event.set('FY',@@nextfinanceyear)"}
	date {match => [ "FY" , "MMM-yy" ,"yy-MMM"] 
		 target => "mytime"}
	ruby {code => "event.set('[nextyear_epoch]', (1000*event.get('mytime').to_f).round(0))"} 
	mutate {remove_field => ['mytime']}
	mutate { 
						
			  				  
			   add_field => {			  
			   "concat"=>"_"
			   "document_id" => "%{project}%{concat}%{director}"
			  }
			 
			}

}
output 
{ 
    stdout { } 
	stdout { codec => rubydebug }
	 if "_csvparsefailure" not in [tags]
	 {
    elasticsearch 
    {
    	index => "indexbacklog"
       	hosts => ["${ES_HOST}"]
		action => "update"
		doc_as_upsert => "true"
		document_id => "%{document_id}" 
	}   
			
}}

CSV input

|Project|Director|No. of Hours per Day|Hourly Average Rate Based on Previous Month Actual|Apr/24|May/24|Jun/24|Jul/24|Aug/24|Sep/24|Oct/24|Nov/24|Dec/24|Jan/25|Feb/25|Mar/25|Apr/25|May/25|Jun/25|Jul/25|Aug/25|Sep/25|F24 Total|
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|abc|a|9.0| 25.00 | 6.0 | 6.0 | 6.00 | 6.00 | 6.00 | 7.00 | 7.00 | 8.00 | 8.00 | 8.00 | 9.00 | 10.00 | 11.00 | 11.00 | 11.00 | 11.00 | 12.00 | 13.00 | 1,536,359 |
|xyz|b|9.0| 58.60 | 5.0 | 5.00 | 6.00 | 7.00 | 7.00 | 7.00 | 8.00 | 9.00 | 9.00 | 9.00 | 9.00 | 10.00 | 11.00 | 11.00 | 12.00 | 13.00 | 14.00 | 15.00 | 355,322 |
|abc|c|9.0| 89.60 | 2.0 | 3.00 | 3.00 | 3.00 | 3.00 | 3.00 | 3.00 | 4.00 | 5.00 | 6.00 | 6.00 | 7.00 | 8.00 | 9.00 | 10.00 | 10.00 | 10.00 | 10.00 | 204,102 |
|abc|d|9.0| 57.00 | 2.0 | 2.00 | 3.00 | 3.00 | 3.00 | 3.00 | 3.00 | 4.00 | 5.00 | 5.00 | 6.00 | 6.00 | 7.00 | 7.00 | 7.00 | 8.00 | 8.00 | 8.00 | 172,965 |
|xyz|e|9.0| 23.30 | 33.0 | 34.00 | 34.00 | 34.00 | 35.00 | 35.00 | 35.00 | 35.00 | 35.00 | 36.00 | 36.00 | 36.00 | 37.00 | 38.00 | 38.00 | 39.00 | 40.00 | 41.00 | 207,603 |


I have data in CSV. where in march month data will start from apr/24 and end column will be sep/25 . now in next month it will be may/24 and column end will be oct/25. so every month these column header will change.Any idea how to handle this.

Thanks you in advance

A csv filter can only handle one set of column names, if you have a file per month and each file has columns for the following 12 months then I would use ruby.

Use a multiline codec to consume each file as a single event. Something like

file {
    path => "/tmp/foo/?.csv" 
    sincedb_path => "/dev/null" 
    start_position => beginning 
    codec => multiline { 
        pattern => "^Spalanzani" 
        negate => true 
        what => previous 
        auto_flush_interval => 2
    }
}

Then in a ruby filter...

    ruby {
        code => '
            lines = event.get("message")
            lines = lines.gsub(/ *\| */, "|")
            lines = lines.split("\n")

            columns = CSV.parse_line(lines.shift, :col_sep => "|")
            columns.shift   # Discard leading |
            columns.pop     # Discard trailing |

            lines.shift # Discard the row of dashes

            lines.each { |x|
                values = CSV.parse_line(x, :col_sep => "|")
                values.shift # Discard leading |
                columns.each_index { |i| event.set(columns[i], values[i]) }
            }
        '
    }
Thank you badger. I will try this.
I have one more question. 
CSV will be comma separated not '|' So same ruby code will work by replacing '|' with comma. or some other changes are required. 

Yes, the same code should work with that replacement everywhere.

Thank you @Badger . Please can you help me with below issue as well

category type	category no.	value 
A		
		Days
		Days
		Days
		Days
		Days
		Days
		Days
		Days
B		
	1	S Headcount
		S Headcount
	2	S Headcount
		S Headcount
		Utilization
C		
S1	1	B Headcount
		B Headcount
		B Headcount
	2	Utilization
S2	1	Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
		Headcount
	2	Utilization
S3	1	Headcount
		Headcount
	2	Utilization
S4	1	Headcount
		Headcount
		Headcount
		Headcount
		Headcount
	2	Utilization
S5	1	Headcount
		Headcount
	2	Utilization

In my CSV i have similar data first column category type, second category no and third value.
category type A,B,C,S1, S2 and so on 

in my logstash config what i am trying is say you find first category A then apply same category to all rows until you find next category. same should be applied to all unique categories. So while pushing data to elastic search it should push data in below format ... note giving CSV as reference 
category type	category no.	value 
A		
A		Days
A		Days
A		Days
A		Days
A		Days
A		Days
A		Days
A		Days
B		
B	1	S Headcount
B		S Headcount
B	2	S Headcount
B		S Headcount
B		Utilization
C		
S1	1	B Headcount
S1		B Headcount
S1		B Headcount
S1	2	Utilization
S2	1	Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2		Headcount
S2	2	Utilization
S3	1	Headcount
S3		Headcount
S3	2	Utilization
S4	1	Headcount
S4		Headcount
S4		Headcount
S4		Headcount
S4		Headcount
S4	2	Utilization
S5	1	Headcount
S5		Headcount
S5	2	Utilization

I have tried below code but it does not work 
   ruby {
        code => '
            prev_category = nil

            event.set("Category", prev_category) if prev_category
            if event.get("Category")
                prev_category = event.get("Category")
            else
                event.set("Category", prev_category)
            end
        '
    }

Please help me with this 
Thanks in advance

The prev_category = nil gets executed for every event, so the event.set will never execute. You could try

init = 'prev_category = nil'
code => '
    event.set("Category", prev_category) if prev_category

et. But the init isn't really needed, since prev_category will be nil by default.

Thanks @Badger .
any other better approach to achieve this?

This unconditionally overwrites the [Category] field with prev_category. That's not good.

If an event has a non-empty [Category] field then save it, but do not change it. Save it in an instance variable so that it is persisted across multiple events.

If the event does not have a [Category] field then add it using @prev_category.

code => '
    category = event.get("Category")
    if category
        @prev_category = category
    else
        event.set("Category", @prev_category)
    end
'

That's all you need. Obviously you need pipeline.workers set to 1 and pipeline.ordered to evaluate to true.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.