How to parse CSV via Elastic Agent?

Hi!
I want to collect CSV logs and, if I understand correctly, I need to add a new integration based on "Custom Logs"?
I'm not sure how to do it.

I have this config for Logstash under the conf.d folder and everything works fine:

input {
        file {
                path => "/opt/nsm/*"
                start_position => "beginning"
                sincedb_path => "/dev/null"
        }
}

filter {
        csv {
                separator => ","
                columns => ["Score", "Source IP", "Destination IP", "Connections", "Avg. Bytes", "Total Bytes", "TS Score", "DS Score", "Dur Score", "Hist Score", "Top Intvl"]
        }
}

output {
        elasticsearch {
                hosts => ["http://192.168.0.108:9200"]
        }
}

But now I need to put this config (or a rewrite of it) into Elastic Agent somehow. How can I do it?

Related, but nearly a year old: How to integrate custom logs with Elastic Agent | by Benoit Luttringer | Zenika

I saw this guide, but it is about parsing rows in a log file, while I need to parse CSV.

It is the same approach; you just need to use the correct processor.

Check the documentation for the CSV processor.
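
For example, a minimal ingest pipeline with a csv processor might look like this (a sketch; the target field names here are placeholders, use your own column names):

{
  "description": "Parse CSV lines from the message field",
  "processors": [
    {
      "csv": {
        "field": "message",
        "target_fields": ["score", "source_ip", "destination_ip"],
        "separator": ","
      }
    }
  ]
}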

I used it, but it did not parse anything.

@leandrojmp I tried "Test pipeline" and I see that it parses the message, but in the Kibana "Discover" tab I see unparsed events.
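
For reference, the Kibana "Test pipeline" feature corresponds to the ingest _simulate API; a minimal sketch, where my-csv-pipeline is a placeholder for the real pipeline name and the document carries one of the CSV lines:

POST _ingest/pipeline/my-csv-pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "0.279,192.168.137.88,34.95.113.255,25,644,16119,0.424,0.667,0.025,0,0"
      }
    }
  ]
}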

You need to provide context: what ingest pipeline are you using? Please share it.

What is the output you have in Elasticsearch? Copy the JSON part you have in Discover and share it.

It is not possible to know what the issue could be without seeing your data and the output you are getting.

JSON output in Discover

Example 1
{
  "_index": ".ds-logs-generic-default-2023.01.27-000001",
  "_id": "b7JV84UBXqgfEVa3cNbf",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2023-01-27T13:05:39.737Z",
    "host": {
      "mac": [
        "00-0C-29-B1-4F-7B"
      ],
      "name": "nsm",
      "hostname": "nsm",
      "architecture": "x86_64",
      "os": {
        "family": "debian",
        "name": "Ubuntu",
        "kernel": "5.4.0-137-generic",
        "codename": "focal",
        "type": "linux",
        "platform": "ubuntu",
        "version": "20.04.5 LTS (Focal Fossa)"
      },
      "id": "432ef884cea7488bb03fee5d5f1b56a9",
      "containerized": false,
      "ip": [
        "192.168.0.108",
        "fe80::20c:29ff:feb1:4f7b"
      ]
    },
    "event": {
      "dataset": "generic"
    },
    "elastic_agent": {
      "version": "8.6.0",
      "id": "7f80d869-90d0-4dd7-b991-c5c68f993a50",
      "snapshot": false
    },
    "agent": {
      "id": "7f80d869-90d0-4dd7-b991-c5c68f993a50",
      "type": "filebeat",
      "version": "8.6.0",
      "ephemeral_id": "e7d1928e-09d8-4770-8c7f-e3090eb480d2",
      "name": "nsm"
    },
    "ecs": {
      "version": "8.0.0"
    },
    "input": {
      "type": "log"
    },
    "message": "0.279,192.168.137.88,34.95.113.255,25,644,16119,0.424,0.667,0.025,0,0",
    "data_stream": {
      "dataset": "generic",
      "namespace": "default",
      "type": "logs"
    },
    "log": {
      "offset": 327,
      "file": {
        "path": "/opt/nsm/1.log"
      }
    }
  },
  "fields": {
    "elastic_agent.version": [
      "8.6.0"
    ],
    "host.hostname": [
      "nsm"
    ],
    "host.mac": [
      "00-0C-29-B1-4F-7B"
    ],
    "host.ip": [
      "192.168.0.108",
      "fe80::20c:29ff:feb1:4f7b"
    ],
    "agent.type": [
      "filebeat"
    ],
    "host.os.version": [
      "20.04.5 LTS (Focal Fossa)"
    ],
    "host.os.kernel": [
      "5.4.0-137-generic"
    ],
    "host.os.name": [
      "Ubuntu"
    ],
    "agent.name": [
      "nsm"
    ],
    "host.name": [
      "nsm"
    ],
    "elastic_agent.snapshot": [
      false
    ],
    "host.id": [
      "432ef884cea7488bb03fee5d5f1b56a9"
    ],
    "host.os.type": [
      "linux"
    ],
    "elastic_agent.id": [
      "7f80d869-90d0-4dd7-b991-c5c68f993a50"
    ],
    "data_stream.namespace": [
      "default"
    ],
    "host.os.codename": [
      "focal"
    ],
    "input.type": [
      "log"
    ],
    "log.offset": [
      327
    ],
    "message": [
      "0.279,192.168.137.88,34.95.113.255,25,644,16119,0.424,0.667,0.025,0,0"
    ],
    "data_stream.type": [
      "logs"
    ],
    "host.architecture": [
      "x86_64"
    ],
    "@timestamp": [
      "2023-01-27T13:05:39.737Z"
    ],
    "agent.id": [
      "7f80d869-90d0-4dd7-b991-c5c68f993a50"
    ],
    "host.os.platform": [
      "ubuntu"
    ],
    "ecs.version": [
      "8.0.0"
    ],
    "host.containerized": [
      false
    ],
    "log.file.path": [
      "/opt/nsm/1.log"
    ],
    "data_stream.dataset": [
      "generic"
    ],
    "agent.ephemeral_id": [
      "e7d1928e-09d8-4770-8c7f-e3090eb480d2"
    ],
    "agent.version": [
      "8.6.0"
    ],
    "host.os.family": [
      "debian"
    ],
    "event.dataset": [
      "generic"
    ]
  }
}

Custom Logs Elastic Agent config:
[screenshot of the Custom Logs integration settings]

Full config
{
  "csv": {
    "field": "message",
    "target_fields": ["Score", "Source IP", "Destination IP", "Connections", "Avg. Bytes", "Total Bytes", "TS Score",	"DS Score", "Dur Score", "Hist Score", "Top Intvl"]
  }
}
Result of the pipeline test based on Example 1
{
  "docs": [
    {
      "doc": {
        "_index": ".ds-logs-generic-default-2023.01.27-000001",
        "_id": "b7JV84UBXqgfEVa3cNbf",
        "_version": "-3",
        "_source": {
          "Connections": "25",
          "agent": {
            "name": "nsm",
            "id": "7f80d869-90d0-4dd7-b991-c5c68f993a50",
            "type": "filebeat",
            "ephemeral_id": "e7d1928e-09d8-4770-8c7f-e3090eb480d2",
            "version": "8.6.0"
          },
          "Total Bytes": "16119",
          "log": {
            "file": {
              "path": "/opt/nsm/1.log"
            },
            "offset": 327
          },
          "elastic_agent": {
            "id": "7f80d869-90d0-4dd7-b991-c5c68f993a50",
            "version": "8.6.0",
            "snapshot": false
          },
          "Destination IP": "34.95.113.255",
          "Dur Score": "0.025",
          "message": "0.279,192.168.137.88,34.95.113.255,25,644,16119,0.424,0.667,0.025,0,0",
          "input": {
            "type": "log"
          },
          "DS Score": "0.667",
          "Score": "0.279",
          "TS Score": "0.424",
          "@timestamp": "2023-01-27T13:05:39.737Z",
          "Avg": {
            " Bytes": "644"
          },
          "ecs": {
            "version": "8.0.0"
          },
          "data_stream": {
            "namespace": "default",
            "type": "logs",
            "dataset": "generic"
          },
          "host": {
            "hostname": "nsm",
            "os": {
              "kernel": "5.4.0-137-generic",
              "codename": "focal",
              "name": "Ubuntu",
              "family": "debian",
              "type": "linux",
              "version": "20.04.5 LTS (Focal Fossa)",
              "platform": "ubuntu"
            },
            "containerized": false,
            "ip": [
              "192.168.0.108",
              "fe80::20c:29ff:feb1:4f7b"
            ],
            "name": "nsm",
            "id": "432ef884cea7488bb03fee5d5f1b56a9",
            "mac": [
              "00-0C-29-B1-4F-7B"
            ],
            "architecture": "x86_64"
          },
          "Source IP": "192.168.137.88",
          "Hist Score": "0",
          "event": {
            "dataset": "generic"
          },
          "Top Intvl": "0"
        },
        "_ingest": {
          "timestamp": "2023-01-30T14:50:14.581791334Z"
        }
      }
    }
  ]
}

You did not show how you set up the Custom Logs integration... where / how did you specify the ingest pipeline?

What version are you using?

Hi @stephenb!
Thanks for the comment.
My version is ELK 8.6.1.

how you set up the Custom Logs integration

Integrations > Custom logs > Add Custom Logs integration >
Custom log file = /opt/nsm/* (the path where all my CSV files are stored)
Processors =

{
  "csv": {
    "field": "message",
    "target_fields": ["Score", "Source IP", "Destination IP", "Connections", "Avg. Bytes", "Total Bytes", "TS Score",	"DS Score", "Dur Score", "Hist Score", "Top Intvl"]
  }
}

how did you specify the ingest pipeline?

[screenshot of the ingest pipeline configuration]
I modified logs-log.log@custom

And it is linked to my Elastic Agent and host.

I didn't do anything more. Maybe I missed something? I'm new to ELK.
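
For reference, the same pipeline could presumably also be created through the API instead of the Kibana UI; a sketch reusing the processor config above:

PUT _ingest/pipeline/logs-log.log@custom
{
  "processors": [
    {
      "csv": {
        "field": "message",
        "target_fields": ["Score", "Source IP", "Destination IP", "Connections", "Avg. Bytes", "Total Bytes", "TS Score", "DS Score", "Dur Score", "Hist Score", "Top Intvl"]
      }
    }
  ]
}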

@test_qweqwe it looks like you are on the right path.

When you look in Discover, do you see any of the logs from the files in /opt/nsm/?

Perhaps the agent read all the files before you added the pipeline?

Yes, I see logs from /opt/nsm/, but they are not parsed.
All new files are still not parsed after I added the pipeline and restarted ELK.

Here is an event from a few seconds ago:

{
  "_index": ".ds-logs-generic-default-2023.01.27-000001",
  "_id": "gO9QB4YBxWb1fMfgEZty",
  "_version": 1,
  "_score": 0,
  "_source": {
    "@timestamp": "2023-01-31T10:12:12.610Z",
    "data_stream": {
      "namespace": "default",
      "type": "logs",
      "dataset": "generic"
    },
    "event": {
      "dataset": "generic"
    },
    "elastic_agent": {
      "version": "8.6.0",
      "id": "7f80d869-90d0-4dd7-b991-c5c68f993a50",
      "snapshot": false
    },
    "log": {
      "file": {
        "path": "/opt/nsm/13.log"
      },
      "offset": 115
    },
    "message": "0.839,192.168.0.108,185.125.190.58,30,76,2280,1,0.667,0.688,1,2048",
    "agent": {
      "type": "filebeat",
      "version": "8.6.0",
      "ephemeral_id": "40f0d334-e31a-49b5-b36b-bb795f5b4b69",
      "id": "7f80d869-90d0-4dd7-b991-c5c68f993a50",
      "name": "nsm"
    },
    "ecs": {
      "version": "8.0.0"
    },
    "input": {
      "type": "log"
    },
    "host": {
      "mac": [
        "00-0C-29-B1-4F-7B"
      ],
      "hostname": "nsm",
      "architecture": "x86_64",
      "os": {
        "codename": "focal",
        "type": "linux",
        "platform": "ubuntu",
        "version": "20.04.5 LTS (Focal Fossa)",
        "family": "debian",
        "name": "Ubuntu",
        "kernel": "5.4.0-137-generic"
      },
      "id": "432ef884cea7488bb03fee5d5f1b56a9",
      "containerized": false,
      "name": "nsm",
      "ip": [
        "192.168.0.108",
        "fe80::20c:29ff:feb1:4f7b"
      ]
    }
  },
  "fields": {
    "elastic_agent.version": [
      "8.6.0"
    ],
    "host.hostname": [
      "nsm"
    ],
    "host.mac": [
      "00-0C-29-B1-4F-7B"
    ],
    "host.ip": [
      "192.168.0.108",
      "fe80::20c:29ff:feb1:4f7b"
    ],
    "agent.type": [
      "filebeat"
    ],
    "host.os.version": [
      "20.04.5 LTS (Focal Fossa)"
    ],
    "host.os.kernel": [
      "5.4.0-137-generic"
    ],
    "host.os.name": [
      "Ubuntu"
    ],
    "agent.name": [
      "nsm"
    ],
    "elastic_agent.snapshot": [
      false
    ],
    "host.name": [
      "nsm"
    ],
    "host.id": [
      "432ef884cea7488bb03fee5d5f1b56a9"
    ],
    "host.os.type": [
      "linux"
    ],
    "elastic_agent.id": [
      "7f80d869-90d0-4dd7-b991-c5c68f993a50"
    ],
    "data_stream.namespace": [
      "default"
    ],
    "host.os.codename": [
      "focal"
    ],
    "input.type": [
      "log"
    ],
    "log.offset": [
      115
    ],
    "message": [
      "0.839,192.168.0.108,185.125.190.58,30,76,2280,1,0.667,0.688,1,2048"
    ],
    "data_stream.type": [
      "logs"
    ],
    "host.architecture": [
      "x86_64"
    ],
    "@timestamp": [
      "2023-01-31T10:12:12.610Z"
    ],
    "agent.id": [
      "7f80d869-90d0-4dd7-b991-c5c68f993a50"
    ],
    "host.containerized": [
      false
    ],
    "ecs.version": [
      "8.0.0"
    ],
    "host.os.platform": [
      "ubuntu"
    ],
    "data_stream.dataset": [
      "generic"
    ],
    "log.file.path": [
      "/opt/nsm/13.log"
    ],
    "agent.ephemeral_id": [
      "40f0d334-e31a-49b5-b36b-bb795f5b4b69"
    ],
    "agent.version": [
      "8.6.0"
    ],
    "host.os.family": [
      "debian"
    ],
    "event.dataset": [
      "generic"
    ]
  }
}

My next suggestion is to add a simple set processor to the custom pipeline, setting a field to a fixed string, to see whether your ingest pipeline is executed at all.
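
A minimal sketch of such a set processor, where the field name and value are just placeholders:

{
  "set": {
    "field": "debug_pipeline_ran",
    "value": "yes"
  }
}

If the new field shows up on incoming documents, the pipeline is running and the issue is in the csv processor; if not, the pipeline is not being applied at all.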

Also, did you set "ignore_failure": true for the CSV processor?

Hi @stephenb
Sorry for the late reply.
I added "ignore_failure": true to the CSV processor, but it didn't help.

My next suggestion is to add a simple set processor to the custom pipeline, setting a field to a fixed string, to see whether your ingest pipeline is executed at all.

Can you please share how to add it in Kibana? Sorry, I'm new to Elastic.

Following along ... I'm working on ingesting a custom file with CSV-formatted data and have been unable to ingest data as well. In the past I've done this through Logstash, but with Elastic Agent I've been unsuccessful.

(The agent doesn't even seem to be ingesting the log file)

have been unable to ingest data as well

Do you mean that there was some ingestion, or no results at all?

I also thought about an alternative, such as converting the CSV output to JSON format before sending it to Elastic Agent, but my scripting skills are not good enough.

It looks like there is some security issue in my case:

{"type":"security_exception","reason":"action [indices:admin/auto_create] is unauthorized for API key id [8cPGg4UBytBGNw1dSsxz] of user [elastic/fleet-server] on indices [logs-connectedclients-default], this action is granted by the index privileges [auto_configure,create_index,manage,all]"}, dropping event!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.