Parsing a CSV file through Logstash

Hi All,

I am running ELK stack 7.6.2. I need to parse a CSV file through Logstash. My Logstash conf file looks like this:

input {
  file {
    path => "/opt/gtal/ictal/elasticsearch/app/logstash/stage/STATS-01062021.txt"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
      separator => ","
      skip_header => "true"
      columns => ["Total call statistics for each process","User call statistics for each process"]
  }
}
output {
  file {
    path => "/opt/gtal/ictal/elasticsearch/logs/maa/rubydebug.txt"
    codec => rubydebug
  }
  elasticsearch {
    hosts => [ "xx.xx.xx.xxx:3045" ]
    user => "xxxxxxxxxx"
    password => "xxxxxxxxxxxxxx"
    index => "demo-csv-%{+YYYY.MM.dd}"
  }
}

While I am able to display the result in Kibana, the data is spread all over. The CSV file is also complicated. Please see below:

Total call statistics for each process:
{'BServer': 3105489,
 'CServer': 10146,
 'CMngr': 5679760,
 'CiCServer-19000101:19951231': 0,
 'CiCServer-20191001:20191231': 34,
 'CiCServer-20200101:20200331': 114134,
 'CiCServer-20200401:20200630': 4967513,
 'CiCServer-20200701:20200930': 4968153,
 'CiCServer-20201001:20201231': 5057632,
 'CiCServer-20210101:20210331': 5158988,
 'CiCServer-20210401:20210630': 5351872,
 'CiCServer-20210701:20210930': 0,
 'CiCServer-20211001:20211231': 0,
 'CfgServer': 52,
 'CRskhist': 3548,
 'gccdServer': 6,
 'gfnServer': 1328904,
 'kServer': 14363,
 'madServer': 11045554,
 'mmcServer': 6,
 'qsServer': 6,
 'RrServer': 681,
 'StnServer': 194801}
User call statistics for each process:
{'BServer': {'': 7.0,
               'ccvma': 153766.0,
               'ccvrts': 2951576.0,
               'fqenv': 3.0,
               'frtbU': 137.0},
 'CServer': {'': 3.0, 'ccvma': 10140.0, 'fqenv': 3.0},
 'CMngr': {'': 182.0,
                        'ccvma': 190594.0,
                        'ccvrts': 5488978.0,
                        'fqenv': 6.0},
 'CiCServer-19000101:19951231': {},
 'CiCServer-19960101:20001231': {},
 'CiCServer-20010101:20011231': {},
 'CiCServer-20190101:20190331': {'': 9.0, 'ccvma': 9.0},
 'CiCServer-20190401:20190630': {'': 16.0, 'ccvma': 18.0},
 'CiCServer-20190701:20190930': {'': 16.0, 'ccvma': 18.0},
 'CiCServer-20191001:20191231': {'': 16.0, 'ccvma': 18.0},
 'CiCServer-20200101:20200331': {'': 16.0,
                                         'ccvma': 1768.0,
                                         'ccvrts': 112350.0},
 'CiCServer-20200401:20200630': {'': 11.0,
                                         'ccvma': 45092.0,
                                         'ccvrts': 4922410.0},
 'CiCServer-20200701:20200930': {'': 11.0,
                                         'ccvma': 45645.0,
                                         'ccvrts': 4922497.0},
 'CiCServer-20201001:20201231': {'': 16.0,
                                         'ccvma': 4074128.0,
                                         'ccvrts': 983488.0},
 'CiCServer-20210101:20210331': {'': 16.0,
                                         'ccvma': 4824540.0,
                                         'ccvrts': 334432.0},
 'CiCServer-20210401:20210630': {'': 7341.0,
                                         'ccvma': 4683996.0,
                                         'ccvrts': 660535.0},
 'CiCServer-20210701:20210930': {},
 'CiCServer-20211001:20211231': {},
 'CfgServer': {'': 6.0, 'ccvrts': 40.0, 'fqenv': 6.0},
 'CRskhist': {'': 3.0, 'ccvma': 3542.0, 'fqenv': 3.0},
 'gccdServer': {'': 3.0, 'fqenv': 3.0},
 'gfnServer': {'': 433.0,
                  'ccvrts': 1328038.0,
                  'fqenv': 3.0,
                  'nameservice': 430.0},
 'kServer': {'': 3.0, 'ccvrts': 14357.0, 'fqenv': 3.0},
 'madServer': {'': 162.0,
               'ccvma': 358873.0,
               'ccvrts': 10686372.0,
               'fqenv': 9.0,
               'frtbU': 138.0},
 'mmcServer': {'': 3.0, 'fqenv': 3.0},
 'qsServer': {'': 3.0, 'fqenv': 3.0},
 'RrServer': {'': 3.0, 'ccvma': 675.0, 'fqenv': 3.0},
 'StnServer': {'': 3.0, 'ccvrts': 194795.0, 'fqenv': 3.0}}

Please guide me, as I am new to this. Thanks!

Is that the contents of your file? It is not a CSV.

Yes, you are right. It is a text file. Sorry for the confusion. Is there a way to parse this using Logstash?

No problem. Ingest the entire file as a single event. To do that, use a file input with a multiline codec whose pattern never matches the data in the file:

codec => multiline { pattern => "^Spalanzani" negate => true what => previous auto_flush_interval => 1 multiline_tag => "" }
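
Spelled out inside a file input, that might look like this (the path and other options are copied from your original config):

    input {
      file {
        path => "/opt/gtal/ictal/elasticsearch/app/logstash/stage/STATS-01062021.txt"
        start_position => "beginning"
        sincedb_path => "/dev/null"
        codec => multiline {
          pattern => "^Spalanzani"
          negate => true
          what => previous
          auto_flush_interval => 1
          multiline_tag => ""
        }
      }
    }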

Then chop the data into two parts using grok. Save these under [@metadata] so that they are available in logstash but not indexed in elasticsearch. Then convert the data to valid JSON (which requires double quotes) and parse it:

    grok { match => { "message" => "Total call statistics for each process:%{GREEDYDATA:[@metadata][totalCalls]}User call statistics for each process:%{GREEDYDATA:[@metadata][userCalls]}" } remove_field => [ "message" ] }
    mutate {
        gsub => [
            "[@metadata][totalCalls]", "'", '"',
            "[@metadata][userCalls]", "'", '"'
        ]
    }
    json { source => "[@metadata][userCalls]" target => "userCalls" }
    json { source => "[@metadata][totalCalls]" target => "totalCalls" }

That will result in an event with

"totalCalls" => {
                      "mmcServer" => 6,
    "CiCServer-20200701:20200930" => 4968153,
    "CiCServer-20211001:20211231" => 0,
                        "BServer" => 3105489,
...
},
 "userCalls" => {
                      "mmcServer" => {
             "" => 3.0,
        "fqenv" => 3.0
    },
    "CiCServer-20200701:20200930" => {
              "" => 11.0,
        "ccvrts" => 4922497.0,
         "ccvma" => 45645.0
    },

"" is a valid field name in JSON, but a lot of things object to empty names. I think elasticsearch may be one, so you might get a mapping exception. If so, you could insert

"[@metadata][userCalls]", "''", "'empty'",

at the top of the list of gsubs in the mutate filter.
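
That is, the mutate filter would look something like this (same gsub entries as before, with the empty-name replacement applied first, while the values are still single-quoted):

    mutate {
        gsub => [
            "[@metadata][userCalls]", "''", "'empty'",
            "[@metadata][totalCalls]", "'", '"',
            "[@metadata][userCalls]", "'", '"'
        ]
    }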

Hi. This all seems so hard...
The topic creator already has JSON; he just needs to delete the lines
"Total call statistics for each process:" and "User call statistics for each process:"
from the file and replace ' with ".
A simple way to do it in bash (if this is on Linux):

    cat YourFile.txt | awk '{if(!index($0,"call statistics for each process")){gsub(/\047/,"\"",$0);print $0}}' >> jsonfile.txt

Then have Logstash read the result with an input that uses

codec => "json"

Hello Badger,

Thanks! It worked as expected after I made the changes you recommended.

Hi,

I was wondering if it is possible to assign a generic field name, say "serverName", and assign the values to it? e.g.:

serverName: BServer
serverName: CServer
serverName: CMngr
serverName: CiCServer-19000101:19951231
...................
and so on.

This would perhaps help me in creating a visualization chart for each individual server.

(The file contents are the same as posted earlier.)

Almost certainly. But instead of

"userCalls" => {
                      "mmcServer" => {
             "" => 3.0,
        "fqenv" => 3.0
    },
    "CiCServer-20200701:20200930" => {
              "" => 11.0,
        "ccvrts" => 4922497.0,
         "ccvma" => 45645.0
    },

etc., what do you want the data to look like?

They could be:

serverName: mmcServer.empty
serverName: mmcServer.fqenv

serverName: CiCServer-20200701:20200930.ccvrts
serverName: CiCServer-20200701:20200930.ccvma

Hope I understood your question correctly.

Thanks

But if serverName is the key, then where does the value go? Here is one option for what you could do -- one event per server. Change the json filters to store the parsed data in [@metadata] and then use ruby:

    json { source => "[@metadata][userCalls]" target => "[@metadata][parsedUserCalls]" }
    json { source => "[@metadata][totalCalls]" target => "[@metadata][parsedTotalCalls]" }
    ruby {
        code => '
            a = []
            userCalls = event.get("[@metadata][parsedUserCalls]")
            totalCalls = event.get("[@metadata][parsedTotalCalls]")
            totalCalls.each { |k, v|
                server = {}
                server["serverName"] = k
                server["totalCalls"] = v
                server["userCalls"] = userCalls[k]
                a << server
            }
            event.set("servers", a)
        '
    }

That will get you

   "servers" => [
    [ 0] {
         "userCalls" => {},
        "serverName" => "CiCServer-20210701:20210930",
        "totalCalls" => 0
    },
    [ 1] {
         "userCalls" => {
            "empty" => 3.0,
            "fqenv" => 3.0
        },
        "serverName" => "gccdServer",
        "totalCalls" => 6
    },

etc. You might want to use a split filter to separate each server into its own event, and then move the contents of [servers] to the root.

But do not let my guesses drive your data design. You need to describe exactly how you want the keys and values to appear.

Thanks. My aim is to create a visualization based on the output, regardless of how the data is manipulated/displayed.

I will try your suggestion.

Hi Badger, I am struggling to get this part done... I tried various options but none of them worked out.

Can you please guide?

If you have a top-level field called servers which is an array, then

split { field => "servers" }

will result in as many events as there are entries in the servers array, each containing a field called servers that contains one of those entries.
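
For example, something like this (the second ruby filter is one way to move each entry's contents to the root; the field names are the ones from my earlier example):

    split { field => "servers" }
    ruby {
        code => '
            # After the split, [servers] holds a single hash for one server.
            # Copy each of its keys to the top level of the event, then
            # remove the now-redundant [servers] field.
            event.get("servers").each { |k, v|
                event.set(k, v)
            }
            event.remove("servers")
        '
    }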

So this is how the Logstash config looks now:

input {
  file {
    path => "/opt/gportal/icgportal/elasticsearch/app/logstash/stage/CVMQA_UAT_STATS-*.txt"
    codec => multiline { pattern => "^Spalanzani" negate => true what => previous auto_flush_interval => 1 multiline_tag => "" }
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
      separator => ","
      skip_header => "true"
      columns => ["Total call statistics for each process","User call statistics for each process"]
  }
  grok { match => { "message" => "Total call statistics for each process:%{GREEDYDATA:[@metadata][totalCalls]}User call statistics for each process:%{GREEDYDATA:[@metadata][userCalls]}" } remove_field => [ "message" ] }
  mutate {
    gsub => [
      "[@metadata][userCalls]", "''", "'empty'",
      "[@metadata][totalCalls]", "'", '"',
      "[@metadata][userCalls]", "'", '"'
    ]
  }
  json { source => "[@metadata][userCalls]" target => "[@metadata][parsedUserCalls]" }
  json { source => "[@metadata][totalCalls]" target => "[@metadata][parsedTotalCalls]" }
  ruby {
    code => '
      a = []
      userCalls = event.get("[@metadata][parsedUserCalls]")
      totalCalls = event.get("[@metadata][parsedTotalCalls]")
      totalCalls.each { |k, v|
        server = {}
        server["serverName"] = k
        server["totalCalls"] = v
        server["userCalls"] = userCalls[k]
        a << server
      }
      event.set("servers", a)
    '
  }
}
output {
  file {
    path => "/opt/gportal/icgportal/elasticsearch/logs/mqa/rubydebug.txt"
    codec => rubydebug
  }
  elasticsearch {
    hosts => [ "xx.xx.xxx.xxx:3045" ]
    user => "elastic"
    password => "xxxxxxxxxxxxxxxxxxx"
    index => "demo-csv-%{+YYYY.MM.dd}"
  }
}

Where can I add split { field => "servers" } ?

After the ruby filter.

Thanks Badger!

Hi Badger. Thanks for the help. The fields now look fine. However, when I replace the txt file with a new one, I get the following errors in the log:

[2021-06-11T16:16:39,863][ERROR][logstash.filters.ruby    ][main] Ruby exception occurred: undefined method `each' for nil:NilClass
[2021-06-11T16:16:39,878][WARN ][logstash.filters.split   ][main] Only String and Array types are splittable. field:servers is of type = NilClass
[2021-06-11T16:16:39,882][ERROR][logstash.filters.ruby    ][main] Ruby exception occurred: undefined method `each' for nil:NilClass

My config file looks like following:

input {
  file {
    path => "/opt/gtal/iptal/elasticsearch/app/logstash/stage/CVMQA_UAT_STATS-*.txt"
    codec => multiline { pattern => "^Spalanzani" negate => true what => previous auto_flush_interval => 1 multiline_tag => "" }
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
      separator => ","
      skip_header => "true"
      columns => ["Total call statistics for each process","User call statistics for each process"]
  }
  grok { match => { "message" => "Total call statistics for each process:%{GREEDYDATA:[@metadata][totalCalls]}User call statistics for each process:%{GREEDYDATA:[@metadata][userCalls]}" } remove_field => [ "message" ] }
  mutate {
    remove_field => [ "host" ]
    gsub => [
      "[@metadata][userCalls]", "''", "'empty'",
      "[@metadata][totalCalls]", "'", '"',
      "[@metadata][userCalls]", "'", '"'
    ]
  }
  json { source => "[@metadata][userCalls]" target => "[@metadata][parsedUserCalls]" }
  json { source => "[@metadata][totalCalls]" target => "[@metadata][parsedTotalCalls]" }
  ruby {
    code => '
      a = []
      userCalls = event.get("[@metadata][parsedUserCalls]")
      totalCalls = event.get("[@metadata][parsedTotalCalls]")
      totalCalls.each { |k, v|
        server = {}
        server["serverName"] = k
        server["totalCalls"] = v
        server["userCalls"] = userCalls[k]
        a << server
      }
      event.set("servers", a)
    '
  }
  split { field => "servers" }
  ruby {
    code => '
      event.get("servers").each { |k, v|
        event.set(k, v)
      }
      event.remove("servers")
    '
  }
}

output {
  file {
    path => "/opt/gtal/iptal/elasticsearch/logs/mqa/rubydebug.txt"
    codec => rubydebug
  }
  elasticsearch {
    hosts => [ "xx-xxx-xxx:3045" ]
    user => "xxxxxx"
    password => "xxxxxxxxxxxxxxx"
    index => "demo-csv-%{+YYYY.MM.dd}"
  }
}

I also tried to add the following in the config as per https://discuss.elastic.co/t/only-string-and-array-types-are-splittable-field-splitedjson-is-of-type-nilclass/191725/2

if [message] drop {}

but still the error remains.

Please guide.

Thanks

The first "undefined method" error is happening on one of these lines:

            userCalls = event.get("[@metadata][parsedUserCalls]")
            totalCalls = event.get("[@metadata][parsedTotalCalls]")

Once a ruby filter gets an exception, the rest of the code is not executed (unless you catch the exception), so it never reaches

        event.set("servers", a)

That causes the split to fail, because the field it is trying to split is nil. The same goes for the ruby filter that moves things out of the non-existent [servers] field.

The example code I wrote contains no error handling; in particular, it does not verify that data exists before trying to use it. In the real world you need to add checks to handle missing data.
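
For example, here is a sketch of the same ruby filter with basic nil checks added (illustrative only, not tested against your data):

    ruby {
        code => '
            userCalls = event.get("[@metadata][parsedUserCalls]")
            totalCalls = event.get("[@metadata][parsedTotalCalls]")
            # Only build [servers] when the parsed data actually exists
            if totalCalls.is_a?(Hash)
                a = []
                totalCalls.each { |k, v|
                    server = {}
                    server["serverName"] = k
                    server["totalCalls"] = v
                    # userCalls may be missing even when totalCalls parsed
                    server["userCalls"] = userCalls.is_a?(Hash) ? userCalls[k] : nil
                    a << server
                }
                event.set("servers", a)
            end
        '
    }

Since [servers] is only set when the data parsed, you would then guard the later filters with a conditional, e.g. if [servers] { split { field => "servers" } ... }, so they only run when the field exists.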

I suspect the grok is failing for your new file.
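
One way to see that: grok adds a _grokparsefailure tag to events it cannot match, so you can look for it in the rubydebug output, or drop such events before the rest of the pipeline runs:

    if "_grokparsefailure" in [tags] {
        drop {}
    }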

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.