Logstash csv filter unable to handle newline in CSV

Hello,

i have a csv data with newline in some of the column data.
Whenever the data start with ", it contain newline.

testing2.csv

date,name,address
5/23/2024,"Lee123
Hello3",Tanjung789
5/24/2024,Lee124,Tanjung790
5/25/2024,Lee125,Tanjung791
5/26/2024,Lee126,Tanjung792
5/27/2024,Lee127,Tanjung793
5/28/2024,"Lee128
Joke",Tanjung794

i tried multiple method but still fail to output the correct data.
Method 1
this method, the script is stuck at

input {
    file {
        path => "C:\Users\ABCD\Desktop\ELK\logstash-8.6.2\testing2.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
    codec => multiline {
      pattern => "^\\d{1,2}/\\d{1,2}/\\d{4},"  # Example pattern for matching records
      negate => "true"
      what => "previous"
    }
  }
}

filter {
  csv {
    separator => ","
    quote_char => '"'
    columns => ["date", "name", "address"]
  }
}
output {

    file{
        path => "./test.json"
    }
}

Method 2

This method show Error parsing csv.
[Error parsing csv {:field=>"message", :source=>"Thong",Tanjung813\r", :exception=>#<CSV::MalformedCSVError: Illegal quoting in line 1.>} ]


input {
    file {
        path => "C:/Users/ABCD/Downloads/testing2.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
        
    }
}

filter {

    csv {

        separator => ","
        skip_header => "true"
        columns => ["date","name","address"]
        quote_char => '"'
        #autodetect_column_names => "true"

        
    }
    mutate {
        remove_field => ["message"]  # Remove the original message field if not needed
    }
}

output {

    file{
        path => "./test.json"
    }
}

Method 3

The method 3 is a very simple approach.


input {
    file {
        path => "C:\Users\ABCD\Desktop\ELK\logstash-8.6.2\testing2.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
        
    }
}

filter {

    csv {

        separator => ","
        skip_header => "true"
        columns => ["date","name","address"]
        #autodetect_column_names => "true"

        
    }
    mutate {
        remove_field => ["message"]  # Remove the original message field if not needed
    }
}

output {

    file{
        path => "./test.json"
    }
}

Below output is from the method 3.

{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/5/2024,Lee136,Tanjung802\r"},"name":"Lee136","address":"Tanjung802","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.082263Z","date":"6/5/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/19/2024,Lee150,Tanjung816\r"},"name":"Lee150","address":"Tanjung816","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.086262Z","date":"6/19/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"5/27/2024,Lee127,Tanjung793\r"},"name":"Lee127","address":"Tanjung793","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.081263400Z","date":"5/27/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/10/2024,Lee141,Tanjung807\r"},"name":"Lee141","address":"Tanjung807","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.083262500Z","date":"6/10/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"5/24/2024,Lee124,Tanjung790\r"},"name":"Lee124","address":"Tanjung790","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.080264100Z","date":"5/24/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/8/2024,Lee139,Tanjung805\r"},"name":"Lee139","address":"Tanjung805","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.083262500Z","date":"6/8/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/22/2024,Lee153,Tanjung819\r"},"name":"Lee153","address":"Tanjung819","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.086262Z","date":"6/22/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/4/2024,Lee135,Tanjung801\r"},"name":"Lee135","address":"Tanjung801","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.082263Z","date":"6/4/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/18/2024,Lee149,Tanjung815\r"},"name":"Lee149","address":"Tanjung815","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.084262700Z","date":"6/18/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"5/29/2024,Lee129,Tanjung795\r"},"name":"Lee129","address":"Tanjung795","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.081263400Z","date":"5/29/2024"}
{"host":{"name":"SUPER_PC"},"@version":"1","event":{"original":"6/13/2024,Lee144,Tanjung810\r"},"name":"Lee144","address":"Tanjung810","log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.084262700Z","date":"6/13/2024"}
{"host":{"name":"SUPER_PC"},"tags":["_csvparsefailure"],"@version":"1","event":{"original":"5/28/2024,\"Lee128"},"log":{"file":{"path":"C:/Users/LEL2PG/Downloads/testing2.csv"}},"@timestamp":"2024-09-10T11:31:25.081263400Z"}


