Merge two CSV files into a single CSV file using Logstash

Hi All,

I am trying to merge two CSV files into a single CSV file using Logstash. Below are the sample data and the config I have tried.

logstash-1.csv

Sample,Gender,Age
1,dentist,10
2,doctor,20
3,rep,30

logstash-2.csv

Sample,Gender,Age
4,Male,30
5,Female,25
6,Male,22

Below is the Logstash configuration:

input {
  file {
    path => "/logstash/fileinput/logstash-1.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
  file {
    path => "/logstash/fileinput/logstash-2.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    skip_header => true
    columns => ["Sample","Gender","Age"]
    separator => ","
  }
}
output {
  csv {
    fields => ["Sample", "Gender", "Age"]
    path => "/pvar/sincelkbs1/logstash/fileinput/output1.csv"
  }
}

Following is the output in the CSV file:

1,dentist,10
3,rep,30
6,Male,22
2,doctor,20
4,Male,30
5,Female,25

I was expecting my output to be in ascending order, so I must be doing something wrong. Can you please help me resolve this issue?

Thanks,
Ganeshbabu R

Logstash does not preserve order by default. If you need the order to be preserved, set pipeline.workers to 1 and pipeline.ordered to true. Obviously this will reduce your throughput.
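A minimal sketch of those two settings in `pipelines.yml`; the pipeline id and config path here are placeholders, not taken from the thread:

```
- pipeline.id: ordered-csv
  path.config: /usr/share/logstash/conf.d/merge.conf
  pipeline.workers: 1    # single worker thread so batches are not processed concurrently
  pipeline.ordered: true # preserve event order within the pipeline
```

Note that `pipeline.ordered` only guarantees ordering within a single pipeline, not across separate pipelines.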

Hi @Badger

Thanks for sharing the details. It only works as expected some of the time.

Sometimes, after the Logstash pipeline execution completes, I find my data ordered like this:

4,Male,30
5,Female,25
6,Male,22
Sample,Gender,Age
1,dentist,10
2,doctor,20
3,rep,30

Expected output

Sample,Gender,Age
1,dentist,10
2,doctor,20
3,rep,30
4,Male,30
5,Female,25
6,Male,22

Below are the Logstash logs:

[2022-08-10T01:23:47,990][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"test1", "pipeline.workers"=>1, "pipeline.batch.size"=>1000, "pipeline.batch.delay"=>100, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/usr/share/logstash/conf.d/second-merge.conf"], :thread=>"#<Thread:0x1f596e@/apps/logstash-7.16.3/logstash-core/lib/logstash/java_pipeline.rb:129 run>"}
[2022-08-10T01:23:47,990][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>"test", "pipeline.workers"=>1, "pipeline.batch.size"=>1000, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>1000, "pipeline.sources"=>["/usr/share/logstash/conf.d/first-merge.conf"], :thread=>"#<Thread:0x65c717@/apps/logstash-7.16.3/logstash-core/lib/logstash/java_pipeline.rb:129 run>"}
[2022-08-10T01:23:48,011][WARN ][logstash.outputs.elasticsearchmonitoring] Restored connection to ES instance {:url=>"https://dev-logstash:9200/"}
[2022-08-10T01:23:48,019][INFO ][logstash.outputs.elasticsearchmonitoring] Elasticsearch version determined (7.16.3) {:es_version=>7}
[2022-08-10T01:23:48,020][WARN ][logstash.outputs.elasticsearchmonitoring] Detected a 6.x and above cluster: the `type` event field won't be used to determine the document _type {:es_version=>7}
[2022-08-10T01:23:48,072][WARN ][logstash.outputs.elasticsearchmonitoring] Restored connection to ES instance {:url=>"https://dev-logstash:9200/"}
[2022-08-10T01:23:48,111][WARN ][logstash.outputs.elasticsearchmonitoring] Restored connection to ES instance {:url=>"https://dev-logstash:9200/"}
[2022-08-10T01:23:48,147][WARN ][logstash.outputs.elasticsearchmonitoring] Configuration is data stream compliant but due backwards compatibility Logstash 7.x will not assume writing to a data-stream, default behavior will change on Logstash 8.0 (set `data_stream => true/false` to disable this warning)
[2022-08-10T01:23:48,147][WARN ][logstash.outputs.elasticsearchmonitoring] Configuration is data stream compliant but due backwards compatibility Logstash 7.x will not assume writing to a data-stream, default behavior will change on Logstash 8.0 (set `data_stream => true/false` to disable this warning)
[2022-08-10T01:23:48,150][WARN ][logstash.javapipeline    ] 'pipeline.ordered' is enabled and is likely less efficient, consider disabling if preserving event order is not necessary
[2022-08-10T01:23:48,154][INFO ][logstash.javapipeline    ] Starting pipeline {:pipeline_id=>".monitoring-logstash", "pipeline.workers"=>1, "pipeline.batch.size"=>2, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>2, "pipeline.sources"=>["monitoring pipeline"], :thread=>"#<Thread:0x3bd44eea@/apps/logstash-7.16.3/logstash-core/lib/logstash/pipeline_action/create.rb:54 run>"}
[2022-08-10T01:23:48,627][INFO ][logstash.javapipeline    ] Pipeline Java execution initialization time {"seconds"=>0.47}
[2022-08-10T01:23:48,640][INFO ][logstash.javapipeline    ] Pipeline Java execution initialization time {"seconds"=>0.65}
[2022-08-10T01:23:48,648][INFO ][logstash.javapipeline    ] Pipeline Java execution initialization time {"seconds"=>0.65}
[2022-08-10T01:23:48,658][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>".monitoring-logstash"}
[2022-08-10T01:23:48,686][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"test1"}
[2022-08-10T01:23:48,688][INFO ][logstash.javapipeline    ] Pipeline started {"pipeline.id"=>"test"}
[2022-08-10T01:23:48,708][INFO ][filewatch.observingtail  ] START, creating Discoverer, Watch with file and sincedb collections
[2022-08-10T01:23:48,710][INFO ][filewatch.observingtail  ] START, creating Discoverer, Watch with file and sincedb collections
[2022-08-10T01:23:48,739][INFO ][logstash.agent           ] Pipelines running {:count=>3, :running_pipelines=>[:test1, :test, :".monitoring-logstash"], :non_running_pipelines=>[]}
[2022-08-10T01:23:48,910][WARN ][deprecation.logstash.codecs.plain] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2022-08-10T01:23:48,910][WARN ][deprecation.logstash.codecs.plain] Relying on default value of `pipeline.ecs_compatibility`, which may change in a future major release of Logstash. To avoid unexpected changes when upgrading Logstash, please explicitly declare your desired ECS Compatibility mode.
[2022-08-10T01:23:49,142][INFO ][logstash.outputs.csv     ] Opening file {:path=>"/apps/logstash/fileinput/output.csv"}
[2022-08-10T01:23:49,145][INFO ][logstash.outputs.csv     ] Opening file {:path=>"/apps/logstash/fileinput/output.csv"}

pipelines.yml

- pipeline.id: test
  queue.type: memory
  path.config: /usr/share/logstash/conf.d/first-merge.conf
  pipeline.batch.delay: 50
  pipeline.workers: 1
  pipeline.ordered: true
- pipeline.id: test1
  queue.type: memory
  path.config: /usr/share/logstash/conf.d/second-merge.conf
  pipeline.batch.delay: 100
  pipeline.workers: 1
  pipeline.ordered: true

first-merge.conf reads the data from logstash-1.csv and writes it to output.csv.
second-merge.conf reads the data from logstash-2.csv and writes to the same output.csv file.

How can I tell Logstash to execute the pipelines in order, so that the data is written to the CSV file in the right order?

Please correct me if I am doing anything wrong.

Thanks,
Ganeshbabu

The two pipelines execute independently; there is no way to tell Logstash to execute one before the other. However, since both are file inputs, you can use path => "/logstash/fileinput/logstash-[12].csv" in a single pipeline, and I believe it will then completely read one file before moving on to the other.
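A sketch of that single-pipeline approach, reusing the filter and output from the original config in this thread (the output path is illustrative):

```
input {
  file {
    # One input whose glob matches both files; together with
    # pipeline.workers: 1 and pipeline.ordered: true, events are
    # emitted in the order the files are read.
    path => "/logstash/fileinput/logstash-[12].csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
    skip_header => true
    columns => ["Sample","Gender","Age"]
    separator => ","
  }
}
output {
  csv {
    fields => ["Sample", "Gender", "Age"]
    path => "/logstash/fileinput/output.csv"
  }
}
```

With a single pipeline there is also only one writer to output.csv, so the two pipelines can no longer interleave their rows in the shared output file.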

Thanks @Badger It worked as expected.

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.