Filebeat to Logstash: CSV header is not sent first

Hi.

Description
I need to import CSV log files into Elasticsearch. For this I have a three-part stack: Filebeat > Logstash > Elasticsearch.
Filebeat reads the CSV files, Logstash filters the data, and Elasticsearch stores it.

Problem
The problem is that Filebeat seems to send the lines out of order. Logstash relies on the header to name the CSV columns, but the header is not the first event Logstash receives.

Is the Filebeat > Logstash > Elasticsearch setup a good idea, and if so, how can I solve the header problem?

My config

filebeat.yml

    filebeat.inputs:
      - type: log
        paths:
          - /usr/share/filebeat/shared/*.csv

    output.logstash:
      hosts: ["logstash:5044"]

pipeline.conf

    input {
        beats {
            port => "5044"
        }
    }

    filter {
        csv {
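            # NOTE: autodetect_column_names uses the first event this filter
            # sees as the header row, so events must arrive in file order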
            autodetect_column_names => true
            autogenerate_column_names => true
            convert => {
                "id" => "integer"
                "user_id" => "integer"
                "objectId" => "integer"
                "createAt" => "date_time"
            }

        }
    }

    output {
        file {
            path => "./output.txt"
            codec => line { format => "%{message}"}
        }
        stdout { codec => rubydebug }
    }

input file

id,user_id,objectId,classname,status,createdAt,shortMessage,message
1,17,17,"Entity\User",status.connection,"2020-10-01 07:26:59","string.","stringFormatJson"
2,7,7,"Entity\User",status.connection,"2020-10-01 07:39:02","string","stringFormatJson"
3,23,23,"Entity\User",status.connection,"2020-10-01 07:39:52","string","stringFormatJson"
4,14,14,"Entity\User",status.connection,"2020-10-01 07:52:03","string","stringFormatJson"
5,21,21,"Entity\User",status.connection,"2020-10-01 08:12:40","string","stringFormatJson"
6,24,24,"Entity\User",status.connection,"2020-10-01 08:13:02","string","stringFormatJson"
7,25,25,"Entity\User",status.connection,"2020-10-01 08:26:36","string","stringFormatJson"
8,16,16,"Entity\User",status.connection,"2020-10-01 08:28:37","string","stringFormatJson"
9,16,72366,"Entity\SaleInvoice",status.update,"2020-10-01 08:30:58","string","stringFormatJson"

stdout output for a single data line

{
                  "input" => {
        "type" => "log"
    },
       "stringFormatJson" => "stringFormatJson",
             "@timestamp" => 2020-11-30T15:05:19.603Z,
                     "25" => "21",
               "@version" => "1",
                    "ecs" => {
        "version" => "1.6.0"
    },
                   "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
                "message" => "5,21,21,\"Entity\\User\",status.connection,\"2020-10-01 08:12:40\",\"string\",\"stringFormatJson\"",
      "status.connection" => "status.connection",
                      "7" => "5",
                 "string" => "string",
                  "agent" => {
             "version" => "7.10.0",
                "name" => "fcc9183acc57",
                "type" => "filebeat",
            "hostname" => "fcc9183acc57",
                  "id" => "dd75d4f4-b305-488a-8262-7d4b492928dc",
        "ephemeral_id" => "300f7a36-94ec-40db-9a64-4e55c14a55c6"
    },
                   "host" => {
        "name" => "fcc9183acc57"
    },
                    "log" => {
        "offset" => 427,
          "file" => {
            "path" => "/usr/share/filebeat/shared/logs-Oct-10-2020.csv"
        }
    },
           "Entity\\User" => "Entity\\User",
    "2020-10-01 08:26:36" => "2020-10-01 08:12:40"
}

stdout output where the CSV header line was parsed as data

{
                  "input" => {
        "type" => "log"
    },
       "stringFormatJson" => "message",
             "@timestamp" => 2020-11-30T15:05:19.603Z,
                     "25" => "objectId",
               "@version" => "1",
                "message" => "id,user_id,objectId,classname,status,createdAt,shortMessage,message",
                   "tags" => [
        [0] "beats_input_codec_plain_applied"
    ],
                    "ecs" => {
        "version" => "1.6.0"
    },
      "status.connection" => "status",
                      "7" => "id",
                 "string" => "shortMessage",
                  "agent" => {
                "name" => "fcc9183acc57",
             "version" => "7.10.0",
                "type" => "filebeat",
                  "id" => "dd75d4f4-b305-488a-8262-7d4b492928dc",
            "hostname" => "fcc9183acc57",
        "ephemeral_id" => "300f7a36-94ec-40db-9a64-4e55c14a55c6"
    },
                    "log" => {
        "offset" => 0,
          "file" => {
            "path" => "/usr/share/filebeat/shared/logs-Oct-10-2020.csv"
        }
    },
                   "host" => {
        "name" => "fcc9183acc57"
    },
           "Entity\\User" => "classname",
    "2020-10-01 08:26:36" => "createdAt"
}

file output in CSV format (the header is not the first line)

5,21,21,"Entity\User",status.connection,"2020-10-01 08:12:40","string","stringFormatJson"
4,14,14,"Entity\User",status.connection,"2020-10-01 07:52:03","string","stringFormatJson"
3,23,23,"Entity\User",status.connection,"2020-10-01 07:39:52","string","stringFormatJson"
2,7,7,"Entity\User",status.connection,"2020-10-01 07:39:02","string","stringFormatJson"
id,user_id,objectId,classname,status,createdAt,shortMessage,message
8,16,16,"Entity\User",status.connection,"2020-10-01 08:28:37","string","stringFormatJson"
6,24,24,"Entity\User",status.connection,"2020-10-01 08:13:02","string","stringFormatJson"
1,17,17,"Entity\User",status.connection,"2020-10-01 07:26:59","string.","stringFormatJson"
9,16,72366,"Entity\SaleInvoice",status.update,"2020-10-01 08:30:58","string","stringFormatJson"

At the moment nothing is sent to Elasticsearch because the pipeline configuration is not finished yet; a sketch of the output I plan to add is shown below.
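This is only a draft and not yet tested; the host elasticsearch:9200 and the index name are placeholders from my Docker setup:

    output {
        elasticsearch {
            hosts => ["http://elasticsearch:9200"]
            index => "csv-logs-%{+YYYY.MM.dd}"
        }
    }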

Thanks in advance.

If you want Logstash to preserve the order of events you must set pipeline.workers to 1. In future versions (8.x) you may also need to set pipeline.ordered to true (in 7.x it defaults to auto, which enables ordering when pipeline.workers is 1).
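As a rough example, in logstash.yml (or per pipeline in pipelines.yml) that would look something like this; adjust it to your version and setup:

    # logstash.yml
    pipeline.workers: 1
    # optional in 7.x (defaults to auto, which orders events when workers == 1)
    pipeline.ordered: true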

Thanks. I will try it now, but what will be the effect on import speed?
All that is needed is for the header to be the first event Logstash receives so that it can name the columns correctly.

Logstash will be limited to processing the filters in a single thread, so it will not scale with the number of CPUs.

OK, it works. I don't fully understand why, but that's the solution.
Thank you very much!