The task i need to perform is to digest daily generated csv file and upload it to elastic. Please help me on this, or if there is other elastic tools that able to handle this "daily automated upload" please do let me know. Thanks so much in advance.

Change that to ^\d{1,2}/\d{1,2}/\d{4}, and it works

{
   "message" => "5/23/2024,\"Lee123\nHello3\",Tanjung789",
      "date" => "5/23/2024",
      "name" => "Lee123\nHello3",
      "tags" => [
    [0] "multiline"
],
   "address" => "Tanjung789"
}
{
   "message" => "5/24/2024,Lee124,Tanjung790",
      "date" => "5/24/2024",
      "name" => "Lee124",
   "address" => "Tanjung790"
}

wow that was fast. Thanks so much for the help. By the way did you configure anything besides that pattern?


input {
    file {
        path => "C:\Users\ABCD\Desktop\ELK\logstash-8.6.2\testing2.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
        codec => multiline {
        pattern => "^\\d{1,2}/\\d{1,2}/\\d{4},"  # Updated Here
        negate => "true"
        what => "previous"
        }
    }
}

filter {

    csv {
    separator => ","
    quote_char => '"'
    columns => ["date", "name", "address"]
        
    }
    
}

output {

    file{
        path => "./test.json"
    }
}

the terminal did not show anything after the pipelines running.

Sending Logstash logs to C:/Users/ABCD/Desktop/ELK/logstash-8.6.2/logs which is now configured via log4j2.properties
[2024-09-10T23:10:57,711][INFO ][logstash.runner          ] Log4j configuration path used is: C:\Users\ABCD\Desktop\ELK\logstash-8.6.2\config\log4j2.properties
[2024-09-10T23:10:57,718][WARN ][logstash.runner          ] The use of JAVA_HOME has been deprecated. Logstash 8.0 and later ignores JAVA_HOME and uses the bundled JDK. Running Logstash with the bundled JDK is recommended. The bundled JDK has been verified to work with each specific version of Logstash, and generally provides best performance and reliability. If you have compelling reasons for using your own JDK (organizational-specific compliance requirements, for example), you can configure LS_JAVA_HOME to use that version instead.
[2024-09-10T23:10:57,720][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"8.6.2", "jruby.version"=>"jruby 9.3.10.0 (2.6.8) 2023-02-01 107b2e6697 OpenJDK 64-Bit Server VM 17.0.6+10 on 17.0.6+10 +indy +jit [x86_64-mswin32]"}
[2024-09-10T23:10:57,722][INFO ][logstash.runner          ] JVM bootstrap flags: [-Xms1g, -Xmx1g, -Djava.awt.headless=true, -Dfile.encoding=UTF-8, -Djruby.compile.invokedynamic=true, -XX:+HeapDumpOnOutOfMemoryError, -Djava.security.egd=file:/dev/urandom, -Dlog4j2.isThreadContextMapInheritable=true, -Djruby.regexp.interruptible=true, -Djdk.io.File.enableADS=true, --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED, --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED, --add-opens=java.base/java.security=ALL-UNNAMED, --add-opens=java.base/java.io=ALL-UNNAMED, --add-opens=java.base/java.nio.channels=ALL-UNNAMED, --add-opens=java.base/sun.nio.ch=ALL-UNNAMED, --add-opens=java.management/sun.management=ALL-UNNAMED]
[2024-09-10T23:10:57,768][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2024-09-10T23:10:59,648][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600, :ssl_enabled=>false}
[2024-09-10T23:11:00,037][INFO ][org.reflections.Reflections] Reflections took 168 ms to scan 1 urls, producing 127 keys and 444 values
[2024-09-10T23:11:01,592][INFO ][logstash.codecs.jsonlines] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[2024-09-10T23:11:01,619][INFO ][logstash.javapipeline    ] Pipeline `main` is configured with `pipeline.ecs_compatibility: v8` setting. All plugins in this pipeline will default to `ecs_compatibility => v8` unless explicitly configured otherwise.
[2024-09-10T23:11:01,635][INFO ][logstash.filters.csv     ][main] ECS compatibility is enabled but `target` option was not specified. This may cause fields to be set at the top-level of the event where they are likely to clash with the Elastic Common Schema. It is recommended to set the `target` option to avoid potential schema conflicts (if your data is ECS compliant or non-conflicting, feel free to ignore this message)
[2024-09-10T23:11:01,649][INFO ][logstash.javapipeline    ][main] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>16, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50, "pipeline.max_inflight"=>2000, "pipeline.sources"=>["C:/Users/ABCD/Desktop/ELK/logstash-8.6.2/logstash_dng_v2.conf"], :thread=>"#<Thread:0x92a0dac@C:/Users/ABCD/Desktop/ELK/logstash-8.6.2/logstash-core/lib/logstash/java_pipeline.rb:131 run>"}
[2024-09-10T23:11:03,828][INFO ][logstash.javapipeline    ][main] Pipeline Java execution initialization time {"seconds"=>2.18}
[2024-09-10T23:11:03,875][INFO ][logstash.javapipeline    ][main] Pipeline started {"pipeline.id"=>"main"}
[2024-09-10T23:11:03,877][INFO ][filewatch.observingtail  ][main][8d6f83017ac8e57b5fbb84f15d6f06b6e0e068983c172ca0a69149b71f239e36] START, creating Discoverer, Watch with file and sincedb collections
[2024-09-10T23:11:03,907][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}

