Logstash - how to configure CSV filter for joining 2 CSV files based on a common field and send the joined log to ES

Hi Team,

I need your help with the scenario below, which I have been struggling with since last week. I googled a lot but can't figure out a good answer for my case.

My scenario is that I have 2 CSV files which contain the same column header "COMMIT_SHA1". For the rows which have the same COMMIT_SHA1 value, I want them:

  1. joined into 1 row, and this 1 row shipped into Elasticsearch.
  2. If the left or right side of the join is empty, replace the empty side with the value 'null'.

The column header of CSV1 is
GIT_ORG, GIT_REPOS, COMMIT_SHA1, COMMIT_AUTHOR
The column header of CSV2 is
COMMIT_SHA1, FILE_CHANGED, FILE_TYPE, FILE_METHOD
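
For example (hypothetical values), one row from CSV1 and one row from CSV2 that share the same COMMIT_SHA1 should end up as a single Elasticsearch document:

    CSV1 row:   org1,repo1,abc123,alice
    CSV2 row:   abc123,src/Foo.java,java,Foo.isValid
    joined doc: GIT_ORG=org1, GIT_REPOS=repo1, COMMIT_SHA1=abc123, COMMIT_AUTHOR=alice,
                FILE_CHANGED=src/Foo.java, FILE_TYPE=java, FILE_METHOD=Foo.isValid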

Can anyone advise how I should configure the csv filter for this complex case?

Process the first file with a regular csv filter. Pre-process the second file so that it only has two columns

COMMIT_SHA1,FILE_CHANGED;FILE_TYPE;FILE_METHOD

Then use a translate filter to do a lookup in the second file and a dissect or grok to split the three fields.
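
For instance (made-up values), the pre-processed second file would look something like

    abc123,src/Foo.java;java;Foo.isValid
    def456,src/Bar.java;java;Bar.check

that is, COMMIT_SHA1 before the comma as the lookup key, and the remaining three fields joined by semicolons as the value.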

Hi Badger,

Thanks for the quick response.

  1. I can understand "Process the first file with a regular csv filter. Pre-process the second file so that it only has two columns". Like below:

    input {
      file {
        path => "C:\elkstack\elasticsearch-6.5.1\logs\csv1.csv"
        start_position => "beginning"
        sincedb_path => "null"
      }
    }
    filter {
      csv {
        columns => [
          "GIT_ORG",
          "GIT_REPOS",
          "COMMIT_SHA1",
          "COMMIT_AUTHOR"
        ]
        separator => ","
      }
    }

  2. For "Then use a translate filter to do a lookup in the second file and a dissect or grok to split the three fields", is it like below?

    translate {
      dictionary_path => "C:\elkstack\elasticsearch-6.5.1\logs\csv2.csv"
    }

   Could you help provide a sample config for this part?
    

Thanks,
Cherie

    translate { destination => "[@metadata][lookup]" dictionary_path => "/home/user/foo.csv" field => "COMMIT_SHA1" }
    dissect { mapping => { "[@metadata][lookup]" => "%{FILE_CHANGED};%{FILE_TYPE};%{FILE_METHOD}" } }

Thank you. Let me try

Hi Badger,

The content of CSV1.csv is not being read, as the trace log says discover_files {"count"=>0}

    [2019-02-05T23:00:41,704][DEBUG][logstash.filters.translate] LogStash::Filters::Translate: Dictionary translation method - Exact
    [2019-02-05T23:00:42,387][TRACE][logstash.inputs.file     ] Registering file input {:path=>["C:\\elkstack\\elasticsearch-6.5.1\\logs\\csv1.csv"]}
    [2019-02-05T23:00:42,529][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x750c6461 run>"}
    [2019-02-05T23:00:42,569][TRACE][logstash.agent           ] Converge results {:success=>true, :failed_actions=>[], :successful_actions=>["id: main, action_type: LogStash::PipelineAction::Create"]}
    [2019-02-05T23:00:42,665][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [2019-02-05T23:00:42,703][INFO ][filewatch.observingtail  ] START, creating Discoverer, Watch with file and sincedb collections
    [2019-02-05T23:00:42,741][DEBUG][logstash.agent           ] Starting puma
    [2019-02-05T23:00:42,776][DEBUG][logstash.agent           ] Trying to start WebServer {:port=>9600}
    [2019-02-05T23:00:42,849][TRACE][filewatch.sincedbcollection] open: reading from null
    [2019-02-05T23:00:42,864][TRACE][filewatch.sincedbcollection] open: count of keys read: 0
    [2019-02-05T23:00:42,912][DEBUG][logstash.api.service     ] [api-service] start
    [2019-02-05T23:00:42,963][TRACE][filewatch.discoverer     ] discover_files {"count"=>0}

My logstash.conf is

    input {
      file {
        path => "C:\elkstack\elasticsearch-6.5.1\logs\csv1.csv"
        start_position => "beginning"
        sincedb_path => "null"
      }
    }

    filter {
      csv {
        columns => [
          "GIT_ORG",
          "GIT_REPOS",
          "COMMIT_SHA1",
          "COMMIT_AUTHOR"
        ]
        separator => ","
      }
      translate {
        destination => "[@metadata][lookup]"
        dictionary_path => "C:\elkstack\elasticsearch-6.5.1\logs\lookup.csv"
        field => "COMMIT_SHA1"
      }
      dissect { mapping => { "[@metadata][lookup]" => "%{FILE_CHANGED};%{FILE_TYPE};%{FILE_METHOD}" } }
    }

    output {
      elasticsearch {
        action => "index"
        hosts  => "localhost:9200"
        index  => "logstash-git"
      }
      stdout { codec => rubydebug }
    }

What could the issue with the CSV1.csv file be? Maybe something with its format?

Thanks,
Cherie

Use forward slash in filenames, not backslash.

My fault. You taught me this in another thread before.

Hi Badger,

Much appreciated, the solution works. I have one more question here.

In your solution, you mentioned that "Pre-process the second file so that it only has two columns"

My test CSV file only has 20 records, so I manually processed it into the format below: COMMIT_SHA1,FILE_CHANGED;FILE_TYPE;FILE_METHOD

But in reality the CSV will have a huge number of records. Can a Logstash configuration make this pre-processing happen automatically?

Would the merge below do it?

    mutate {
      merge => { "lookup" => "%{FILE_CHANGED};%{FILE_TYPE};%{FILE_METHOD}" }
    }

Thanks,
Cherie

You could do something like

mutate { gsub => [ "message", ",", ";", "message", "^([^;]*);", "\1," ] }

That is, turn all the commas into semicolons, then change the first one back to comma. Use that with a file input and a file output to do the pre-processing. Probably cheaper to do it in Powershell tho' :slight_smile:
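
For a hypothetical line from lookup.csv, those two substitutions would transform the message like this:

    abc123,src/Foo.java,java,Foo.isValid     (original)
    abc123;src/Foo.java;java;Foo.isValid     (after "," => ";")
    abc123,src/Foo.java;java;Foo.isValid     (after "^([^;]*);" => "\1,")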

Hi Badger,

  1. What is this \1 regular expression syntax?

  2. Can you help review my complete logstash.conf to check whether it is correct? I am not clear where I should put the file output after the pre-processing.

     input {
       file {
         path => "C:/elkstack/elasticsearch-6.5.1/logs/csv1.csv"
         start_position => "beginning"
         sincedb_path => "null"
       }
       file {
         path => "C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv"
         start_position => "beginning"
         sincedb_path => "null"
       }
     }

     filter {
       if [path] == "C:/elkstack/elasticsearch-6.5.1/logs/csv1.csv" {
         csv {
           columns => [ "GIT_ORG", "GIT_REPOS", "COMMIT_SHA1", "COMMIT_AUTHOR" ]
           separator => ","
         }
       }
       if [path] == "C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv" {
         mutate { gsub => [ "message", ",", ";", "message", "^([^;]*);", "\1," ] }
         translate {
           destination => "[@metadata][lookup]"
           dictionary_path => "C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv"
           field => "COMMIT_SHA1"
         }
         dissect { mapping => { "[@metadata][lookup]" => "%{FILE_CHANGED};%{FILE_TYPE};%{FILE_METHOD}" } }
       }
     }

     output {
       elasticsearch {
         action => "index"
         hosts  => "localhost:9200"
         index  => "logstash-git"
       }
       stdout { codec => rubydebug }
     }

Hi Badger,

I tried the above Logstash configuration, and it looks like a step is missing after mutate { gsub }, as the log says

 `Dissector mapping, field not found in event {"field"=>"[@metadata][lookup]", "event"=>{"@version"=>"1", "message"=>"e27992e00c232278fafda7737a8a2511ed93fbe2,idl-codomain-service/src/main/java/com/successfactors/codomain/mdf/meta/resolver/impl/COMDFRecursiveGOFieldValidator.java;java;COMDFRecursiveGOFieldValidator.isValid\r", "@timestamp"=>2019-02-06T12:58:18.529Z, "path"=>"C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv", "host"=>"PVGN50859047A"}}`

How can I access and refer to the original 3 fields FILE_CHANGED;FILE_TYPE;FILE_METHOD and rename them to new fields?

Thanks,
Cherie

I am suggesting that you run a one-off process to modify the lookup file. Use a configuration like

input { file { path => "/home/user/lookup.csv" sincedb_path => "/dev/null" start_position => "beginning" } }
filter {
    mutate { gsub => [ "message", ",", ";", "message", "^([^;]*);", "\1," ] }
}
output { file { path => "/home/user/lookup2.csv" codec => plain { format => "%{message}
" } } }

\1 is a reference to a capture group. A capture group is a pair of parentheses in the regexp. So "^([^;]*);" means at start of line (^) start capturing all characters that are not semicolon up to the first semicolon. Then use that group followed by a comma (instead of the semicolon) as the replacement.

OK, so once you have transformed lookup.csv into lookup2.csv, reference that in the translate filter. Something like

input { file { path => "/home/user/foo.csv" sincedb_path => "/dev/null" start_position => "beginning" } }
filter {
    csv {
        columns => [ "GIT_ORG", "GIT_REPOS", "COMMIT_SHA1", "COMMIT_AUTHOR"]
        separator => ","
    }
    translate {
        destination => "[@metadata][lookup]"
        dictionary_path => "/home/user/lookup2.csv"
        field => "COMMIT_SHA1"
    }
    dissect { mapping => { "[@metadata][lookup]" => "%{FILE_CHANGED};%{FILE_TYPE};%{FILE_METHOD}" }}
}
output { stdout {} }
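
Once the stdout output shows the joined events you expect, you would presumably swap the stdout output for the elasticsearch output from your earlier config, for example

    output {
        elasticsearch { action => "index" hosts => "localhost:9200" index => "logstash-git" }
    }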

Does this mean there will be 2 Logstash configuration files?
The 1st logstash.conf to process lookup.csv and output it to lookup2.csv.

The 2nd logstash.conf to process CSV1.csv and lookup2.csv, joining these 2 files based on COMMIT_SHA1?

That is correct.

If the 2 logstash.conf files are put in the same path, how does Logstash know which one it should load and start first with the command logstash -f path?

Yes, it will use the file that you supply using -f.

How does it know which one to load and handle first?

If you use "-f /some/path/" then it will concatenate all the files in that directory to create a single configuration, which is not what you want.

If you use "-f /some/path/file1.conf" or "-f /some/path/file2.conf" it will use that single file as the configuration.

You need to do a one-time pre-process of the lookup file using one configuration, and then process the data using the second configuration.
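
In other words, something like this (the .conf file names are just examples):

    logstash -f preprocess.conf    (one-off run: rewrites lookup.csv into lookup2.csv)
    logstash -f main.conf          (normal run: joins csv1.csv with lookup2.csv)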

OK. Understood your point.

I am facing a new issue now. With the following Logstash configuration, the file lookup2.csv is not created under the path C:/elkstack/elasticsearch-6.5.1/logs/.

Logstash configuration:

    input {
      file {
        path => "C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv"
        start_position => "beginning"
        sincedb_path => "null"
      }
    }

    filter {
      mutate { gsub => [ "message", ",", ";", "message", "^([^;]*);", "\1," ] }
    }

    output {
      file {
        path => "C:/elkstack/elasticsearch-6.5.1/logs/lookup2.csv"
        codec => plain { format => "%{message}" }
      }
      stdout { codec => rubydebug }
    }

Logs in trace mode. I don't know why it is trying to read another CSV file (csv1.csv); it should read lookup.csv only.

    [2019-02-07T14:55:13,040][TRACE][logstash.inputs.file     ] Registering file input {:path=>["C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv"]}
    [2019-02-07T14:55:13,199][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0xb1445b5 run>"}
    [2019-02-07T14:55:13,257][TRACE][logstash.agent           ] Converge results {:success=>true, :failed_actions=>[], :successful_actions=>["id: main, action_type: LogStash::PipelineAction::Create"]}
    [2019-02-07T14:55:13,358][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
    [2019-02-07T14:55:13,412][INFO ][filewatch.observingtail  ] START, creating Discoverer, Watch with file and sincedb collections
    [2019-02-07T14:55:13,422][DEBUG][logstash.agent           ] Starting puma
    [2019-02-07T14:55:13,454][DEBUG][logstash.agent           ] Trying to start WebServer {:port=>9600}
    [2019-02-07T14:55:13,519][TRACE][filewatch.sincedbcollection] open: reading from null
    [2019-02-07T14:55:13,581][DEBUG][logstash.api.service     ] [api-service] start
    [2019-02-07T14:55:13,591][TRACE][filewatch.sincedbcollection] open: importing ... 'unknown 0 0' => '4326 1549457898.591 C:/elkstack/elasticsearch-6.5.1/logs/csv1.csv'
    [2019-02-07T14:55:13,608][TRACE][filewatch.sincedbcollection] open: setting #<struct FileWatch::InodeStruct inode="unknown", maj=0, min=0> to #<FileWatch::SincedbValue:0x9e31798 @last_changed_at=1549457898.591, @path_in_sincedb="C:/elkstack/elasticsearch-6.5.1/logs/csv1.csv", @watched_file=nil, @position=4326>
    [2019-02-07T14:55:13,622][TRACE][filewatch.sincedbcollection] open: importing ... '4001839134-462055-1638400 0 0' => '4026 1549521929.361 C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv'
    [2019-02-07T14:55:13,626][TRACE][filewatch.sincedbcollection] open: setting #<struct FileWatch::InodeStruct inode="4001839134-462055-1638400", maj=0, min=0> to #<FileWatch::SincedbValue:0x60b41a89 @last_changed_at=1549521929.361, @path_in_sincedb="C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv", @watched_file=nil, @position=4026>
    [2019-02-07T14:55:13,637][TRACE][filewatch.sincedbcollection] open: count of keys read: 2
    [2019-02-07T14:55:13,761][TRACE][filewatch.discoverer     ] discover_files {"count"=>1}
    [2019-02-07T14:55:13,949][TRACE][filewatch.discoverer     ] discover_files handling: {"new discovery"=>true, "watched_file details"=>"<FileWatch::WatchedFile: @filename='lookup.csv', @state='watched', @recent_states='[:watched]', @bytes_read='0', @bytes_unread='0', current_size='4026', last_stat_size='4026', file_open?='false', @initial=true, @sincedb_key='4001839134-462055-1638400 0 0'>"}
    [2019-02-07T14:55:13,962][TRACE][filewatch.sincedbcollection] associate: finding {"inode"=>"4001839134-462055-1638400", "path"=>"C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv"}
    [2019-02-07T14:55:13,981][TRACE][filewatch.sincedbcollection] associate: found sincedb record {"filename"=>"lookup.csv", "sincedb key"=>#<struct FileWatch::InodeStruct inode="4001839134-462055-1638400", maj=0, min=0>, "sincedb_value"=>#<FileWatch::SincedbValue:0x60b41a89 @last_changed_at=1549521929.361, @path_in_sincedb="C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv", @watched_file=nil, @position=4026>}
    [2019-02-07T14:55:14,047][TRACE][filewatch.sincedbcollection] handle_association fully read, ignoring..... {"watched file"=>"<FileWatch::WatchedFile: @filename='lookup.csv', @state='ignored', @recent_states='[:watched, :watched]', @bytes_read='4026', @bytes_unread='0', current_size='4026', last_stat_size='4026', file_open?='false', @initial=false, @sincedb_key='4001839134-462055-1638400 0 0'>", "sincedb value"=>#<FileWatch::SincedbValue:0x60b41a89 @last_changed_at=1549522514.023, @path_in_sincedb="C:/elkstack/elasticsearch-6.5.1/logs/lookup.csv", @watched_file="<FileWatch::WatchedFile: @filename='lookup.csv', @state='ignored', @sincedb_key='4001839134-462055-1638400 0 0, size=4026>", @position=4026>}
    [2019-02-07T14:55:14,060][TRACE][filewatch.sincedbcollection] associate: inode and path matched
    [2019-02-07T14:55:14,141][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}