How to add the first row, first column value as a field to the other rows' documents of a CSV file in Logstash filters?

I was trying out a Logstash pipeline config file in order to add the first row, first column value to the other documents as a new field, but I got nowhere!

My CSV file is:

Suite/Test/Step Name,Browser,Description,Tag,Start time,End time,Duration,Status

API_Test_suite_2,,,,2020-07-31 12:47:35,2020-07-31 12:48:29,53.965s,PASSED

,,,,,,,

Test Cases/API/Juniper Diagnostics Test/TC - 007,,,,2020-07-31 12:47:37,2020-07-31 12:48:26,48.826s,PASSED

"response = sendRequest(findTestObject(""API/Juniper Diagnostics Test - End Point/Request - 007""))",,,,2020-07-31 12:47:38,2020-07-31 12:48:26,48.523s,PASSED

"verifyResponseStatusCode(response, 200)",,,,2020-07-31 12:48:26,2020-07-31 12:48:26,0.012s,PASSED

"verifyElementPropertyValue(response, ""success"", true)",,,,2020-07-31 12:48:26,2020-07-31 12:48:26,0.019s,PASSED

"verifyElementPropertyValue(response, ""testResult"", true)",,,,2020-07-31 12:48:26,2020-07-31 12:48:26,0.013s,PASSED

"verifyElementPropertyValue(response, ""summary"", ""Service appears to be working on the Juniper router. Physical traffic is flowing both up-stream and down-stream. There is both inbound traffic from the customer and outbound traffic to the customer. The fault may have lasted momentarily."")",,,,2020-07-31 12:48:26,2020-07-31 12:48:26,0.013s,PASSED

,,,,,,,

Test Cases/API/NTUProxySSHConnectivityTest/TC - 001,,,,2020-07-31 12:48:26,2020-07-31 12:48:29,2.785s,PASSED

"response = sendRequest(findTestObject(""API/NTUProxySSHConnectivityTest - End Point/Request - 001""))",,,,2020-07-31 12:48:26,2020-07-31 12:48:29,2.568s,PASSED

"verifyResponseStatusCode(response, 200)",,,,2020-07-31 12:48:29,2020-07-31 12:48:29,0.002s,PASSED

"verifyElementPropertyValue(response, ""summary"", ""

Connection established successfully with the NTU"")",,,,2020-07-31 12:48:29,2020-07-31 12:48:29,0.024s,PASSED

"verifyElementPropertyValue(response, ""success"", true)",,,,2020-07-31 12:48:29,2020-07-31 12:48:29,0.014s,PASSED

"verifyElementPropertyValue(response, ""testResult"", true)",,,,2020-07-31 12:48:29,2020-07-31 12:48:29,0.011s,PASSED 

When we pass the above CSV through Filebeat -> Logstash, it creates a JSON document for each and every row before passing them to the Elasticsearch index. What I am trying to do is get the first row, first column value and add that value to the other row documents as a new field. According to the above CSV file, the first row, first column value would be "API_Test_suite_2" (in the csv filter I set skip_header => "true", so it skips the header). Now I need to add that "API_Test_suite_2" value to the other row documents as a new field.

So far I have implemented the conf file below:

input {
  beats {
    ssl => "false"
    port => "5044"
    type => beats
  }
}

filter {
  csv {
    separator => ","
    skip_header => "true"
    columns => ["Suite/Test/Step Name","Browser","Description","Tag","Start time","End time","Duration","Status"]
    skip_empty_columns => "true"
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => ["10.1.4.0:9200"]
    ssl => "false"
    user => "elastic"
    password => "ds6538DS#"
    index => "%{[fields][log_type]}-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
  file {
    path => "/usr/share/logstash/pipeline/outputs/json_output_from_logstash_csv_4.json"
  }
}

Any help would be appreciated! :slight_smile:

You can do it in ruby.

To have the pipeline maintain event ordering you will have to set pipeline.workers to 1, so the solution does not scale. Also, for anything from 7.0 to 7.7 set java_execution to false. Note that in a future release you may need to set the pipeline.ordered setting.
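
For reference, an untested pipelines.yml sketch of those settings (pipeline.id and path.config here are placeholders for your own values; pipeline.ordered only exists from 7.7 onwards):

- pipeline.id: csv-reports
  path.config: "/usr/share/logstash/pipeline/*.conf"
  pipeline.workers: 1             # required so events stay in order
  pipeline.java_execution: false  # only needed on 7.0 through 7.7
  # pipeline.ordered: true        # use this instead on newer releases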

Once all that is done, you could try this (which I have not tested)

ruby {
    code => '
        @suite ||= event.get("Suite/Test/Step Name")
        event.set("someField", @suite)
    '
}

Thank you for your reply. Is it possible to loop through the CSV rows? I mean, the Logstash filter creates a document for each and every row in the CSV file, so is it possible to access the third row while the filtering happens and add that particular field only to that document (row)?

Do you mean you only want to add the field from the first non-header row to the third non-header row?

yes... exactly.

You might be able to do it with

code => '
    @suite ||= event.get("Suite/Test/Step Name")
    @row ||= 0
    @row += 1
    if @row == 3
        event.set("someField", @suite)
    end
'

This feels like it is getting more fragile.

Cool... thanks. I'll try this one. :slight_smile: :grinning: :smile:

Also, how can we delete a row of the CSV file from the conf file filter? And how can we get the full row count?

You can count rows using an aggregate filter. Something similar to example 3. The task_id can be the filename, or even a constant that you add using mutate+add_field.
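
As an untested sketch (this assumes the [log][file][path] field that filebeat adds as the task_id, and like the ruby approach it needs pipeline.workers set to 1), this stamps a running row number per file onto each event; getting the final total would need a timeout or end-of-task event as in the docs example:

aggregate {
    task_id => "%{[log][file][path]}"
    code => '
        map["rows"] ||= 0
        map["rows"] += 1
        event.set("row_number", map["rows"])
    '
}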

You can delete a row using a drop filter, or if you need ruby to determine whether it is the row to drop, call event.cancel from ruby.
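
For example (again untested), to drop the blank separator rows:

if [message] == ",,,,,,," {
    drop { }
}

or, if the decision requires ruby:

ruby { code => 'event.cancel if event.get("message") == ",,,,,,,"' }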

Appreciate your response. How can we implement logic like the following in the Logstash conf?

Below are the JSON documents for the above CSV file (the output of Logstash) before they are passed to the Elasticsearch index.

{
    "@version":"1",
    "ecs":{
        "version":"1.5.0"
    },
    "agent":{
        "type":"filebeat",
        "id":"168be642-9aa7-4db6-97f2-f01474f6b576",
        "version":"7.8.0",
        "ephemeral_id":"0743b908-7cee-42b8-b667-fe715cbbe44b",
        "name":"8c2f87a95468",
        "hostname":"8c2f87a95468"
    },"log":{
        "file":{
            "path":"/usr/share/filebeat/reports/test/1/report.csv"
        },
        "offset":82
    },
    "fields":{
        "log_type":"katalon_ui_reports"
    },
    "message":"API_Test_suite,,,,2020-07-31 12:45:19,2020-07-31 12:45:34,14.513s,PASSED",
    "host":{
        "id":"83a8f1f835d84a9a9bf5417cecaf0c8e",
        "os":{
            "version":"7 (Core)",
            "family":"redhat",
            "name":"CentOS Linux",
            "kernel":"4.19.104-microsoft-standard",
            "codename":"Core",
            "platform":"centos"
        },
        "containerized":true,
        "name":"8c2f87a95468",
        "ip":["10.1.4.1"],
        "mac":["02:42:0a:01:04:01"],
        "architecture":"x86_64",
        "hostname":"8c2f87a95468"
    },
    "Suite/Test/Step Name":"API_Test_suite",
    "Start time":"2020-07-31T12:45:19.000Z",
    "input":{"type":"log"},
    "Status":"PASSED",
    "@timestamp":"2020-08-01T06:35:14.668Z",
    "End time":"2020-07-31T12:45:34.000Z",
    "tags":["katalon_ui_reports","beats_input_codec_plain_applied"],
    "Duration":"14.513s"
}
{
    "@version":"1",
    "input":{
        "type":"log"
    },
    "ecs":{
        "version":"1.5.0"
    },
    "agent":{
        "type":"filebeat",
        "id":"168be642-9aa7-4db6-97f2-f01474f6b576",
        "version":"7.8.0",
        "ephemeral_id":"0743b908-7cee-42b8-b667-fe715cbbe44b",
        "name":"8c2f87a95468",
        "hostname":"8c2f87a95468"
    },
    "@timestamp":"2020-08-01T06:35:14.669Z",
    "log":{
        "file":{
            "path":"/usr/share/filebeat/reports/test/1/report.csv"
        },"offset":156
    },
    "message":",,,,,,,",
    "tags":["katalon_magnify_ui_reports","beats_input_codec_plain_applied"],
    "fields":{
        "log_type":"katalon_ui_reports"
    },
    "host":{
        "id":"83a8f1f835d84a9a9bf5417cecaf0c8e",
        "os":{
            "version":"7 (Core)",
            "family":"redhat",
            "name":"CentOS Linux",
            "kernel":"4.19.104-microsoft-standard",
            "codename":"Core"
            ,"platform":"centos"
        },
        "containerized":true,
        "name":"8c2f87a95468",
        "ip":["10.1.4.1"],
        "mac":["02:42:0a:01:04:01"],
        "architecture":"x86_64",
        "hostname":"8c2f87a95468"
    }
}
{
    "@version":"1",
    "ecs":{
        "version":"1.5.0"
    },
    "log":{
        "file":{
            "path":"/usr/share/filebeat/reports/test/1/report.csv"
        },
        "offset":165
    },
    "agent":{
        "type":"filebeat",
        "id":"168be642-9aa7-4db6-97f2-f01474f6b576",
        "version":"7.8.0",
        "ephemeral_id":"0743b908-7cee-42b8-b667-fe715cbbe44b",
        "name":"8c2f87a95468",
        "hostname":"8c2f87a95468"
    },
    "fields":{
        "log_type":"katalon_magnify_ui_reports"
    },
    "message":"Test Cases/API/C7PingTest/TC - 004,,Valid parameter for c7 ping test,,2020-07-31 12:45:26,2020-07-31 12:45:32,6.426s,PASSED",
    "host":{
        "id":"83a8f1f835d84a9a9bf5417cecaf0c8e",
        "os":{
            "version":"7 (Core)",
            "family":"redhat",
            "name":"CentOS Linux",
            "kernel":"4.19.104-microsoft-standard",
            "codename":"Core",
            "platform":"centos"
        },
        "containerized":true,
        "name":"8c2f87a95468",
        "mac":["02:42:0a:01:04:01"],
        "ip":["10.1.4.1"],
        "architecture":"x86_64",
        "hostname":"8c2f87a95468"
    },
    "Suite/Test/Step Name":"Test Cases/API/C7PingTest/TC - 004",
    "Start time":"2020-07-31T12:45:26.000Z",
    "input":{
        "type":"log"
    },
    "Status":"PASSED",
    "@timestamp":"2020-08-01T06:35:14.669Z",
    "End time":"2020-07-31T12:45:32.000Z",
    "tags":["katalon_ui_reports","beats_input_codec_plain_applied"],
    "Duration":"6.426s",
    "Description":"Valid parameter for c7 ping test"
}
{
    "@version":"1",
    "ecs":{
        "version":"1.5.0"
    },
    "agent":{
        "type":"filebeat",
        "id":"168be642-9aa7-4db6-97f2-f01474f6b576",
        "version":"7.8.0",
        "ephemeral_id":"0743b908-7cee-42b8-b667-fe715cbbe44b",
        "name":"8c2f87a95468",
        "hostname":"8c2f87a95468"
    },
    "log":{
        "file":{
            "path":"/usr/share/filebeat/reports/test/1/report.csv"
        },
        "offset":290
    },
    "fields":{
        "log_type":"katalon_ui_reports"
    },
    "message":"\"response = sendRequest(findTestObject(\"\"API/C7PingTest - End Point/Request - 004\"\"))\",,,,2020-07-31 12:45:26,2020-07-31 12:45:32,5.827s,PASSED",
    "host":{
        "id":"83a8f1f835d84a9a9bf5417cecaf0c8e",
        "containerized":true,
        "os":{
            "version":"7 (Core)",
            "family":"redhat",
            "name":"CentOS Linux",
            "kernel":"4.19.104-microsoft-standard",
            "codename":"Core",
            "platform":"centos"
        },
        "name":"8c2f87a95468",
        "ip":["10.1.4.1"],
        "mac":["02:42:0a:01:04:01"],
        "architecture":"x86_64",
        "hostname":"8c2f87a95468"
    },
    "Suite/Test/Step Name":"response = sendRequest(findTestObject(\"API/C7PingTest - End Point/Request - 004\"))",
    "Start time":"2020-07-31T12:45:26.000Z",
    "input":{"type":"log"},
    "Status":"PASSED",
    "@timestamp":"2020-08-01T06:35:14.669Z",
    "End time":"2020-07-31T12:45:32.000Z",
    "tags":["katalon_ui_reports","beats_input_codec_plain_applied"],
    "Duration":"5.827s"
}
{
    "@version":"1",
    "ecs":{"version":"1.5.0"},
    "agent":{
        "type":"filebeat",
        "id":"168be642-9aa7-4db6-97f2-f01474f6b576",
        "version":"7.8.0",
        "ephemeral_id":"0743b908-7cee-42b8-b667-fe715cbbe44b",
        "name":"8c2f87a95468",
        "hostname":"8c2f87a95468"
    },
    "log":{
        "file":{
            "path":"/usr/share/filebeat/reports/test/1/report.csv"
        },
        "offset":435
    },
    "fields":{
        "log_type":"katalon_ui_reports"
    },
    "message":"\"verifyResponseStatusCode(response, 200)\",,,,2020-07-31 12:45:32,2020-07-31 12:45:32,0.003s,PASSED",
    "host":{
        "id":"83a8f1f835d84a9a9bf5417cecaf0c8e",
        "containerized":true,
        "os":{
            "version":"7 (Core)",
            "family":"redhat",
            "name":"CentOS Linux",
            "kernel":"4.19.104-microsoft-standard",
            "codename":"Core",
            "platform":"centos"
        },
        "name":"8c2f87a95468",
        "ip":["10.1.4.1"],
        "mac":["02:42:0a:01:04:01"],
        "architecture":"x86_64",
        "hostname":"8c2f87a95468"
    },
    "Suite/Test/Step Name":"verifyResponseStatusCode(response, 200)",
    "Start time":"2020-07-31T12:45:32.000Z",
    "input":{"type":"log"},
    "Status":"PASSED",
    "@timestamp":"2020-08-01T06:35:14.669Z",
    "End time":"2020-07-31T12:45:32.000Z",
    "tags":["katalon_ui_reports","beats_input_codec_plain_applied"],
    "Duration":"0.003s"
}
{
    "@version":"1",
    "ecs":{
        "version":"1.5.0"
    },
    "agent":{
        "type":"filebeat",
        "id":"168be642-9aa7-4db6-97f2-f01474f6b576",
        "version":"7.8.0",
        "ephemeral_id":"0743b908-7cee-42b8-b667-fe715cbbe44b",
        "name":"8c2f87a95468",
        "hostname":"8c2f87a95468"
    },
    "log":{
        "file":{
            "path":"/usr/share/filebeat/reports/test/1/report.csv"
        },
        "offset":535
    },
    "fields":{
        "log_type":"katalon_ui_reports"
    },
    "message":"\"verifyElementPropertyValue(response, \"\"success\"\", true)\",,,,2020-07-31 12:45:32,2020-07-31 12:45:32,0.056s,PASSED",
    "host":{
        "id":"83a8f1f835d84a9a9bf5417cecaf0c8e",
        "os":{
            "version":"7 (Core)",
            "family":"redhat",
            "name":"CentOS Linux",
            "kernel":"4.19.104-microsoft-standard",
            "codename":"Core",
            "platform":"centos"
        },
        "containerized":true,
        "name":"8c2f87a95468",
        "ip":["10.1.4.1"],
        "mac":["02:42:0a:01:04:01"],
        "architecture":"x86_64",
        "hostname":"8c2f87a95468"
    },
    "Suite/Test/Step Name":"verifyElementPropertyValue(response, \"success\", true)",
    "Start time":"2020-07-31T12:45:32.000Z",
    "input":{"type":"log"},
    "Status":"PASSED",
    "@timestamp":"2020-08-01T06:35:14.669Z",
    "End time":"2020-07-31T12:45:32.000Z",
    "tags":["katalon_ui_reports","beats_input_codec_plain_applied"],
    "Duration":"0.056s"
}
{
    "@version":"1",
    "ecs":{
        "version":"1.5.0"
    },
    "agent":{
        "type":"filebeat",
        "id":"168be642-9aa7-4db6-97f2-f01474f6b576",
        "version":"7.8.0",
        "ephemeral_id":"0743b908-7cee-42b8-b667-fe715cbbe44b",
        "name":"8c2f87a95468",
        "hostname":"8c2f87a95468"
    },
    "log":{
        "file":{
            "path":"/usr/share/filebeat/reports/test/1/report.csv"
        },
        "offset":651
    },
    "fields":{
        "log_type":"katalon_ui_reports"
    },
    "message":"\"verifyElementPropertyValue(response, \"\"testResult\"\", true)\",,,,2020-07-31 12:45:32,2020-07-31 12:45:32,0.036s,PASSED",
    "host":{
        "id":"83a8f1f835d84a9a9bf5417cecaf0c8e",
        "containerized":true,
        "os":{
            "version":"7 (Core)",
            "family":"redhat",
            "name":"CentOS Linux",
            "kernel":"4.19.104-microsoft-standard",
            "codename":"Core",
            "platform":"centos"
        },
        "name":"8c2f87a95468",
        "ip":["10.1.4.1"],
        "mac":["02:42:0a:01:04:01"],
        "architecture":"x86_64",
        "hostname":"8c2f87a95468"
    },
    "Suite/Test/Step Name":"verifyElementPropertyValue(response, \"testResult\", true)",
    "Start time":"2020-07-31T12:45:32.000Z",
    "input":{"type":"log"},
    "Status":"PASSED",
    "@timestamp":"2020-08-01T06:35:14.669Z",
    "End time":"2020-07-31T12:45:32.000Z",
    "tags":["katalon_ui_reports","beats_input_codec_plain_applied"],
    "Duration":"0.036s"
}

This is how the CSV file looks.

As you can see in the JSON output, for each row in the CSV file there is a JSON document. Each document has a field called "message" which holds the Katalon test result data. I need to get the test case name and apply that name to its test steps as a new field. For example, in the data above, the first non-header row has the test suite name; the second non-header row is an empty row (its message is ',,,,,,,'); the third non-header row has the first test case name of the suite; and the 4th to 8th non-header rows have the test steps of that particular test case. I need to get the test case name from the 3rd non-header row and put it in all of its test steps (the 4th to 8th non-header rows) as a new field. One file may have multiple test cases, so how do I loop this implementation?

In the data above,
Test Cases/API/C7PingTest/TC - 004
Test Cases/API/Diversions Lookup/TC - 004

are the test case names.

I am starting to think the whole approach is wrong. Instead of having filebeat send lines of a csv file and trying to aggregate them in logstash, I think you should have filebeat send the entire file as a multiline message, then parse it in a ruby filter.

I'm new to the ELK stack, so it is a bit confusing for me to get a picture of how to use Logstash filters properly, but I really appreciate your support. :grinning: :slight_smile:
How should I start implementing this: "filebeat send the entire file as a multiline message, then parse it in a ruby filter"?

To send the entire file as a single event you can use something like

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /tmp/test.log
  multiline.pattern: ^Monsieur Spalanzani
  multiline.negate: true
  multiline.match: after
  multiline.timeout: 5
output.logstash:
  hosts: ["127.9.2.3:5044"]

Unless you are processing the libretto of an Offenbach opera your file will not contain a line that starts with "Monsieur Spalanzani", and by combining a pattern that never matches with negate: true we can combine every line into one event.

A couple of points about your sample data. Firstly, every other line is blank. In a logstash file input those blank lines get elided, but filebeat keeps them, which is why I use mutate to remove them. Secondly, your response summary in the second test is broken across two lines. I am hoping that is a typo, because it is going to be very hard to handle.

In logstash we can parse it using

    mutate { gsub => [ "message", "(\n)\n", "\1" ] } # Strip empty lines
    mutate { add_field => { "[@metadata][lines]" => "%{message}" } }
    mutate { split => { "[@metadata][lines]" => "
" } } # Needs a literal newline in the configuration file
    ruby {
        init => "
            require 'csv'
            Headers = 'Suite/Test/Step Name,Browser,Description,Tag,Start time,End time,Duration,Status'
        "
        code => '
            lines = event.get("[@metadata][lines]")
            if lines
                lines.shift # Discard CSV header
                tests1 = []
                tests1 << [ lines.shift ] # Suite header, needs to be an array with one entry
                while lines.length > 0
                    lines.shift # Discard blank row
                    tests1 << lines.shift(6) # Pull out one test
                end
                # tests1 is now an array, each member being an array of lines

                tests2 = []
                tests1.each { |x|
                    csvString = x.join("\n")
                    csv = CSV.parse(csvString, { :headers => Headers })
                    tests2 << csv
                }
                # tests2 is now an array, each member being a CSV::Table

                tests3 = []
                tests2.each { |table|
                    tests3 << table.map { |row| row.to_hash }
                }
                # tests3 is now an array of arrays of hashes
                event.set("header", tests3.shift[0])
                event.set("tests", tests3)
            end
        '
        remove_field => [ "message" ]
    }
    split { field => "tests" }

This will get you events containing both the header hash, and a tests array that contains the 6 hashes for a single test.

    "header" => {
                  "Status" => "PASSED",
                 "Browser" => nil,
    "Suite/Test/Step Name" => "API_Test_suite_2",
                     "Tag" => nil,
             "Description" => nil,
              "Start time" => "2020-07-31 12:47:35",
                "Duration" => "53.965s",
                "End time" => "2020-07-31 12:48:29"
},
     "tests" => [
    [0] {
                     "Browser" => nil,
                      "Status" => "PASSED",
        "Suite/Test/Step Name" => "Test Cases/API/NTUProxySSHConnectivityTest/TC - 001",
                         "Tag" => nil,
                 "Description" => nil,
                  "Start time" => "2020-07-31 12:48:26",
                    "End time" => "2020-07-31 12:48:29",
                    "Duration" => "2.785s"
    },
    [1] {
                     "Browser" => nil,
                      "Status" => "PASSED",
        "Suite/Test/Step Name" => "response = sendRequest(findTestObject(\"API/NTUProxySSHConnectivityTest - End Point/Request - 001\"))",
                         "Tag" => nil,
                 "Description" => nil,
                  "Start time" => "2020-07-31 12:48:26",
                    "End time" => "2020-07-31 12:48:29",
                    "Duration" => "2.568s"
    ...

That's probably not a good data format for you, but moving the data around is a lot easier than parsing it to start with.
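
For instance (untested, using the field names from your data), after the split above you could copy the suite name onto each event, stamp the test case name onto every step, then split again to get one event per step row:

    mutate { add_field => { "suite" => "%{[header][Suite/Test/Step Name]}" } }
    ruby {
        code => '
            steps = event.get("tests")
            if steps
                # The first row of each test is the test case itself
                tc = steps[0]["Suite/Test/Step Name"]
                steps.each { |s| s["test_case"] = tc }
                event.set("tests", steps)
            end
        '
    }
    split { field => "tests" } # one event per step row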

Note also that I used lines.shift(6) to grab the lines for a single test. If there are sometimes more or fewer than 6 lines then you would have to loop, appending lines to an array until you see the line containing ,,,,,,, then break and start a new test, as sketched below.
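
Untested, but that loop would look something like this, replacing the lines.shift / lines.shift(6) pair inside the while loop:

    while lines.length > 0
        lines.shift # Discard the ,,,,,,, separator row
        test = []
        while lines.length > 0 && lines.first != ",,,,,,,"
            test << lines.shift # Accumulate one test's rows
        end
        tests1 << test
    end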

Wow... this is really helpful. Thank you! Appreciate your support... :grinning: :slight_smile:

Hi, one more question. When I tried to apply this it worked as expected for one report, but I need to do this for multiple reports. That means multiple CSV files will be copied into sub-folders of the same directory (these CSV files exist inside those sub-folders). In this kind of scenario it keeps the "Suite/Test/Step Name" value of the first CSV file; it doesn't change based on the report (but it needs to). How can I achieve this?

Provided that there is one report per file I would expect it to work.

The thing is, Katalon generates multiple reports in different folders; once a new report is generated, that particular data should go to the Elastic index. When using the code below,

code => '
    @suite ||= event.get("Suite/Test/Step Name")
    @row ||= 0
    @row += 1
    if @row == 3
        event.set("someField", @suite)
    end
'

it keeps the "@suite" data from the previous report. I need a way to reset this "@suite" variable once a report has been pushed to the index. How can I do that?

Oh, I was assuming you were using the code that I posted to process an entire file as a multiline event. If you want to use that code then you would have to make @suite and @row hashes, with entries named after the path to the file.

Could you please add an example of how I can do that? :slight_smile:

I have not tested this, but it would be something like

code => '
    @suite ||= {}
    @row ||= {}
    path = event.get("[log][file][name]") # Or whatever the field name is
    if @suite.key?(path)
        @row[path] += 1
    else
        @suite[path] = event.get("Suite/Test/Step Name")
        @row[path] = 1
    end
    if @row[path] == 3
        event.set("someField", @suite[path])
    end
'