Problems with CSV: first field in CSV columns has the entire message

I am trying to parse a file delimited by @ characters. I am using a CSV filter, and it works properly for every field except the first: the first field contains the entire message, delimiters included.

curl -XGET 'http://10.42.42.242:9200/cld-logstash-2015.09.29/_search?q=message_ID="A7/4E-18701-275E8065"' | python -m json.tool 
...
        "hits": [
            {
                "_id": "AVAaod7y8PfMTcP3lBp_",
                "_index": "cld-logstash-2015.09.29",
                "_score": 0.37731525,
                "_source": {
                    "@timestamp": "2015-09-29T19:43:45.274Z",
                    "@version": "1",
                    "Result": "R",
                    **"Unix_timestamp": "1443423602@A7/4E-18701-275E8065@A7/4E-18701-275E8065@2F/AC-18701-275E8065@R@XXXXXXXX@disney.com@WDPRO-DailyReports@email.disney.com@10.198.64.156@1872@esmtp@default@default",**
                    "batch_ID": "A7/4E-18701-275E8065",
...
                    "localpart": "XXXXXXX",
                    "message": [
                        "1443423602@A7/4E-18701-275E8065@A7/4E-18701-275E8065@2F/AC-18701-275E8065@R@XXXXXXXXX@disney.com@WDPRO-DailyReports@email.disney.com@10.198.64.156@1872@esmtp@default@default"
                    ],
                    "message_ID": "A7/4E-18701-275E8065",
                    "protocol": "esmtp",
                    "remote_IP": "10.198.64.156",
...
                },
                "_type": "logs"
            }
        ],
...

My Logstash configuration file is:

# logstash configuration file for messagesystems.com server
#
#
input { stdin { } }

filter {
  csv {
    separator => "@"
    columns => [ "Unix_timestamp", "message_ID", "batch_ID", "connection_ID", "Result", "f_5", "f_6", "f_7", "f_8", "f_9", "f_10", "f_11", "f_12", "f_13" ]
  }
  if [Result] == "R" {
    mutate {
      rename => [ "f_5", "localpart", "f_6", "destination_domain", "f_7", "sender_localpart", "f_8", "send_domain", "f_9", "remote_IP", "f_10", "size", "f_11", "protocol", "f_12", "binding_group", "f_13", "binding" ]
    }
  }
  if [Result] == "D" or [Result] == "X" {
#    csv {
#      columns => [ "Unix_timestamp", "message_ID", "batch_ID", "connection_ID", "Result", "domain", "size", "binding_group", "binding", "retries", "remote_id" ]
#    }
    mutate {
      rename => [ "f_5", "domain", "f_6", "size", "f_7", "binding_group", "f_8", "binding", "f_9", "retries", "f_10", "delays", "f_11", "remote_ip" ]
    }
  }
  else {
# This has not been converted yet, waiting for the D type to be verified and to find a log file with T type records in it
    csv {
      columns => [ "Unix_timestamp", "unused_1", "unused_2", "unused_3", "Result" ]
    }
  }
}

...

All of the fields work properly except the first one. Importing into Elasticsearch works, except that the problem with the first field shows up in Elasticsearch queries and in Kibana, which I guess you'd expect: garbage in, garbage out. Adding a "dummy" field throws everything else off by one.
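For example, prepending a placeholder column (the column name "dummy" here is just illustrative) only shifts every real field one slot to the right:

  csv {
    separator => "@"
    # with an extra leading column, the timestamp lands in "dummy",
    # the message ID lands in "Unix_timestamp", and so on down the line
    columns => [ "dummy", "Unix_timestamp", "message_ID", "batch_ID", "connection_ID", "Result", "f_5", "f_6", "f_7", "f_8", "f_9", "f_10", "f_11", "f_12", "f_13" ]
  }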

I have discovered another interesting factoid: it isn't consistently wrong. I threw a bigger file at Logstash and found:

curl -XGET 'http://10.42.42.242:9200/cld-logstash-2015.09.29/_search' | python -m json.tool | fgrep "Unix_timestamp"
                    "Unix_timestamp": "1443423601@37/4E-18701-175E8065@37/4E-18701-175E8065@EE/AC-18701-175E8065@R@ESPNAPI-DEV@espn.com@nap7-orion@starwave.com@10.198.1.23@1249@esmtp@default@default",
                    "Unix_timestamp": "1443510002",
                    "Unix_timestamp": "1443510002",
                    "Unix_timestamp": "1443510002@BA/2E-19080-2F63A065@BA/2E-19080-2F63A065@56/DA-19080-2F63A065@R@nanopy@nanopy.com@disneymovierewards@disneyhelp.com@10.198.114.51@21442@esmtp@gp_bg@gp_68.71.217.162",
                    "Unix_timestamp": "1443510002",
                    "Unix_timestamp": "1443423601@57/4E-18701-175E8065@47/4E-18701-175E8065@FE/AC-18701-175E8065@R@Studio.content.protection@disney.com@Studio.Content.Protection@disney.com@10.198.83.192@4256@esmtp@default@default",
                    "Unix_timestamp": "1443510002",
                    "Unix_timestamp": "1443423601@67/4E-18701-175E8065@67/4E-18701-175E8065@0F/AC-18701-175E8065@R@XXXXXXXXXX@disney.com@WDPRO-DailyReports@email.disney.com@10.198.64.156@4439@esmtp@default@default",
                    "Unix_timestamp": "1443423602@A7/4E-18701-275E8065@A7/4E-18701-275E8065@2F/AC-18701-275E8065@R@XXXXXXXXXX@disney.com@WDPRO-DailyReports@email.disney.com@10.198.64.156@1872@esmtp@default@default",

Also, the first field is a Unix timestamp. Is there a way to convert it to a date/time in a filter?

Jeff Silverman
jeffrey.x.silverman.-nd@disney.com

I can't reproduce this with Logstash 1.5.4 and the following minimal configuration:

$ cat test.config
input { stdin { } }
output { stdout { codec => rubydebug } }
filter {
  csv {
    separator => "@"
    columns => ["Unix_timestamp", "message_ID", "batch_ID",
"connection_ID", "Result", "f_5", "f_6", "f_7", "f_8", "f_9", "f_10",
"f_11", "f_12", "f_13"]
  }
}
$ echo '1443423602@A7/4E-18701-275E8065@A7/4E-18701-275E8065@2F/AC-18701-275E8065@R@XXXXXXXXX@disney.com@WDPRO-DailyReports@email.disney.com@10.198.64.156@1872@esmtp@default@default' | /opt/logstash/bin/logstash -f test.config
Logstash startup completed
{
           "message" => [
        [0] "1443423602@A7/4E-18701-275E8065@A7/4E-18701-275E8065@2F/AC-18701-275E8065@R@XXXXXXXXX@disney.com@WDPRO-DailyReports@email.disney.com@10.198.64.156@1872@esmtp@default@default"
    ],
          "@version" => "1",
        "@timestamp" => "2015-09-30T06:24:26.589Z",
              "host" => "lnxmagnusbk",
    "Unix_timestamp" => "1443423602",
        "message_ID" => "A7/4E-18701-275E8065",
          "batch_ID" => "A7/4E-18701-275E8065",
     "connection_ID" => "2F/AC-18701-275E8065",
            "Result" => "R",
               "f_5" => "XXXXXXXXX",
               "f_6" => "disney.com",
               "f_7" => "WDPRO-DailyReports",
               "f_8" => "email.disney.com",
               "f_9" => "10.198.64.156",
              "f_10" => "1872",
              "f_11" => "esmtp",
              "f_12" => "default",
              "f_13" => "default"
}
Logstash shutdown completed

Also, the first field is a Unix timestamp. Is there a way to convert it to a date/time in a filter?

That's what the date filter is for.
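Something along these lines should do it (untested against your data; this assumes the field holds whole seconds since the epoch):

filter {
  date {
    # "UNIX" parses seconds since the epoch; use "UNIX_MS" for milliseconds
    match => [ "Unix_timestamp", "UNIX" ]
    # the parsed date replaces @timestamp by default; set "target" if you
    # want it stored in a separate field instead
  }
}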

Magnus,

I detect the problem only on some records and not others. In the example I showed there are nine records: four are correct and five are wrong. I don't know why it is intermittent; I do know that intermittent problems are the hardest to solve.

Do you have a suggestion on how I should proceed next?

Thank you for the date filter suggestion.

Jeff

Oh, right. That's really weird. I tried one of the strings from your output above that failed to parse, but it worked nicely with my configuration. Sorry, I have no leads here (short of debugging the code).