I've been ingesting datasets from before ECS was a thing that now have an ECS mapping. What would be the most efficient means of ingesting data (moving forward) so that it is ECS compliant? Examples of datasets are Fortigate firewalls, CloudFlare logs, and Windows event logs. This data is all ingested in some way by Logstash, and I'd like to keep Logstash in the loop for event caching and custom field injection when there isn't an ECS compliant field.
This is in preparation for moving up to Elastic Cloud and taking full advantage of Enterprise licensing.
I think I have a very similar scenario; I was also ingesting data from many different datasets before ECS was a thing, so maybe I can provide some insights.
We also use and rely on Logstash. We did some tests with the Elastic Agent but decided to use it only for simple things like some SaaS audit logs; it was too limited and would add a lot more work to manage everything in our use cases, so Logstash is essential for us as well.
But since we wanted our data to work with all the Security features and built-in alerts provided by Elastic, we decided to use the parsing done by the Elastic Agent as a reference for our Logstash pipelines.
For example, for the Fortigate pipeline we looked at the ingest pipeline used by the integration to see which fields we needed to correct in our pipeline, so that the final data generated by Logstash would be the same as if it had been generated by the integration.
Basically, for every dataset we have, we check whether an integration exists and use its ingest pipeline as a reference for our Logstash filters. When no integration exists for a specific dataset, we can at least map the majority of the fields to ECS fields according to the ECS Reference.
The fields that we can't match to any ECS equivalent, we store as nested fields under a top level named after the dataset. For example, if the dataset is for a tool named acme, we will probably end up with a couple of `acme.*` fields.
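As a sketch of that convention in a Logstash filter (the field names here are illustrative, and `acme` is the hypothetical tool from the example above):

```
filter {
  mutate {
    rename => {
      # fields with an ECS equivalent get renamed to the ECS field
      "src_ip" => "[source][ip]"
      "dst_ip" => "[destination][ip]"
      # fields with no ECS equivalent go under a top-level
      # named after the dataset (the hypothetical "acme")
      "session_flags" => "[acme][session][flags]"
    }
  }
}
```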
Since our main use case is SIEM and Security, we also follow the guidelines in this documentation.
It is a lot of work to make your custom data fit into what Elastic expects, but I find it way easier than having to deal with Integrations and hundreds of custom ingest pipelines and mappings.
Ya, looks like we are both tackling (or have tackled) the same problem. I found some documentation from Elastic recommending an Elasticsearch ingest pipeline to map source fields to destination fields. This seems like an easy way to do it, but it also seems to increase complexity: I'd have to keep track of both the field output in the Logstash pipeline and the Elasticsearch ingest mapping.
I'd already had the idea of using the ECS mappings from an integration to determine where my fields should map to, but that brings in some ambiguities pretty quickly for me. I was hoping there was an easier way, but it looks like I'm going to have to grind through remapping and then reconfigure my pipeline(s).
Perfect example of ambiguity. CloudFlare has two fields:
- ClientRequestPath - /index/query
- ClientRequestURI - /index/query?user=john
The explanation of `url.path` seems to match ClientRequestPath, but then where does ClientRequestURI go? Is it a field the integration doesn't collect, or am I missing the proper field it should map to?
For things like this you need to look at the ingest pipelines; this won't be explained in the documentation.
For example, in this case the ingest pipeline used is this one.
And this is how it parses those two fields you mentioned:
```
# URL Fields (excerpted processor conditions)
if: ctx?.json?.ClientRequestURI != null
if: ctx?.url?.domain == null
if: ctx?.url?.path == null
if: ctx?.url?.scheme == null
```
Basically it uses the `uri_parts` processor to extract most of the `url.*` fields from ClientRequestURI, and then it uses ClientRequestPath for `url.path` if that field does not already exist.
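A rough Logstash equivalent of that logic would look something like this (a sketch, not the integration's actual filter; Logstash has no `uri_parts` filter, so this uses grok, and the field names assume the raw Cloudflare keys):

```
filter {
  # extract url.path and url.query from ClientRequestURI,
  # similar to what the uri_parts ingest processor does
  grok {
    match => { "ClientRequestURI" => "%{URIPATH:[url][path]}(?:\?%{GREEDYDATA:[url][query]})?" }
  }
  # fall back to ClientRequestPath only when url.path was not set
  if ![url][path] {
    mutate {
      copy => { "ClientRequestPath" => "[url][path]" }
    }
  }
}
```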
Awesome, I was wondering if there was a way to see what Elasticsearch/the integration was actually doing. Just to be clear that I'm reading this right:
```
# Event Time Fields
```
The above just converts EdgeStartTimestamp to a string datatype and doesn't change the field name? So ECS-compliant datasets may still have fields that don't fall under something like `cloudflare.*`? I would have expected this field to be something like `cloudflare.edge.timestamp.start`.
Later in the same ingest pipeline this field is used as the source for other fields; after that, `event.start` is created using its value. So in the end you will have `event.start` with the value from EdgeStartTimestamp, which is an ECS field; it would be redundant to also have a `cloudflare.edge.timestamp.start`, in my opinion.
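If you wanted to reproduce that outcome in Logstash, one hedged sketch would be the following (assuming EdgeStartTimestamp holds an ISO8601 or epoch value; the exact formats the integration accepts may differ):

```
filter {
  # parse EdgeStartTimestamp into @timestamp
  date {
    match => ["EdgeStartTimestamp", "ISO8601", "UNIX", "UNIX_MS"]
  }
  # mirror the parsed value into the ECS event.start field
  mutate {
    copy => { "@timestamp" => "[event][start]" }
  }
  # drop the original once it has been consumed
  mutate {
    remove_field => ["EdgeStartTimestamp"]
  }
}
```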
Probably not; some fields will be renamed, and some fields will be used as the source for other fields and then removed at the end.
If you check the default pipeline for this integration, you will see that it calls the http pipeline to parse the HTTP requests, and after that it removes the `json` top-level field.
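That pattern looks roughly like this in an ingest pipeline definition (a sketch; the sub-pipeline name below is a placeholder, not the integration's real pipeline id):

```yaml
processors:
  # delegate to the dataset-specific sub-pipeline (placeholder name)
  - pipeline:
      name: "cloudflare-http-sub-pipeline"
  # once all fields have been renamed or consumed,
  # drop the raw json top-level object
  - remove:
      field: json
      ignore_missing: true
```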
So when the pipeline reaches this processor, the fields have already been renamed or used as sources for other fields.
When trying to port an ingest pipeline to Logstash you need to look at the full pipeline to see what it is doing. Normally I go field by field until all the fields are mapped; most of the time it is just a series of renames.
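As an illustration of that porting exercise (with hypothetical field names), an ingest pipeline `rename` processor like:

```yaml
- rename:
    field: json.ClientIP
    target_field: source.ip
    ignore_missing: true
```

maps to a Logstash filter such as:

```
filter {
  mutate {
    rename => { "[json][ClientIP]" => "[source][ip]" }
  }
}
```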