Multiple tables in csv file

I have a CSV file from an order management system.

How do I write the filter in the conf file for this data?
I tried separating the tables using ",,,,,,,,,,,,,,,,,,,,,,,," as a delimiter, but I can't form proper JSON in the output.

Please help!

P.S.: I am new to Logstash, so I don't even know if this is possible!?


What you have is not a CSV file, but a file that contains delimited sequences of CSV data. This means that using the CSV codec is not an option, but we can still parse this data by deconstructing it in several steps as long as the tables all have the same "schema" (field names and order).
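To make the shape of the data concrete, such a file looks something like this (a hypothetical reconstruction; the row names and values are invented for illustration):

```
,,,,,,,,,,,,,,,,,,,,,,,,
Period 1 - Week 1,,,,,,,,,,,,,,,,,,,,,,,,
Orders,12:00 AM,1:00 AM,2:00 AM,...
Online,3,1,0,...
,,,,,,,,,,,,,,,,,,,,,,,,
Period 1 - Week 2,,,,,,,,,,,,,,,,,,,,,,,,
Orders,12:00 AM,1:00 AM,2:00 AM,...
Online,5,2,1,...
```

Each "table" is a delimiter row of commas, a title row, a header row, and then data rows.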

The following is untested, but with the linked documentation it should be enough to get you started.

By using the Multiline Codec, you can ensure that each "chunk" of CSV data is emitted from the input as a single Event. If you're using the File input plugin, the configuration would look something like this:

input {
  file {
    path => "/path/to/your/input.csv"
    codec => multiline {
      pattern => "^,,,,,,,,,,,,,,,,,,,,,,,,"
      negate => true
      what => previous
    }
  }
}

Those codec directives mean:

  • pattern => "^,,,,,,,,,,,,,,,,,,,,,,,,": lines that start with a bunch of commas are special
  • negate => true: we want to do something with lines that do NOT match our special pattern
  • what => previous: join them to the previous line

These come together to ensure that each "table" is emitted as a single event from the input.
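Concretely, every line that does not start with the comma run gets glued onto the line before it, so a chunk such as (hypothetical data):

```
,,,,,,,,,,,,,,,,,,,,,,,,
Period 1 - Week 1,,,,,,,,,,,,,,,,,,,,,,,,
Orders,12:00 AM,...
(data rows)
```

arrives in the pipeline as one event whose message field contains all of those lines separated by newlines, and the next delimiter row starts a fresh event.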

Next, we should probably capture what you have as table names (the "Period 1 - Week 4" bit) and set the value(s) as a field on the event:

filter {
  grok {
    match => {
      "message" => "^,,,,,,,,,,,,,,,,,,,,,,,,\n(?<tablename>[^,]+),,,,,,,,,,,,,,,,,,,,,,,,\n(?<csvdata>(?:.|\n)+)"
    }
  }
}
Next, we can use the Split Filter to break the event into smaller events, one per line of the csvdata field that we just extracted. Each emitted event will retain the tablename field from the original event, but its csvdata will be replaced with just one line of the CSV data.

filter {
  split {
    field => "csvdata"
  }
}
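For illustration (hypothetical field values), one event going into the split filter:

```
tablename => "Period 1 - Week 1"
csvdata   => "Orders,12:00 AM,...\n12,7,..."
```

comes out as two events, each keeping tablename but holding a single line in csvdata:

```
tablename => "Period 1 - Week 1", csvdata => "Orders,12:00 AM,..."
tablename => "Period 1 - Week 1", csvdata => "12,7,..."
```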

From there, you can use the CSV filter to parse each event's csvdata field (via its source directive, since the split filter leaves each line in csvdata rather than message) into component fields.

filter {
  csv {
    source => "csvdata"
    columns => ["Orders", "12:00 AM", "1:00 AM", ...]
  }
}
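Putting the pieces together, the whole pipeline would look something like the following. This is an untested sketch, so expect to adjust the path and the column list to your data:

```
input {
  file {
    path => "/path/to/your/input.csv"
    codec => multiline {
      pattern => "^,,,,,,,,,,,,,,,,,,,,,,,,"
      negate => true
      what => previous
    }
  }
}
filter {
  grok {
    match => {
      "message" => "^,,,,,,,,,,,,,,,,,,,,,,,,\n(?<tablename>[^,]+),,,,,,,,,,,,,,,,,,,,,,,,\n(?<csvdata>(?:.|\n)+)"
    }
  }
  split {
    field => "csvdata"
  }
  csv {
    source => "csvdata"
    columns => ["Orders", "12:00 AM", "1:00 AM", ...]
  }
}
```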

Hi @yaauie ,
Thanks for the quick reply.

I tried following the instructions, but Logstash is unable to send data to Elasticsearch. Both servers are running successfully.

Here is the configuration:

input {
  file {
    path => "D:/KALYANI/ELK/order Dashboard.csv"
    codec => multiline {
      pattern => "^,,,,,,,,,,,,,,,,,,,,,,,,"
      negate => true
      what => previous
    }
  }
}
filter {
  grok {
    match => {
      "message" => "^,,,,,,,,,,,,,,,,,,,,,,,,\n(?<Period 1 - Week 1>[^,]+),,,,,,,,,,,,,,,,,,,,,,,,\n(?<csvdata>(?:.|\n)+)"
    }
  }
  split {
    field => "csvdata"
  }
  csv {
    columns => ["Orders", "12:00 AM", "1:00 AM", "2:00 AM","3:00 AM","4:00 AM","5:00 AM","6:00 AM","7:00 AM","8:00 AM","9:00 AM","10:00 AM","11:00 AM","12:00 PM","1:00 PM","2:00 PM","3:00 PM","4:00 PM","5:00 PM","6:00 PM","7:00 PM","8:00 PM","9:00 PM","10:00 PM","11:00 PM"]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "order"
    workers => 1
  }
  stdout {}
}

Also, how do I incorporate multiple table names?


Do your Logstash logs indicate what is going on? Typically there will be ERROR-level logs if something is preventing the pipeline from processing data all the way through to the outputs.

I am not sure what you mean by "incorporate multiple table names", but if you are attempting to output the events to separate Elasticsearch indices, the "index" directive, which can reference fields from each event using the sprintf syntax, allows you to direct each event to a specific index.
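For example (an untested sketch; note that Elasticsearch index names must be lowercase, so you would want to normalize the tablename field — the grok capture from earlier in this thread — with a mutate filter first):

```
filter {
  mutate {
    lowercase => [ "tablename" ]
    gsub => [ "tablename", "[^a-z0-9]+", "-" ]
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "orders-%{tablename}"
  }
}
```

With this, an event whose tablename was "Period 1 - Week 1" would be routed to an index named orders-period-1-week-1.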

Elasticsearch logs:

[2018-11-09T14:06:47,673][DEBUG][o.e.a.ActionModule ] Using REST wrapper from plugin
[2018-11-09T14:06:48,078][INFO ][o.e.d.DiscoveryModule ] [qiL43tM] using discovery type [zen]
[2018-11-09T14:06:49,371][INFO ][o.e.n.Node ] [qiL43tM] initialized
[2018-11-09T14:06:49,371][INFO ][o.e.n.Node ] [qiL43tM] starting ...
[2018-11-09T14:06:50,040][INFO ][o.e.t.TransportService ] [qiL43tM] publish_address {}, bound_addresses {}, {[::1]:9300}
[2018-11-09T14:06:53,217][INFO ][o.e.c.s.MasterService ] [qiL43tM] zen-disco-elected-as-master ([0] nodes joined)[, ], reason: new_master {qiL43tM}{qiL43tM2RKupasia3hqNfA}{86swDWXlQ4Crsze6o0ZuVg}{}{}{ml.machine_memory=8018644992, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}
[2018-11-09T14:06:53,233][INFO ][o.e.c.s.ClusterApplierService] [qiL43tM] new_master {qiL43tM}{qiL43tM2RKupasia3hqNfA}{86swDWXlQ4Crsze6o0ZuVg}{}{}{ml.machine_memory=8018644992, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true}, reason: apply cluster state (from master [master {qiL43tM}{qiL43tM2RKupasia3hqNfA}{86swDWXlQ4Crsze6o0ZuVg}{}{}{ml.machine_memory=8018644992, xpack.installed=true, ml.max_open_jobs=20, ml.enabled=true} committed version [1] source [zen-disco-elected-as-master ([0] nodes joined)[, ]]])
[2018-11-09T14:06:54,017][INFO ][o.e.x.s.t.n.SecurityNetty4HttpServerTransport] [qiL43tM] publish_address {}, bound_addresses {}, {[::1]:9200}
[2018-11-09T14:06:54,017][INFO ][o.e.n.Node ] [qiL43tM] started
[2018-11-09T14:06:54,936][WARN ][o.e.x.s.a.s.m.NativeRoleMappingStore] [qiL43tM] Failed to clear cache for realms []
[2018-11-09T14:06:54,998][INFO ][o.e.l.LicenseService ] [qiL43tM] license [a42c8112-7684-40a8-b79c-6f24d72ff136] mode [basic] - valid
[2018-11-09T14:06:55,029][INFO ][o.e.g.GatewayService ] [qiL43tM] recovered [4] indices into cluster_state
[2018-11-09T14:07:00,265][INFO ][o.e.c.r.a.AllocationService] [qiL43tM] Cluster health status changed from [RED] to [YELLOW] (reason: [shards started [[.kibana][0], [bank][4]] ...]).

Logstash logs:

[2018-11-09T14:11:04,825][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.4.2"}
[2018-11-09T14:11:12,764][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-11-09T14:11:13,437][INFO ][logstash.outputs.elasticsearch] Elasticsearch pool URLs updated {:changes=>{:removed=>[], :added=>[http://localhost:9200/]}}
[2018-11-09T14:11:13,454][INFO ][logstash.outputs.elasticsearch] Running health check to see if an Elasticsearch connection is working {:healthcheck_url=>http://localhost:9200/, :path=>"/"}
[2018-11-09T14:11:14,021][WARN ][logstash.outputs.elasticsearch] Restored connection to ES instance {:url=>"http://localhost:9200/"}
[2018-11-09T14:11:14,176][INFO ][logstash.outputs.elasticsearch] ES Output version determined {:es_version=>6}
[2018-11-09T14:11:14,181][WARN ][logstash.outputs.elasticsearch] Detected a 6.x and above cluster: the type event field won't be used to determine the document _type {:es_version=>6}
[2018-11-09T14:11:14,219][INFO ][logstash.outputs.elasticsearch] New Elasticsearch output {:class=>"LogStash::Outputs::Elasticsearch", :hosts=>["//localhost:9200"]}
[2018-11-09T14:11:14,355][INFO ][logstash.outputs.elasticsearch] Using mapping template from {:path=>nil}
[2018-11-09T14:11:14,588][INFO ][logstash.outputs.elasticsearch] Attempting to install template {:manage_template=>{"template"=>"logstash-*", "version"=>60001, "settings"=>{"index.refresh_interval"=>"5s"}, "mappings"=>{"_default_"=>{"dynamic_templates"=>[{"message_field"=>{"path_match"=>"message", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false}}}, {"string_fields"=>{"match"=>"*", "match_mapping_type"=>"string", "mapping"=>{"type"=>"text", "norms"=>false, "fields"=>{"keyword"=>{"type"=>"keyword", "ignore_above"=>256}}}}}], "properties"=>{"@timestamp"=>{"type"=>"date"}, "@version"=>{"type"=>"keyword"}, "geoip"=>{"dynamic"=>true, "properties"=>{"ip"=>{"type"=>"ip"}, "location"=>{"type"=>"geo_point"}, "latitude"=>{"type"=>"half_float"}, "longitude"=>{"type"=>"half_float"}}}}}}}}
[2018-11-09T14:11:15,415][INFO ][logstash.inputs.file ] No sincedb_path set, generating one based on the "path" setting {:sincedb_path=>"D:/KALYANI/ELK/logstash-6.4.2/data/plugins/inputs/file/.sincedb_35e2cac25c70f9fc234a44c9e30e91", :path=>["D:/KALYANI/ELK/order Dashboard.csv"]}
[2018-11-09T14:11:15,480][INFO ][logstash.pipeline ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x70c25d27 sleep>"}
[2018-11-09T14:11:15,553][INFO ][logstash.agent ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2018-11-09T14:11:15,623][INFO ][filewatch.observingtail ] START, creating Discoverer, Watch with file and sincedb collections
[2018-11-09T14:11:16,356][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}

Both are running fine. Still, I can't see the index in the Elasticsearch cluster. :frowning:

I have now modified the CSV file to contain just one table, for Period 1 - Week 1.
With that, I am getting the index in Elasticsearch, but the output looks like this:

"took": 6,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
"hits": {
"total": 1,
"max_score": 1,
"hits": [
"_index": "orders",
"_type": "doc",
"_id": "A99V-GYBU_DxK6332S9g",
"_score": 1,
"_source": {
"1:00 PM": null,
"@timestamp": "2018-11-09T11:57:44.875Z",
"12:00 PM": null,
"1:00 AM": null,
"11:00 PM": null,
"5:00 PM": null,
"10:00 PM": null,
"message": """
Period 1 - Week 1,,,,,,,,,,,,,,,,,,,,,,,,
Orders,12:00 AM,1:00 AM,2:00 AM,3:00 AM,4:00 AM,5:00 AM,6:00 AM,7:00 AM,8:00 AM,9:00 AM,10:00 AM,11:00 AM,12:00 PM,1:00 PM,2:00 PM,3:00 PM,4:00 PM,5:00 PM,6:00 PM,7:00 PM,8:00 PM,9:00 PM,10:00 PM,11:00 PM
"4:00 PM": null,
"tags": [
"Orders": null,
"2:00 AM": null,
"8:00 PM": null,
"6:00 PM": null,
"9:00 AM": null,
"6:00 AM": null,
"8:00 AM": null,
"4:00 AM": null,
"7:00 AM": null,
"12:00 AM": null,
"host": "01HW387966",
"3:00 PM": null,
"10:00 AM": null,
"@version": "1",
"path": "D:/KALYANI/ELK/orderdashboard.csv",
"2:00 PM": null,
"7:00 PM": null,
"9:00 PM": null,
"11:00 AM": null,
"5:00 AM": null,
"3:00 AM": null

The _grokparsefailure tag in that output indicates that grok is failing to parse your message. I would advise using the Grok Constructor to find a pattern that reliably works for your inputs.
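One likely cause, judging from the config you posted: regular-expression capture names cannot contain spaces, so (?<Period 1 - Week 1>[^,]+) is not a valid pattern. Keep a plain capture name such as tablename; its value will then be the actual table title (e.g. "Period 1 - Week 1"):

```
filter {
  grok {
    match => {
      "message" => "^,,,,,,,,,,,,,,,,,,,,,,,,\n(?<tablename>[^,]+),,,,,,,,,,,,,,,,,,,,,,,,\n(?<csvdata>(?:.|\n)+)"
    }
  }
}
```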

Additionally, the logs indicate that the file input is loading a "sincedb", which is what that input uses to ensure that we don't reprocess lines from a log file after a restart. Since you are using this as an import tool and not to watch a log file for new lines being appended, you will likely want to point the sincedb to a null device (sincedb_path => "NUL" on Windows, or "/dev/null" on Unix-like systems) to prevent it from remembering. See the file input docs for further explanation.
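For example, with your path (untested; adjust to your setup):

```
input {
  file {
    path => "D:/KALYANI/ELK/order Dashboard.csv"
    sincedb_path => "NUL"    # Windows null device; use "/dev/null" on Unix-like systems
    codec => multiline {
      pattern => "^,,,,,,,,,,,,,,,,,,,,,,,,"
      negate => true
      what => previous
    }
  }
}
```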

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.