Setting Up Logstash in Docker Compose for Bulk Ingest of CSV Files on a Local Machine

Okay, I don't know how each of the _source documents ended up being different. This is crazy.

I didn't save my data as .xlsx. I always made sure never to save my CSV files after viewing them through Excel.

That was me showing you an example to demonstrate that the \r definitely is a problem. I did one with the \r and one without.

That data is definitely coming in with \r from your CSV... it is right there in your Logstash debug output above...

Not sure what is happening... but there is some "entropy" going on 🙂

Before making description a keyword:

2023-11-17 11:16:45               "host" => {
2023-11-17 11:16:45         "name" => "94e10b4ac1ae"
2023-11-17 11:16:45     },
2023-11-17 11:16:45                "log" => {
2023-11-17 11:16:45         "file" => {
2023-11-17 11:16:45             "path" => "/usr/share/logstash/csv_files/events2022-01-01.csv"
2023-11-17 11:16:45         }
2023-11-17 11:16:45     },
2023-11-17 11:16:45     "graphicelement" => "-1",
2023-11-17 11:16:45         "sourcetime" => "2022-01-01 01:03:31.885",
2023-11-17 11:16:45         "@timestamp" => 2023-11-17T03:16:38.290874500Z,
2023-11-17 11:16:45               "zone" => "-1",
2023-11-17 11:16:45          "equipment" => "90370/RSC/DRS/ALL",
2023-11-17 11:16:45          "subsystem" => "DRS",
2023-11-17 11:16:45             "system" => "RSC",
2023-11-17 11:16:45           "uniqueid" => "OCC_11668339",
2023-11-17 11:16:45           "operator" => "null",
2023-11-17 11:16:45         "alarmvalue" => "0",
2023-11-17 11:16:45              "alarm" => "OCC_0",
2023-11-17 11:16:45          "eventtype" => "DIAG_IAllDoorModeStatus[1]",
2023-11-17 11:16:45              "event" => {
2023-11-17 11:16:45         "original" => "11668339,OCC_11668339,OCC_0,DIAG_IAllDoorModeStatus[1],RSC,DRS,2022-01-01 01:03:31.885,null,0,CLOSED & LOCKED,90370/RSC/DRS/ALL,90370,0,Summary of Train Doors Status with Closed & Locked,5,0,-1,-1"
2023-11-17 11:16:45     },
2023-11-17 11:16:45            "message" => "11668339,OCC_11668339,OCC_0,DIAG_IAllDoorModeStatus[1],RSC,DRS,2022-01-01 01:03:31.885,null,0,CLOSED & LOCKED,90370/RSC/DRS/ALL,90370,0,Summary of Train Doors Status with Closed & Locked,5,0,-1,-1",
2023-11-17 11:16:45              "state" => "5",
2023-11-17 11:16:45           "severity" => "0",
2023-11-17 11:16:45                 "id" => "11668339",
2023-11-17 11:16:45              "value" => "CLOSED & LOCKED",
2023-11-17 11:16:45           "location" => "90370",
2023-11-17 11:16:45        "description" => "Summary of Train Doors Status with Closed & Locked",
2023-11-17 11:16:45           "mmsstate" => "0",
2023-11-17 11:16:45           "@version" => "1"
2023-11-17 11:16:45 }

After making description a keyword (now):

{
       "message" => "54161229,CCC_54161229,CCC_0,DIAG_ISkipStOdrRecAto,SIG,CTC,2023-01-01 01:51:51.515,null,0,SKIP ACTIVE,CC062/CCG/AAA/TDMS_SYS,90062,0,Skip ddd reported by AAA,5,0,-1,-1",  
          "host" => {
        "name" => "3ddc142e035e"
    },
           "log" => {
        "file" => {
            "path" => "/usr/share/logstash/ats-logs-mainline/2023/01-Jan-23/events2023-01-01.csv"
        }
    },
         "event" => {
        "original" => "54161229,CCC_54161229,CCC_0,DIAG_ISkipStOdrRecAto,SIG,CTC,2023-01-01 01:51:51.515,null,0,SKIP ACTIVE,CC062/CCG/AAA/TDMS_SYS,90062,0,Skip ddd reported by AAA,5,0,-1,-1" 
    },
    "@timestamp" => 2023-11-22T18:53:08.192485038Z,
      "@version" => "1"
}

Now I am getting traction ingesting files via the original CSV.

Curiously, I noticed that the file ingest output on the console is now different from how it was originally.

Is this because I made description a keyword?

If you are seeing this out of Logstash, it means you still have the filter section in the logstash.conf, because Logstash should NOT be doing the CSV parsing... the ingest pipeline does it... I think you are loading old code... NOTE: my logstash.conf:

input { 
    file { 
        path => "/usr/share/logstash/csv_files/events2023-06-01.csv"
        start_position => "beginning" 
        sincedb_path => "/dev/null"
    } 
}

output { 
    elasticsearch {
        index => "ats-events-2023-06" 
        hosts => ["https://es01:9200"]
        user => "elastic"
        password => "mypassword"
        pipeline => "ats-events-pipeline"
        ssl_verification_mode => "none"
    }
    stdout { codec => "rubydebug" }
}
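
And since this is all running under docker compose, the conf file and the CSV folder have to be mounted into the container. A minimal sketch of the relevant service, just to illustrate; the image tag and host-side paths here are my assumptions, not your actual compose file:

services:
  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.1
    volumes:
      # pipeline config goes in the image's default pipeline directory
      - ./logstash.conf:/usr/share/logstash/pipeline/logstash.conf:ro
      # CSV files mounted at the path referenced in the file input above
      - ./csv_files:/usr/share/logstash/csv_files:ro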

and yes, the output of Logstash should look like...

logstash-logstash-1  | {
logstash-logstash-1  |       "@version" => "1",
logstash-logstash-1  |          "event" => {
logstash-logstash-1  |         "original" => "3432482,CCC_3432482,CCC_0,DIAG_IHndovr1Radio,SSS,ATC,2023-06-01 00:00:04.614,null,0,ALARM,xx022/SSS/ATC/AAA_SYS,xx022,0,Auto Norm: performed handover with one radio only,5,0,-1,-1"
logstash-logstash-1  |     },
logstash-logstash-1  |        "message" => "3432482,CCC_3432482,CCC_0,DIAG_IHndovr1Radio,SSS,ATC,2023-06-01 00:00:04.614,null,0,ALARM,xx022/SSS/ATC/AAA_SYS,xx022,0,Auto Norm: performed handover with one radio only,5,0,-1,-1",
logstash-logstash-1  |           "host" => {
logstash-logstash-1  |         "name" => "6e71e0dd4966"
logstash-logstash-1  |     },
logstash-logstash-1  |            "log" => {
logstash-logstash-1  |         "file" => {
logstash-logstash-1  |             "path" => "/usr/share/logstash/csv_files/events2023-06-01.csv"
logstash-logstash-1  |         }
logstash-logstash-1  |     },
logstash-logstash-1  |     "@timestamp" => 2023-11-22T19:08:27.540865711Z
logstash-logstash-1  | }

NOTE: unparsed, because the parsing happens in the ingest pipeline...

Do you understand that we are NOT using Logstash to parse the CSV...

Logstash simply reads the file line by line and sends the message field (and some others) to Elasticsearch, and the ingest pipeline running in Elasticsearch parses the CSV...
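
For reference, the ingest pipeline on the Elasticsearch side is defined with a csv processor, along these lines. This is only a sketch: the pipeline name comes from the logstash.conf above, and the target field list is inferred from your rubydebug output earlier in the thread, so double-check it against your actual pipeline. Dropping message at the end is optional.

PUT _ingest/pipeline/ats-events-pipeline
{
  "description": "Parse ATS event CSV lines shipped by Logstash",
  "processors": [
    {
      "csv": {
        "field": "message",
        "target_fields": [
          "id", "uniqueid", "alarm", "eventtype", "system", "subsystem",
          "sourcetime", "operator", "alarmvalue", "value", "equipment",
          "location", "mmsstate", "description", "state", "severity",
          "zone", "graphicelement"
        ]
      }
    },
    { "remove": { "field": "message", "ignore_missing": true } }
  ]
}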

I am not sure what you are doing... I have given you the exact code each time...

Gotta run... good luck.

I swear, I was not using the filters at all these past few hours.

Thanks for the tip

{
    "@timestamp" => 2023-11-22T19:13:06.671293416Z,
          "host" => {
        "name" => "9ad435a480de"
    },
       "message" => "54147799,OCC_54147799,OCC_1442517,ALARM_NORMALIZED,RSC,ATI,2023-01-01 00:12:05.168,null,1,NORMAL,90712/RSC/ATI,90712,3,ATI System Mode,2,4,-1,-1\r",
      "@version" => "1",
         "event" => {
        "original" => "54147799,OCC_54147799,OCC_1442517,ALARM_NORMALIZED,RSC,ATI,2023-01-01 00:12:05.168,null,1,NORMAL,90712/RSC/ATI,90712,3,ATI System Mode,2,4,-1,-1\r"
    },
           "log" => {
        "file" => {
            "path" => "/usr/share/logstash/ats-logs-mainline/2023/01-Jan-23/events2023-01-01-test.csv"
        }
    }
}

I can confirm the root cause is that stupid \r.

The extra \r (carriage return) in the CSV file comes from the default behavior of csv.writer(): the csv module's default "excel" dialect terminates every row with \r\n.

Now I'm spending time improving my Python script so that \r is not appended at the end of every row.

pandas always does this.

Do you know of any other library that doesn't append \r?
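
For what it's worth, you don't need a different library; both the stdlib csv module and pandas let you override the line terminator. A minimal sketch (the output path and the sample row are made up):

import csv
import pandas as pd

# hypothetical, truncated example row
rows = [["11668339", "OCC_11668339", "OCC_0"]]

# stdlib csv: the default "excel" dialect ends every row with \r\n, so set
# lineterminator explicitly; open with newline="" so the file object does
# not add its own newline translation on Windows
with open("events-fixed.csv", "w", newline="") as f:
    csv.writer(f, lineterminator="\n").writerows(rows)

# pandas: to_csv takes the same option
# (spelled line_terminator before pandas 1.5)
pd.DataFrame(rows).to_csv("events-fixed.csv", index=False, header=False,
                          lineterminator="\n")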

You can use a regex to replace it, or gsub in Logstash.

I found this from CSV Filter - Quote character causing _csvparsefailure - Elastic Stack / Logstash - Discuss the Elastic Stack

Mutate filter plugin | Logstash Reference [8.11] | Elastic

Here is my improvisation for my use case.

I wrote this in my Logstash config. "graphicelement" is where I have the \r, and I replace it with nothing, hence the "".

input { 
    file { 
        path => "/usr/share/logstash/ats-logs-mainline/2023/01-Jan-23/formatted_events2023-01-01.csv"
        start_position => "beginning" 
    } 
    mutate {
            gsub => ["graphicelement", "\r", ""]
       }
}

Oh no, I need help installing the mutate filter plugin. I'm not sure how to do it; I just read the readme. My use case is within a Docker environment.

The mutate filter is bundled by default; you do not need to install it. Note, however, that because you are parsing the CSV in the ingest pipeline, the [graphicelement] field will not exist in Logstash; you will need to mutate the [message] field.

Edited to [message] - makes sense, because we are doing the mutate in Logstash, where the JSON output has the message field.

[2023-11-23T17:15:13,140][ERROR][logstash.plugins.registry] Unable to load plugin. {:type=>"input", :name=>"mutate"}

I think I'm putting the mutate block in the wrong place.

I actually have a pipeline template where the filtering happens. Am I supposed to put it there?

EDIT: the console doesn't allow it.

Correct. I logically reasoned it belongs in the filter section.

Thanks though. Do bear with me; I'm sometimes still clueless about where certain parts are supposed to go (into which files?).

Indeed, the mutate filter needs to be in the filter section, not the input section.

input { 
    file { 
        path => "/usr/share/logstash/ats-logs-mainline/2023/01-Jan-23/formatted_events2023-01-01.csv"
        start_position => "beginning" 
    } 
}
filter { 
    mutate {
            gsub => ["message", "\r", ""]
       }
}
[2023-11-23T17:46:56,709][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of [ \\t\\r\\n], \"#\", \"{\" at line 10, column 7 (byte 256) after filter", :backtrace=>["/usr/share/logstash/logstash-core/lib/logstash/compiler.rb:32:in `compile_imperative'", "org/logstash/execution/AbstractPipelineExt.java:239:in `initialize'", "org/logstash/execution/AbstractPipelineExt.java:173:in `initialize'", "/usr/share/logstash/logstash-core/lib/logstash/java_pipeline.rb:48:in `initialize'", "org/jruby/RubyClass.java:931:in `new'", "/usr/share/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:49:in `execute'", "/usr/share/logstash/logstash-core/lib/logstash/agent.rb:386:in `block in converge_state'"]}

Now I get this.

EDIT: My bad. I wrote filters and not filter.

OK, so Logstash is objecting to whatever comes immediately after the word filter in your configuration. So what does the configuration look like?
