Becoming ECS Compliant

I've been ingesting datasets since before ECS was a thing, and those datasets now have ECS mappings. What would be the most efficient way of ingesting data (moving forward) so that it is ECS compliant? Examples of datasets are Fortigate firewalls, CloudFlare logs, and Windows event logs. This data is all ingested in some way by Logstash, and I'd like to keep Logstash in the loop for event caching and for injecting custom fields when there isn't an ECS-compliant field.

This is in preparation for moving up to Elastic Cloud and taking full advantage of Enterprise licensing.

I think I have a very similar scenario: I've also been ingesting data from many different datasets since before ECS was a thing, so maybe I can provide some insights.

We also use and rely on Logstash. We ran some tests with the Elastic Agent but decided to use it only for simple things like some SaaS audit logs; it was too limited and would add a lot more work to manage everything for our use cases, so Logstash is also essential for us.

But since we wanted our data to work with all the Security features and built-in alerts provided by Elastic, we decided to use the parsing done by the Elastic Agent as the reference for our Logstash pipelines.

For example, for the Fortigate pipeline we looked at the ingest pipeline used by the integration to validate which fields we needed to correct in our pipeline, so that the final data generated by Logstash would be the same as if it had been generated by the integration.

Basically, for every dataset we have, we check whether there is an integration and use its ingest pipeline as a reference for our Logstash filters. When an integration for a specific dataset does not exist, we can at least map the majority of the fields to ECS fields according to the ECS Reference.

The fields that we can't match to any ECS equivalent we store as nested fields under a top-level field named after the dataset. For example, if the dataset is for a tool named acme, we will probably end up with a couple of acme.* fields.
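As a minimal sketch of what that looks like on the Logstash side (the acme field names below are invented just to show the pattern, not a real tool):

filter {
  mutate {
    rename => {
      # hypothetical source fields with ECS equivalents
      "src_ip"       => "[source][ip]"
      "login_name"   => "[user][name]"
      # no ECS equivalent, so it stays under the vendor namespace
      "session_kind" => "[acme][session][kind]"
    }
  }
}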

Since our main use case is the SIEM and Security, we also follow the guidelines in this documentation.

It is a lot of work to make your custom data fit into what Elastic wants, but I find it way easier to do that than to have to deal with Integrations and hundreds of custom ingest pipelines and mappings.

Ya, looks like we are both tackling (or have tackled) the same problem. I found some documentation from Elastic that recommends using an Elasticsearch ingest pipeline to map source fields to destination fields. This seems like an easy way to do it... but it also seems to increase complexity. I'd have to keep track of both the field output of the Logstash pipeline and the Elasticsearch ingest mapping.
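For reference, the wiring for that approach looks simple enough: the Logstash elasticsearch output can hand events to an ingest pipeline by name (everything below is a placeholder, not my real config):

output {
  elasticsearch {
    hosts    => ["https://my-deployment.es.example.cloud:9243"]  # placeholder endpoint
    index    => "logs-cloudflare-default"                        # placeholder index
    pipeline => "cloudflare-to-ecs-remap"                        # hypothetical ingest pipeline name
  }
}

The complexity I'm worried about is that the field names then live in both places.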

I'd already had the idea of using the ECS mappings from an integration to determine where my fields should map to, but that brings in some ambiguities pretty quickly for me. I was hoping there was an easier way, but it looks like I'm gonna have to grind through the remapping and then reconfigure my pipeline(s).

Perfect example of ambiguity. CloudFlare has two fields:

  • ClientRequestPath - /index/query
  • ClientRequestURI - /index/query?user=john

The explanation of url.path seems to match ClientRequestPath, but then where does ClientRequestURI go? Is it a field the integration doesn't collect, or am I missing the proper field it should map to?

For things like this you need to look at the ingest pipelines; this won't be explained in the documentation.

For example, in this case the ingest pipeline used is this one.

And this is how it will parse those two fields you mentioned:

# URL Fields
- uri_parts:
    field: json.ClientRequestURI
    tag: uri_parts_client_request_uri
    ignore_failure: true
    if: ctx?.json?.ClientRequestURI != null
- set:
    field: url.domain
    copy_from: json.ClientRequestHost
    ignore_empty_value: true
    if: ctx?.url?.domain == null
- set:
    field: url.path
    copy_from: json.ClientRequestPath
    ignore_empty_value: true
    if: ctx?.url?.path == null
- set:
    field: url.scheme
    copy_from: json.ClientRequestScheme
    ignore_empty_value: true
    if: ctx?.url?.scheme == null

Basically it uses the uri_parts processor to extract most of the url.* fields from ClientRequestURI, and then it uses ClientRequestPath for url.path if it does not already exist.
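If you want to reproduce that in Logstash, there is no uri_parts filter, but you can approximate it. This is just a rough sketch to start from: it assumes the Cloudflare event has already been parsed into the json.* structure, and it always overwrites the url.* fields instead of checking whether they are empty like the set processors do.

filter {
  if [json][ClientRequestURI] {
    # keep the full original value, then split out path and query
    mutate {
      copy => { "[json][ClientRequestURI]" => "[url][original]" }
    }
    grok {
      match => { "[json][ClientRequestURI]" => '%{URIPATH:[url][path]}(?:\?%{GREEDYDATA:[url][query]})?' }
      tag_on_failure => ["_uri_parts_failure"]
    }
  }
  if [json][ClientRequestHost] {
    mutate { copy => { "[json][ClientRequestHost]" => "[url][domain]" } }
  }
  if [json][ClientRequestScheme] {
    mutate { copy => { "[json][ClientRequestScheme]" => "[url][scheme]" } }
  }
}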

Awesome, I was wondering if there was a way to see what Elasticsearch/the integration was actually doing. Just to be clear that I'm reading this right:

processors:
# Event Time Fields
- convert:
    field: json.EdgeStartTimestamp
    tag: convert_edge_start_timestamp
    type: string

The above just converts EdgeStartTimestamp to a string data type and doesn't change the field name? So ECS-compliant datasets may still have fields that don't fall under something like cloudflare.*? I would have expected this field to end up as something like cloudflare.edge.timestamp.start.

Later in the same ingest pipeline this field is used as the source for the @timestamp field.

- date:
    field: json.EdgeStartTimestamp
    tag: date_edge_start_timestamp
    formats:
    - ISO8601
    - uuuu-MM-dd'T'HH:mm:ssX
    - uuuu-MM-dd'T'HH:mm:ss.SSSX
    - yyyy-MM-dd'T'HH:mm:ssZ
    - yyyy-MM-dd'T'HH:mm:ss.SSSZ
    - UNIX_MS
    timezone: UTC
    target_field: "@timestamp"

And after that the field event.start is created using the value of the @timestamp field.

- set:
    field: event.start
    copy_from: "@timestamp"
    ignore_empty_value: true

So in the end you will have event.start with the value from EdgeStartTimestamp. Since event.start is an ECS field, it would be redundant to also have a cloudflare.edge.timestamp.start, in my opinion.

As for whether ECS-compliant datasets may still keep fields outside something like cloudflare.*: probably not. Some fields will be renamed, and some fields will be used as the source for other fields and then removed at the end.

If you check the default pipeline for this integration, you will see that it calls the http pipeline to parse the HTTP requests, and after that it removes the json top-level field.

- remove:
    field:
      - json
    ignore_missing: true  

So by the time the pipeline reaches this processor, the fields have already been renamed or used as the source for other fields.

When trying to port an ingest pipeline to Logstash you need to look at the full pipeline to see what it is doing. Normally I go field by field until all the fields are mapped; most of the time it is just a series of renames, something like the sketch below.
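To give a rough idea of what that ends up looking like (the mappings below are just Cloudflare-style examples, not the integration's complete field list):

filter {
  # the usual "series of renames": move source fields onto their ECS names
  mutate {
    rename => {
      "[json][ClientIP]"            => "[source][ip]"
      "[json][ClientRequestMethod]" => "[http][request][method]"
      "[json][EdgeResponseStatus]"  => "[http][response][status_code]"
    }
  }
  # once everything is mapped, drop the leftover source object,
  # mirroring the remove processor at the end of the ingest pipeline
  mutate {
    remove_field => ["json"]
  }
}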
