Process - Reshape a CSV file using logstash

I just realized what you said here. You could change the ruby filter to add

init => '@row_number = 0'

and in the code option of the filter replace

h["row_number"] = rn

with

@row_number += 1
h["row_number"] = @row_number

This requires that you set --pipeline.workers 1
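The idea is that init runs once while code runs for every event, so the instance variable survives between events. A plain-Ruby sketch of that behavior (the RowCounter class is only a stand-in for the ruby filter, not logstash API):

```ruby
# Simulates the ruby filter's per-event counter: initialize plays the
# role of init => '@row_number = 0', filter plays the role of the code
# option. @row_number persists across calls, so each event gets the
# next number.
class RowCounter
  def initialize
    @row_number = 0
  end

  def filter(row)
    @row_number += 1
    row.merge("row_number" => @row_number)
  end
end

counter = RowCounter.new
rows = [{ "user_name" => "Mike" }, { "user_name" => "Nicolas" }]
numbered = rows.map { |row| counter.filter(row) }
```

With more than one pipeline worker there would be one counter per worker, each seeing only some of the events, which is why the single-worker restriction matters.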

@Badger Unfortunately, I'm not getting the expected output yet. I played around with it, but it still doesn't work.
The problem is that the row_number should be incremented after each row is processed, but right now it is being incremented after each cell.

Here is the output I'm getting; the JSON is not well formatted:

(refer to this post for the output I need: https://discuss.elastic.co/t/turning-a-csv-file-into-json-file-w-adding-new-fields/195571)

{
     "column_value_float" => "",
             "@timestamp" => 2019-08-18T16:09:29.616Z,
             "row_number" => 1,
                   "host" => "c4f20eb0029f",
            "column_name" => "user_name",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "1,Mike,Hello,11.5\r",
    "column_value_string" => "Mike"
}
{
     "column_value_float" => "",
             "@timestamp" => 2019-08-18T16:09:29.616Z,
             "row_number" => 2,
                   "host" => "c4f20eb0029f",
            "column_name" => "text",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "1,Mike,Hello,11.5\r",
    "column_value_string" => "Hello"
}
{
     "column_value_float" => "11.5",
             "@timestamp" => 2019-08-18T16:09:29.616Z,
             "row_number" => 3,
                   "host" => "c4f20eb0029f",
            "column_name" => "size",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "1,Mike,Hello,11.5\r",
    "column_value_string" => ""
}
{
     "column_value_float" => "1",
             "@timestamp" => 2019-08-18T16:09:29.616Z,
             "row_number" => 4,
                   "host" => "c4f20eb0029f",
            "column_name" => "row_number",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "1,Mike,Hello,11.5\r",
    "column_value_string" => ""
}
{
     "column_value_float" => "",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 5,
                   "host" => "c4f20eb0029f",
            "column_name" => "user_name",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "2,Nicolas,Test Test,0.25\r",
    "column_value_string" => "Nicolas"
}
{
     "column_value_float" => "",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 6,
                   "host" => "c4f20eb0029f",
            "column_name" => "text",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "2,Nicolas,Test Test,0.25\r",
    "column_value_string" => "Test Test"
}
{
     "column_value_float" => "0.25",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 7,
                   "host" => "c4f20eb0029f",
            "column_name" => "size",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "2,Nicolas,Test Test,0.25\r",
    "column_value_string" => ""
}
{
     "column_value_float" => "2",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 8,
                   "host" => "c4f20eb0029f",
            "column_name" => "row_number",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "2,Nicolas,Test Test,0.25\r",
    "column_value_string" => ""
}
{
     "column_value_float" => "",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 9,
                   "host" => "c4f20eb0029f",
            "column_name" => "user_name",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "3,Sandy,Test text,1.25\r",
    "column_value_string" => "Sandy"
}
{
     "column_value_float" => "",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 10,
                   "host" => "c4f20eb0029f",
            "column_name" => "text",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "3,Sandy,Test text,1.25\r",
    "column_value_string" => "Test text"
}
{
     "column_value_float" => "1.25",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 11,
                   "host" => "c4f20eb0029f",
            "column_name" => "size",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "3,Sandy,Test text,1.25\r",
    "column_value_string" => ""
}
{
     "column_value_float" => "3",
             "@timestamp" => 2019-08-18T16:09:29.617Z,
             "row_number" => 12,
                   "host" => "c4f20eb0029f",
            "column_name" => "row_number",
               "@version" => "1",
                   "path" => "/usr/share/logstash/data/sample.csv",
                "message" => "3,Sandy,Test text,1.25\r",
    "column_value_string" => ""
}

and here is the code after the changes you suggested, plus some tweaks:

input {
  file {
    path => "/usr/share/logstash/data/sample.csv"
    start_position => beginning
    sincedb_path => "/dev/null"
  }
}

filter {
    csv { 
        source => "message" 
        target => "[@metadata][row]" 
        autodetect_column_names => true
    }
    
    ruby {
        init => '@row_number = 0'
        code => '
            r = event.get("[@metadata][row]")
            rn = 1
            if rn
                a = []
                r.each { |k, v|
                    h = {}
                    @row_number += 1
                    h["row_number"] = @row_number
                    h["column_name"] = k
                    if v =~ /^\s*[+-]?((\d+_?)*\d+(\.(\d+_?)*\d+)?|\.(\d+_?)*\d+)(\s*|([eE][+-]?(\d+_?)*\d+)\s*)$/
                        h["column_value_float"] = v
                        h["column_value_string"] = ""
                    else
                        h["column_value_float"] = ""
                        h["column_value_string"] = v
                    end
                    a << h
                }
                event.set("foo", a)
            end
        '
    }
    split { field => "foo" }
    ruby {
        code => '
            event.get("foo").each { |k, v|
                event.set(k,v)
            }
            event.remove("foo")
        '
    }
}

output {
    stdout {
        codec => json{}
    }
    file {
        path => "/usr/share/output/output.json"
        codec => "json_lines"
    }
}
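The numeric-detection regex in the filter above can be exercised on its own, outside logstash; a quick sketch (the numeric? helper is just for illustration):

```ruby
# The same regex the ruby filter uses to decide whether a cell value
# goes into column_value_float or column_value_string. It accepts
# integers, decimals, and exponent notation, with optional sign.
FLOAT_RE = /^\s*[+-]?((\d+_?)*\d+(\.(\d+_?)*\d+)?|\.(\d+_?)*\d+)(\s*|([eE][+-]?(\d+_?)*\d+)\s*)$/

def numeric?(value)
  !!(value =~ FLOAT_RE)
end
```

Values like "12A" fail the match and are kept as strings, which is the behavior visible in the output above.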

Thanks in advance for your endless help and support.

OK, we no longer need rn, and you should increment the row number once per row, as you said, so change this to

    @row_number += 1
    a = [] 
    r.each { |k, v| 
        h = {}

and remove the end after event.set
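In plain Ruby, the corrected ordering looks like this (the reshape_row helper is illustrative; only the row_number/column_name bookkeeping is shown):

```ruby
# Increment the counter once per CSV row, *before* iterating over its
# cells, so every cell of a row carries the same row_number.
@row_number = 0

def reshape_row(row)
  @row_number += 1
  row.map do |column_name, _value|
    { "row_number" => @row_number, "column_name" => column_name }
  end
end

cells = reshape_row("user_name" => "Mike", "text" => "Hello")
```

Incrementing inside the inner loop instead, as in the earlier config, gives every cell its own number, which is exactly the symptom described above.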


This is amazing! I finally got it working!
One last thing: the output is not formatted as a single JSON file; it is written out as separate JSON objects,
like this:

{"column_value_string":"Mike","column_name":"user_name","row_number":1,"column_value_float":""}
{"column_value_string":"","column_name":"size","row_number":1,"column_value_float":"11.5"}
{"column_value_string":"12A","column_name":"output","row_number":1,"column_value_float":""}
{"column_value_string":"Hello","column_name":"text","row_number":1,"column_value_float":""}
{"column_value_string":"Nicolas","column_name":"user_name","row_number":2,"column_value_float":""}
{"column_value_string":"","column_name":"size","row_number":2,"column_value_float":"0.25"}
{"column_value_string":"13B","column_name":"output","row_number":2,"column_value_float":""}
{"column_value_string":"Test Test","column_name":"text","row_number":2,"column_value_float":""}
{"column_value_string":"Sandy","column_name":"user_name","row_number":3,"column_value_float":""}
{"column_value_string":"","column_name":"size","row_number":3,"column_value_float":"1.25"}
{"column_value_string":"f45","column_name":"output","row_number":3,"column_value_float":""}
{"column_value_string":"Test text","column_name":"text","row_number":3,"column_value_float":""}

How can I combine them into a single file like the one below, and add a new field to the JSON so I can use that field as an index in Elasticsearch:

{
    "resourceId": "ABC123",
    "data":  
    [
            {
            "column_name": "user_name",
            "column_value_string": "mike",
            "column_value_float": null,
            "row_number": 1
            },
            {
            "column_name": "text",
            "column_value_string": "hello",
            "column_value_float": null,
            "row_number": 1
            },
            {
            "column_name": "size",
            "column_value_string": "",
            "column_value_float": 11.5,
            "row_number": 1
            },
            {
            "column_name": "user_name",
            "column_value_string": "Nicolas",
            "column_value_float": null,
            "row_number": 2
            },
            {
            "column_name": "text",
            "column_value_string": "Test Test",
            "column_value_float": null,
            "row_number": 2
            },
            {
            "column_name": "size",
            "column_value_string": "",
            "column_value_float": 0.25,
            "row_number": 2
            },
            {
            "column_name": "user_name",
            "column_value_string": "Sandy",
            "column_value_float": null,
            "row_number": 3
            },
            {
            "column_name": "text",
            "column_value_string": "Test Text",
            "column_value_float": null,
            "row_number": 3
            },
            {
            "column_name": "size",
            "column_value_string": "",
            "column_value_float": 1.25,
            "row_number": 3
            }
    ]
}

Here is the code I'm using in the output:

output {
    stdout {
        codec => json{}
    }
    file {
        path => "/usr/share/output/output.json"
        codec => "json_lines"
    }
}

Is there any way to achieve that? @Badger

Update:

I used this json filter:

json {
        source => "message"
        target => "message"
    }

but it didn't help me achieve what I mentioned in my previous reply @Badger

I cannot think of a way to do that using logstash.

@Badger Not even using ruby? I mean building one JSON file instead of having separate JSON objects

If you write enough ruby code you can do pretty much anything, but logstash is not designed to process a set of lines and then merge them together.
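One pragmatic route is a separate post-processing script, run after logstash finishes, that folds the json_lines output into the single nested document. A minimal sketch, with two sample lines inlined instead of read from output.json, and "ABC123" taken from the desired output above:

```ruby
require "json"

# Each line of the json_lines output file is one flat JSON object.
lines = [
  '{"column_value_string":"Mike","column_name":"user_name","row_number":1,"column_value_float":null}',
  '{"column_value_string":null,"column_name":"size","row_number":1,"column_value_float":11.5}'
]

# Parse every line and nest the results under a "data" array,
# adding the top-level resourceId field.
doc = {
  "resourceId" => "ABC123",
  "data"       => lines.map { |line| JSON.parse(line) }
}

puts JSON.pretty_generate(doc)
```

In practice the lines would come from File.foreach("/usr/share/output/output.json"), and the result would be written to a new file; this keeps logstash doing what it is good at and leaves the merge to a plain script.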


@Badger
There is an update on the code: I don't know why, but I'm getting the output columns mixed up!
The values are going into column_name, and the column names are being set as values!

Here is the code:

input {
  file {
    path => "/usr/share/input/**/*.*"
    start_position => beginning
    sincedb_path => "/dev/null"
    discover_interval => 2
    stat_interval => "1 s"
  }
}

filter {
    csv { 
        source => "message" 
        target => "[@metadata][row]" 
        autodetect_column_names => true
    }
    
    ruby {
        init => '@row_number = 0'
        code => '
            r = event.get("[@metadata][row]")
            @row_number += 1
            a = []
            r.each { |k, v|
                h = {}
                h["column_name"] = k
                h["row_number"] = @row_number
                if v =~ /^\s*[+-]?((\d+_?)*\d+(\.(\d+_?)*\d+)?|\.(\d+_?)*\d+)(\s*|([eE][+-]?(\d+_?)*\d+)\s*)$/
                    h["column_value_float"] = v.to_f
                    h["column_value_string"] = nil
                else
                    h["column_value_float"] = nil
                    h["column_value_string"] = v
                end
                a << h
            }
            event.set("foo", a)
        '
    }
    split { field => "foo" }
    ruby {
        code => '
            event.get("foo").each { |k, v|
                event.set(k,v)
            }
            event.remove("foo")
        '
    }
    
    mutate {
        add_field => { "blockId" => "12def45" }
    }
}

output {
    stdout { codec => rubydebug }
    file {
        path => "/usr/share/output/output.json"
        codec => "json_lines"
    }
    elasticsearch {
        index => "%{blockId}"
        hosts => ["${Hosts}"]
    }
}

Here is the csv file:

user_name,text,size,output,newfield
Mike,Hello,11.5,12A,1
Nicolas,Test Test,0.25,13B,2
Sandy,Test text,1.25,f45,3

Here is the output I'm getting! This is really weird!

{"column_value_float":null,"column_value_string":"Test Test","blockId":"inference-basel","message":"Nicolas,Test Test,0.25,13B,2\r","row_number":3,"column_name":"Test text"}
{"column_value_float":0.25,"column_value_string":null,"blockId":"inference-basel","message":"Nicolas,Test Test,0.25,13B,2\r","row_number":3,"column_name":"1.25"}
{"column_value_float":2.0,"column_value_string":null,"blockId":"inference-basel","message":"Nicolas,Test Test,0.25,13B,2\r","row_number":3,"column_name":"3"}
{"column_value_float":null,"column_value_string":"13B","blockId":"inference-basel","message":"Nicolas,Test Test,0.25,13B,2\r","row_number":3,"column_name":"f45"}
{"column_value_float":null,"column_value_string":"Nicolas","blockId":"inference-basel","message":"Nicolas,Test Test,0.25,13B,2\r","row_number":3,"column_name":"Sandy"}
{"column_value_float":null,"column_value_string":"text","blockId":"inference-basel","message":"user_name,text,size,output,newfield\r","row_number":2,"column_name":"Test text"}
{"column_value_float":null,"column_value_string":"size","blockId":"inference-basel","message":"user_name,text,size,output,newfield\r","row_number":2,"column_name":"1.25"}
{"column_value_float":null,"column_value_string":"newfield","blockId":"inference-basel","message":"user_name,text,size,output,newfield\r","row_number":2,"column_name":"3"}
{"column_value_float":null,"column_value_string":"output","blockId":"inference-basel","message":"user_name,text,size,output,newfield\r","row_number":2,"column_name":"f45"}
{"column_value_float":null,"column_value_string":"user_name","blockId":"inference-basel","message":"user_name,text,size,output,newfield\r","row_number":2,"column_name":"Sandy"}
{"column_value_float":null,"column_value_string":"Hello","blockId":"inference-basel","message":"Mike,Hello,11.5,12A,1\r","row_number":1,"column_name":"Test text"}
{"column_value_float":11.5,"column_value_string":null,"blockId":"inference-basel","message":"Mike,Hello,11.5,12A,1\r","row_number":2,"column_name":"1.25"}
{"column_value_float":1.0,"column_value_string":null,"blockId":"inference-basel","message":"Mike,Hello,11.5,12A,1\r","row_number":3,"column_name":"3"}
{"column_value_float":null,"column_value_string":"12A","blockId":"inference-basel","message":"Mike,Hello,11.5,12A,1\r","row_number":3,"column_name":"f45"}
{"column_value_float":null,"column_value_string":"Mike","blockId":"inference-basel","message":"Mike,Hello,11.5,12A,1\r","row_number":3,"column_name":"Sandy"}

Also, when I add two files at the same time, the @row_number is not reset! It continues counting from where it stopped with the previous csv file!

Make sure you set '--pipeline.workers 1', and if you are running a recent version of logstash, set pipeline.java_execution: false in logstash.yml.

To handle multiple files you would need more ruby code: instead of a single @row_number variable you would have a hash that tracks the row number for each value of path. You are really stretching the boundaries of what it makes sense to do using logstash.
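A minimal sketch of that idea in plain Ruby (the next_row_number helper is hypothetical; in the filter it would be keyed by the event's path field):

```ruby
# Track a separate row counter per file path instead of a single
# @row_number. Hash.new(0) makes unseen paths start at 0, so the
# first row of each file gets number 1.
@row_numbers = Hash.new(0)

def next_row_number(path)
  @row_numbers[path] += 1
end

next_row_number("/input/a.csv")  # 1
next_row_number("/input/a.csv")  # 2
next_row_number("/input/b.csv")  # 1, independent of a.csv
```

Each file's counter advances on its own, so processing two files at once no longer bleeds numbering from one file into the other.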