Logstash - how to configure a CSV filter to join 2 CSV files on a common field (one-to-many mapping) and ingest the data into a single index

Hi Team,

I have 2 CSV files which share a common column header, "faculty_id".

For the rows which have the same "faculty_id" value, I want the following steps to be done:

  1. Join/combine the data from both CSV files into one row, and ingest this row into Elasticsearch.
  2. If the left or right side of the join is empty, replace the missing fields with the value 'null'.

The column headers of the CSV files are as follows.

Column Header of CSV1
faculty_id,faculty_name,reporting_manager

Column Header of CSV2
faculty_id,student_name,batch,year,percentage

Sample Input Data
csv1:
faculty_id,faculty_name,reporting_manager
1,AAA,R1
2,BBB,R2

csv2:
faculty_id,student_name,batch,year,percentage
1,STU1,2k20,2020,90
1,STU2,2k20,2020,78
2,STU3,2k20,2020,85
1,STU4,2k20,2020,75
2,STU5,2k20,2020,80
3,STU6,2k20,2020,80

Sample Result:
faculty_id,faculty_name,reporting_manager,student_name,batch,year,percentage
1,AAA,R1,STU1,2k20,2020,90
1,AAA,R1,STU2,2k20,2020,78
2,BBB,R2,STU3,2k20,2020,85
1,AAA,R1,STU4,2k20,2020,75
2,BBB,R2,STU5,2k20,2020,80
3,null,null,STU6,2k20,2020,80

Could anyone please guide me with the Logstash configuration to achieve the above-mentioned scenario?

You will need to reformat the csv1 file, but you can do it using a translate filter.

Use a file input to read csv2 and then use the following filters

    csv { autodetect_column_names => true }
    translate {
        dictionary_path => "/home/user/t.test/csv1.csv"
        field => "faculty_id"
        destination => "faculty_name"
    }

Then you would use a second translate filter that has a file that looks like

faculty_id,reporting_manager
1,R1
2,R2
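Note that, as far as I recall, the translate filter also supports a `fallback` option that is used when the lookup finds no match; adding it to both translate filters would cover your requirement to emit 'null' for an unmatched faculty_id such as 3 in your sample data:

    translate {
        dictionary_path => "/home/user/t.test/csv1.csv"
        field => "faculty_id"
        destination => "faculty_name"
        fallback => "null"
    }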

Alternatively, you could completely change the format of csv1:

faculty_id,JSON
1,"{""faculty_name"": ""AAA"", ""reporting_manager"": ""R1""}"
2,"{""faculty_name"": ""BBB"", ""reporting_manager"": ""R2""}"

then use these filters

    csv { autodetect_column_names => true }
    translate {
        dictionary_path => "/home/user/csv1.csv"
        field => "faculty_id"
        destination => "[@metadata][json]"
    }
    json { source => "[@metadata][json]" }
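In this variant an unmatched faculty_id could be handled (assuming the translate filter's `fallback` option behaves as I recall) by falling back to a JSON string whose fields are the literal string "null", which the json filter then expands:

    translate {
        dictionary_path => "/home/user/csv1.csv"
        field => "faculty_id"
        destination => "[@metadata][json]"
        fallback => '{"faculty_name": "null", "reporting_manager": "null"}'
    }
    json { source => "[@metadata][json]" }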

Hi Badger,

Thank you for the quick response.

Do we need to have two separate input files for csv1? I am stuck creating the file with the JSON input keyed by id as you mentioned above.
Also, please suggest whether we can use a mutate filter for any kind of conversion here.

Could you please provide a sample config for this scenario, if any?

#csv1-> "/opt/parquet/test-input1.csv"
#csv2->"/opt/parquet/test-input2.csv"

input {
    file {
        type => "file2"
        path => "/opt/parquet/test-input2.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    if [type] == "file2" {
        csv {
            columns => ["...."id"...."]
            separator => ","
        }
    }

    mutate {
        convert => { "percentage" => "float" }
    }

    translate {
        dictionary_path => "/opt/parquet/input1.csv"
        field => "id"
        destination => "name"
    }
}

As I said, you would have to have three csv files. test-input2.csv would remain as is. The other two would be

test1.csv

faculty_id,faculty_name
1,AAA
2,BBB

test3.csv

faculty_id,reporting_manager
1,R1
2,R2

Then use two translate filters

translate {
    dictionary_path => "/some/path/test1.csv"
    field => "faculty_id"
    destination => "faculty_name"
}
translate {
    dictionary_path => "/some/path/test3.csv"
    field => "faculty_id"
    destination => "reporting_manager"
}
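Putting it all together, the pipeline would look something like this (the paths, hosts, index name, and credentials below are placeholders for illustration; adjust them to your environment):

input {
    file {
        path => "/some/path/test-input2.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv { autodetect_column_names => true }
    mutate { convert => { "percentage" => "float" } }
    translate {
        dictionary_path => "/some/path/test1.csv"
        field => "faculty_id"
        destination => "faculty_name"
    }
    translate {
        dictionary_path => "/some/path/test3.csv"
        field => "faculty_id"
        destination => "reporting_manager"
    }
}

output {
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "faculty-index"
    }
}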

As you suggested, I have created three files and used two translate filters, but I am getting the below exception:
[ERROR][logstash.agent ] Failed to execute action {:id=>:sample_cur_pipeline, :action_type=>LogStash::ConvergeResult::FailedAction, :message=>"Could not execute action: PipelineAction::Create<sample_cur_pipeline>, action_result: false", :backtrace=>nil}.

Please guide me with this; the configuration used is below.

input {
    file {
        path => "/opt/parquet/test1.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        columns => ["faculty_id","student_name","batch","year","percentage"]
        separator => ","
    }

    translate {
        dictionary_path => "/opt/parquet/test2.csv"
        field => "faculty_id"
        destination => "faculty_name"
    }
    translate {
        dictionary_path => "/opt/parquet/test3.csv"
        field => "faculty_id"
        destination => "reporting_manager"
    }
}

output {
    elasticsearch {
        hosts => ["http://aa.bb.cc.dd:xxx"]
        index => "xxx-index"
        user => "uuu"
        password => "bbb"
    }
}

Try enabling --log.level debug and see if you get a more informative error message.
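For example (adjust the path to wherever your config file and Logstash installation live):

    bin/logstash --log.level debug -f /etc/logstash/conf.d/pipeline.conf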