Logstash 2.3 CSV import Error

I am importing a CSV file into the indices my_index-[0-9] (the 0-9 suffix is the last digit of "owner_id"; I have 10 indices, so the target index is chosen by that last digit).

input{
    file{
        path => "/data/logstash-2.3.1/conf/export.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {

    csv {
        columns => ["order_id", "owner_id", "execution_date", "creation_date", "authorization_date", "description", "payer_name", "payer_account_id", "payee_name", "payee_account_id", "amount", "currency", "status", "type", "sub_type", "svt_id", "payer_iban", "payee_iban"]
        separator => ";"
        skip_empty_columns => true
        convert => {
            "sub_type" => "integer"
        }
    }

    grok {
        match => ["owner_id", "(?<index_number>.$)"]
    }

    mutate {
        add_field => {
            "[@metadata][index_number]" => "%{index_number}"
        }
    }

    mutate {
        remove_field => ["index_number", "@timestamp", "@version"]
    }
}

output {
        elasticsearch {
                hosts => ["localhost:9200"]
                index => "my_index-%{[@metadata][index_number]}"
                document_id => "%{order_id}"
                routing => "%{owner_id}"
                document_type => "order_item"
                flush_size => 4000
        }
}

This is what my CSV file looks like:

"20190616000439995800";"0911978367901";"2019-06-16";16.06.2019 12:33:57;16.06.2019 12:33:57;"Description_something 1. b/1. c-06-2019";"JOHN DOE";"3200371523";"CITY NAME";"HH04040405933282134";462;"191";"PRO";"PMT";1;"MobileVersion";"HX3331234594358234";""

I keep getting this error:

"status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [creation_date]", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"Invalid format: "16.06.2019 15:57:37" is malformed at ".06.2019 15:57:37""}}}}, :level=>:warn}

In the rubydebug output I can see that most of the CSV lines are populated correctly, but plenty of them look like this:

"message" => ""20190616000440074438";"1103963335145";"2019-06-17";16.06.2019 20:19:13;16.06.2019 20:46:55;"Description for 5/2019\r",
"path" => "/data/logstash-2.3.1/conf/export.csv",
"host" => "myhosturl",
"tags" => [
[0] "_csvparsefailure",
[1] "_grokparsefailure"
]

Zero documents are being written to Elasticsearch (which is also 2.3.1, the same version as Logstash). Am I missing something in the configuration?

Here's the mapping for one of the indices:

{
  "my_index-9": {
    "mappings": {
      "order_item": {
        "_routing": {
          "required": true
        },
        "properties": {
          "amount": {
            "type": "double"
          },
          "authorization_date": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
          "creation_date": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
          "currency": {
            "type": "string",
            "index": "not_analyzed"
          },
          "description": {
            "type": "string",
            "analyzer": "custom_analyzer"
          },
          "execution_date": {
            "type": "date",
            "format": "strict_date_optional_time||epoch_millis"
          },
          "host": {
            "type": "string",
            "index": "not_analyzed"
          },
          "owner_id": {
            "type": "string",
            "index": "not_analyzed"
          },
          "path": {
            "type": "string",
            "index": "not_analyzed"
          },
          "payee_account_id": {
            "type": "string",
            "index": "not_analyzed"
          },
          "payee_name": {
            "type": "string",
            "analyzer": "custom_analyzer"
          },
          "payer_account_id": {
            "type": "string",
            "index": "not_analyzed"
          },
          "payer_name": {
            "type": "string",
            "analyzer": "custom_analyzer"
          },
          "status": {
            "type": "string",
            "index": "not_analyzed"
          },
          "sub_type": {
            "type": "integer"
          },
          "svt_id": {
            "type": "string",
            "index": "not_analyzed"
          },
          "type": {
            "type": "string",
            "index": "not_analyzed"
          }
        }
      }
    }
  }
}

Your mapping has

 "format": "strict_date_optional_time||epoch_millis"

strict_date_optional_time supports a wide variety of formats, but I do not think "dd.MM.yyyy HH:mm:ss" is one of them, so it raises an exception as soon as it sees the first period. You need to update your mapping to use the right format.

The failing line in your rubydebug output also appears to have unbalanced double quotes.

I tried adding date filters to match my creation_date and authorization_date, but it is not helping.

date {
    match => [ "creation_date", "dd.MM.yyyy HH:mm:ss" ]
}
date {
    match => [ "authorization_date", "dd.MM.yyyy HH:mm:ss" ]
}

Is there any way to convert this dd.MM.yyyy HH:mm:ss format to a proper Elasticsearch timestamp format? I have other documents in the index with:

"creation_date": "2017-05-11T22:00:00.000Z",
 "authorization_date": "2017-05-11T22:00:00.000Z",

As for the double quotes (""), I don't see them in my original CSV file?!

That will overwrite the @timestamp field twice. To overwrite a field you have to specify the target, for example

date {
    match => [ "creation_date", "dd.MM.yyyy HH:mm:ss" ]
    target => "creation_date"
}
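
As a sketch for both fields in this configuration (field names taken from the csv columns above), the pair of filters with explicit targets would be:

date {
    match  => [ "creation_date", "dd.MM.yyyy HH:mm:ss" ]
    target => "creation_date"
}
date {
    match  => [ "authorization_date", "dd.MM.yyyy HH:mm:ss" ]
    target => "authorization_date"
}

With target set, @timestamp is left alone and each field is replaced by its parsed value, which is written as an ISO8601 timestamp and therefore matches the strict_date_optional_time format in the mapping.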

You might need to roll to a new daily index for that to start working.

Still not getting through:

{:timestamp=>"2019-06-18T00:13:43.790000+0200", :message=>"Error parsing csv", :field=>"message", :source=>"\"20190616000439997643\";\"1002992385029\";\"2019-06-17\";16.06.2019 12:39:04;16.06.2019 12:39:03;\"BILL for 4/2019\";\"MOCKED NAME\";\"3212211221\";\"SOME COMPANY GMBH\";\"XX9422221111100050899\";35,4;\"191\";\"PRO\";\"PMT\";1;\"VersionlMobile\";\"XR2223411122212110261\";\"\"\r", :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>, :level=>:warn}
{:timestamp=>"2019-06-18T00:13:43.850000+0200", :message=>"Failed action. ", :status=>404, :action=>["index", {:_id=>"%{order_id}", :_index=>"my_index-%{index_number}", :_type=>"order_item", :_routing=>"%{owner_id}"}, #<LogStash::Event:0x5daa0a48 @metadata_accessors=#<LogStash::Util::Accessors:0x12e5fad9 @store={"path"=>"/data/logstash-2.3.1/conf/export2.csv"

The only suspicious thing I found is a double ;; in some lines. I removed those lines and tried again, with the same result. Any ideas what to do? Is there any other way to import this CSV?

This is your problem. The entire field has to be quoted, and your field has a trailing \r after the closing double quote. You can use mutate+gsub to strip it.

Unless you have config.support_escapes set you will need a literal Ctrl/M in the configuration, not "\r".
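
A minimal sketch of that mutate+gsub, placed before the csv filter so the cleaned line is what gets parsed (and subject to the note above: without config.support_escapes the "\r" may need to be typed as a literal Ctrl/M):

mutate {
    # strip a trailing carriage return from the raw line
    gsub => [ "message", "\r$", "" ]
}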

Well, I used Python to transform this CSV:

import pandas as pd

csv = pd.read_csv("exp_full.csv", sep=";", skiprows=[0])
csv.to_csv("new.csv", sep=";", encoding="utf-8", header=None)

Now the date filter is working and I can get some data into ES, but for some lines there are still errors:

response=>{"index"=>{"_index"=>"my_index-5", "_type"=>"order_item", "_id"=>"20190616000440011228", "status"=>400, "error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse [amount]", "caused_by"=>{"type"=>"number_format_exception", "reason"=>"For input string: \"1,25\""}}}}, :level=>:warn}

Also, why am I seeing the "message" field in ES? Is it there by default and do I need to remove it? I want just the source fields from the CSV.

Well, where I live, "1,25" is not a number, it is a list of numbers. I do not know what your number format is, whether you just use comma as a decimal point, whether you always have 3 digits between commas, and so on, so I cannot suggest a solution. It might be as simple as using mutate+gsub to replace the comma with a period. It might require a ruby filter to parse the number in a specific locale. Not sure if you can tell elasticsearch to parse a string to a number in a specific locale or format.
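
As a sketch of the simple option, assuming the comma in [amount] is only ever a decimal separator (no thousands grouping):

mutate {
    # assumes "," only appears in [amount] as a decimal separator
    gsub => [ "amount", ",", "." ]
}

Since the mapping declares amount as a double, Elasticsearch should coerce the resulting "1.25" string on its own; alternatively, a separate mutate with convert => { "amount" => "float" } placed after the gsub would do the conversion in Logstash.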

And yes, for inputs that create a [message] field it will be sent to the output unless it is removed. I suggest you add this to your csv filter

remove_field => [ "message" ]

which will remove [message] unless there is a parse failure. That way you can search ES for the [message] field, which will only be present on events tagged with _csvparsefailure, and you retain the failed lines without having to trawl through the Logstash logs.
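
For reference, the csv filter from the configuration above with that option added would look like this:

csv {
    columns => ["order_id", "owner_id", "execution_date", "creation_date", "authorization_date", "description", "payer_name", "payer_account_id", "payee_name", "payee_account_id", "amount", "currency", "status", "type", "sub_type", "svt_id", "payer_iban", "payee_iban"]
    separator => ";"
    skip_empty_columns => true
    convert => { "sub_type" => "integer" }
    # only removed when the line parses successfully
    remove_field => [ "message" ]
}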