I tested on UNIX so I had to change the path and NUL values, but nothing else.

The console log will not show anything at INFO level if the pipeline is successfully processing events, so that's normal.

By the way, when using negate => true the codec does not create an event until there is a matching line. When your pattern is wrong that means it just never outputs anything until it accumulates enough lines to hit the limit on lines or bytes,

If the pattern is correct then if the last record in the CSV file contains a newline it will never get sent to the pipeline, because there is no matching line following it. You can fix this by setting the auto_flush_interval option.

Hi,
Currently the first output is correct and that is what i wanted. But somehow the logstash only able to did it once and done.


input {
    file {
        path => "C:/Users/ABCD/Desktop/ELK/logstash-8.6.2/testing2.csv"
        start_position => "beginning"
        sincedb_path => "NUL"
        codec => multiline {
            pattern => "^\\d{1,2}/\\d{1,2}/\\d{4},"  # Updated Here.
            negate => "true"
            what => "previous"
        }
    }
}

filter {



    csv {
        separator => ","
         columns => ["date","name","address"]
        
    }
   
}

output {

    file{
        path => "./testing.json"
    }
    stdout { codec => rubydebug } # Added this to show debug message.
}

Now the logstash read everything as one line and give me only one row of data.

[2024-09-11T00:28:46,768][INFO ][logstash.outputs.file    ][main][c61c7f3ccb67bb630ef958abf2e06caeb336881fe37422b5dbbac6387a0f3ee8] Opening file {:path=>"C:/Users/ABCD/Desktop/ELK/logstash-8.6.2/testing.json"}
{
          "date" => "5/23/2024",
         "event" => {
        "original" => "5/23/2024,\"Lee123\r\nHello3\",Tanjung789\r\n5/24/2024,Lee124,Tanjung790\r\n5/25/2024,Lee125,Tanjung791\r\n5/26/2024,Lee126,Tanjung792\r\n5/27/2024,Lee127,Tanjung793\r\n5/28/2024,\"Lee128\r"
    },
    "@timestamp" => 2024-09-10T16:28:46.618225200Z,
      "@version" => "1",
       "message" => "5/23/2024,\"Lee123\r\nHello3\",Tanjung789\r\n5/24/2024,Lee124,Tanjung790\r\n5/25/2024,Lee125,Tanjung791\r\n5/26/2024,Lee126,Tanjung792\r\n5/27/2024,Lee127,Tanjung793\r\n5/28/2024,\"Lee128\r",
          "host" => {
        "name" => "SUPER_PC"
    },
           "log" => {
        "file" => {
            "path" => "C:/Users/ABCD/Desktop/ELK/logstash-8.6.2/testing2.csv"
        }
    },
          "name" => "Lee123\r\nHello3",
       "address" => "Tanjung789",
          "tags" => [
        [0] "multiline"
    ]
}
[2024-09-11T00:29:01,020][INFO ][logstash.outputs.file    ][main][c61c7f3ccb67bb630ef958abf2e06caeb336881fe37422b5dbbac6387a0f3ee8] Closing file C:/Users/ABCD/Desktop/ELK/logstash-8.6.2/testing.json