Enrichment process not consistently enriching

Enrichment is not consistent. I have a source index from which I am enriching a field based on the destination IP. The enrichment is working, but sometimes it does not enrich the field even though the same destination.ip is present and no condition on the enrichment is applied.

What could be the possible issue?

Elasticsearch ingest-pipeline #enrich

Hi @kishorkumar

You're going to need to provide a lot more detail if you want help resolving this.

What version are you on?

Please provide your enrich policy

A sample of the lookup data

A sample of the source data to be enriched

Please provide a couple of samples that do not work

Perhaps then we can help.

My Elastic version is 8.9.2.

When I run the documents through _simulate, the enrichment happens on the very same documents that were not enriched by default.

Please provide samples and show the results of _simulate.
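
For example, a verbose simulate shows the result of each processor (a sketch; the pipeline name and the document body are placeholders):

POST _ingest/pipeline/my-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_source": {
        "destination": {
          "ip": "185.145.130.86"
        }
      }
    }
  ]
}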

_simulate does not actually write the document to the index.

Perhaps there is a mapping conflict which would then not allow the document to be indexed.

This is a scenario that I've seen happen in the past, and it describes the exact behavior you're seeing.

So instead of _simulate, you should try to actually POST the document to the index with the pipeline.
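
For example (a sketch; the data stream and pipeline names are placeholders, and a write to a data stream needs an @timestamp):

# Writing to the data stream runs its default pipeline chain
POST logs-netflow.log-default/_doc
{
  "@timestamp": "2024-12-09T07:04:26.914Z",
  "destination": { "ip": "185.145.130.86" }
}

# Or run a specific pipeline explicitly against a throwaway index
POST test-enrich/_doc?pipeline=my-pipeline
{
  "@timestamp": "2024-12-09T07:04:26.914Z",
  "destination": { "ip": "185.145.130.86" }
}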

If you provide samples like I asked for, we can check... or you can do that on your own.

I tried today by directly sending a log to the data stream via POST, and enrichment is working, but in real time it is not enriching.

So what do you think? I have tried multiple documents that are not enriched with the fields, but when I send the same document through the POST API, it does get enriched.

Then I suspect the enrich pipeline is not being called, or there is a mapping conflict.

I can't really help unless you provide samples of everything I've asked for.

What I would suggest is to add another set processor to the pipeline that's called with the enrich, and put a field in there that shows that the pipeline is actually being called.

That way you can test if the pipeline's being called...
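
For example, something like this at the top of the pipeline (a sketch; the field name is just a placeholder):

{
  "set": {
    "field": "debug.pipeline_ran",
    "value": true
  }
}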

Also, have you done any failure processing to catch the error?

Here is my custom pipeline for netflow, logs-netflow.log@custom:

  "logs-netflow.log@custom": {
    "description": "Custom netflflow pipelne for transformation and enrichment",
    "processors": [
      {
        "set": {
          "field": "@timestamp",
          "copy_from": "event.created",
          "ignore_empty_value": true,
          "if": "null == ctx.event?.start",
          "ignore_failure": true
        }
      },
      {
        "set": {
          "field": "@timestamp",
          "copy_from": "event.start",
          "ignore_empty_value": true,
          "if": "null != ctx.event?.start",
          "ignore_failure": true
        }
      },
      {
        "set": {
          "field": "destination.ip",
          "override": false,
          "ignore_empty_value": true,
          "ignore_failure": true,
          "copy_from": "fields.vserver_ip"
        }
      },
      {
        "enrich": {
          "field": "destination.ip",
          "policy_name": "public-ip-service-enrich-policy-updated",
          "target_field": "enriched",
          "ignore_missing": true,
          "if": "null != ctx.destination?.ip",
          "ignore_failure": true
        }
      },
      {
        "rename": {
          "field": "enriched.service.name",
          "target_field": "service.name",
          "ignore_failure": true,
          "ignore_missing": true
        }
      },
      {
        "set": {
          "field": "service.current_step",
          "copy_from": "enriched.service.public.current_step",
          "ignore_empty_value": true,
          "ignore_failure": true
        }
      },
      {
        "set": {
          "field": "service.next_step",
          "copy_from": "enriched.service.public.next_step",
          "ignore_empty_value": true,
          "ignore_failure": true
        }
      }, 
      {
        "remove": {
          "field": [
            "enriched.service.public.ip"
          ],
          "ignore_missing": true,
          "ignore_failure": true
        }
      }
    ],
    "version": 20241009
  }
}

There are multiple enrichment policies in there, but I have added one as a sample.

Enrichment source index:

{
  "_index": "enrichment-source",
  "_id": "nmqp",
  "_score": 1,
  "_source": {
    "service": {
      "name": "NMQP",
      "public": {
        "current_step": "Public",
        "ip": "185.145.130.86",
        "next_step": "Perimeter FW"
      }
    }
  }
}

Docs without enrichment:


{
  "_index": ".ds-logs-netflow.log-default-2024.12.09-001458",
  "_id": "tO47qpMBV6TtyLZf-NUY",
  "_version": 1,
  "_score": 0,
  "_source": {
    "agent": {
      "name": "logstash02",
      "id": "82cca-0b9-4c5-ab7-72e68b",
      "type": "filebeat",
      "ephemeral_id": "1ce4-f3cb-401-a657-4c9d",
      "version": "8.7.0"
    },
    "destination": {
      "geo": {
        "country_iso_code": "SA",
        "timezone": "Asia/Riyadh",
        "country_name": "Saudi Arabia",
        "continent_code": "AS",
        "location": {
          "lon": 45,
          "lat": 25
        }
      },
      "port": 443,
      "ip": "185.145.130.86",
      "locality": "external"
    },
    "source": {
      "geo": {
        "region_iso_code": "SA-01",
        "city_name": "Riyadh",
        "country_iso_code": "SA",
        "timezone": "Asia/Riyadh",
        "country_name": "Saudi Arabia",
        "continent_code": "AS",
        "region_name": "Ar Riyāḑ",
        "location": {
          "lon": 6.752,
          "lat": 4.637
        }
      },
      "port": 43573,
      "bytes": 82,
      "ip": "45.182.16.3",
      "locality": "external",
      "packets": 2
    },
    "fileset": {
      "name": "log"
    },
    "network": {
      "community_id": "1:zqjXrBRFF4dfdggYPo=",
      "bytes": 82,
      "transport": "tcp",
      "type": "ipv4",
      "iana_number": "6",
      "packets": 2,
      "direction": "external"
    },
    "tags": [
      "forwarded",
      "beats_input_raw_event",
      "_geoip_database_unavailable_GeoLite2-ASN.mmdb",
      "_geoip_database_unavailable_GeoLite2-ASN.mmdb"
    ],
    "observer": {
      "ip": "10.10.14.20"
    },
    "input": {
      "type": "netflow"
    },
    "netflow": {
      "destination_ipv4_address": "185.145.130.86",
      "packet_delta_count": 2,
      "source_ipv4_address": "45.182.16.3",
      "protocol_identifier": 6,
      "exporter": {
        "uptime_millis": 398541755,
        "address": "10.10.14.20:645",
        "ip": "10.10.14.20",
        "source_id": 256,
        "version": 9,
        "timestamp": "2024-12-09T07:03:53.000Z"
      },
      "tcp_control_bits": 16,
      "octet_delta_count": 82,
      "egress_interface": 20,
      "ingress_interface": 18,
      "type": "netflow_flow",
      "destination_transport_port": 443,
      "source_transport_port": 43573
    },
    "@timestamp": "2024-12-09T07:04:26.914Z",
    "related": {
      "ip": [
        "45.182.16.3",
        "185.145.130.86"
      ]
    },
    "ecs": {
      "version": "8.6.0"
    },
    "service": {
      "type": "netflow"
    },
    "data_stream": {
      "namespace": "default",
      "type": "logs",
      "dataset": "netflow.log"
    },
    "@version": "1",
    "event": {
      "agent_id_status": "auth_metadata_missing",
      "ingested": "2024-12-09T07:04:36Z",
      "created": "2024-12-09T07:04:26.914Z",
      "kind": "event",
      "module": "netflow",
      "action": "netflow_flow",
      "category": [
        "network"
      ],
      "type": [
        "connection"
      ],
      "dataset": "netflow.log"
    },
    "flow": {
      "locality": "external",
      "id": "7sr-eX2CCV"
    }
  },
  "fields": {
    "event.ingested": [
      "2024-12-09T07:04:36.000Z"
    ],
    "@timestamp": [
      "2024-12-09T07:04:26.914Z"
    ],
    "event.created": [
      "2024-12-09T07:04:26.914Z"
    ],
    "netflow.exporter.timestamp": [
      "2024-12-09T07:03:53.000Z"
    ]
  }
}


{
  "_index": ".ds-logs-netflow.log-default-2024.12.09-001458",
  "_id": "t-47qpMBV6TtyLZf-NUY",
  "_version": 1,
  "_score": 0,
  "_source": {
    "agent": {
      "name": "logstash02",
      "id": "82cca-0b9-4c5-ab7-72e68b",
      "type": "filebeat",
      "ephemeral_id": "1ce4-f3cb-401-a657-4c9d",
      "version": "8.7.0"
    },
    "destination": {
      "geo": {
        "country_iso_code": "SA",
        "timezone": "Asia/Riyadh",
        "country_name": "Saudi Arabia",
        "continent_code": "AS",
        "location": {
          "lon": 45,
          "lat": 25
        }
      },
      "port": 443,
      "ip": "185.145.130.86",
      "locality": "external"
    },
    "source": {
      "geo": {
        "country_iso_code": "SA",
        "timezone": "Asia/Riyadh",
        "country_name": "Saudi Arabia",
        "continent_code": "AS",
        "location": {
          "lon": 45,
          "lat": 25
        }
      },
      "port": 62540,
      "bytes": 82,
      "ip": "174.140.0.207",
      "locality": "external",
      "packets": 2
    },
    "fileset": {
      "name": "log"
    },
    "network": {
      "community_id": "1:hvIAxyvabjwMMyuPdb8US/ljuUw=",
      "bytes": 82,
      "transport": "tcp",
      "type": "ipv4",
      "iana_number": "6",
      "packets": 2,
      "direction": "external"
    },
    "tags": [
      "forwarded",
      "beats_input_raw_event",
      "_geoip_database_unavailable_GeoLite2-ASN.mmdb",
      "_geoip_database_unavailable_GeoLite2-ASN.mmdb"
    ],
    "observer": {
      "ip": "10.10.14.20"
    },
    "input": {
      "type": "netflow"
    },
    "netflow": {
      "destination_ipv4_address": "185.145.130.86",
      "packet_delta_count": 2,
      "source_ipv4_address": "174.140.0.207",
      "protocol_identifier": 6,
      "exporter": {
        "uptime_millis": 398541738,
        "address": "10.10.14.20:645",
        "ip": "10.10.14.20",
        "source_id": 256,
        "version": 9,
        "timestamp": "2024-12-09T07:03:53.000Z"
      },
      "tcp_control_bits": 16,
      "octet_delta_count": 82,
      "egress_interface": 20,
      "ingress_interface": 18,
      "type": "netflow_flow",
      "destination_transport_port": 443,
      "source_transport_port": 62540
    },
    "@timestamp": "2024-12-09T07:04:26.879Z",
    "related": {
      "ip": [
        "174.140.0.207",
        "185.145.130.86"
      ]
    },
    "ecs": {
      "version": "8.6.0"
    },
    "service": {
      "type": "netflow"
    },
    "data_stream": {
      "namespace": "default",
      "type": "logs",
      "dataset": "netflow.log"
    },
    "@version": "1",
    "event": {
      "agent_id_status": "auth_metadata_missing",
      "ingested": "2024-12-09T07:04:36Z",
      "created": "2024-12-09T07:04:26.879Z",
      "kind": "event",
      "module": "netflow",
      "action": "netflow_flow",
      "category": [
        "network"
      ],
      "type": [
        "connection"
      ],
      "dataset": "netflow.log"
    },
    "device": {
      "next_step": "SW Aggregator",
      "name": "Internet Router"
    },
    "flow": {
      "locality": "external",
      "id": "5E4M1WfXYXw"
    }
  },
  "fields": {
    "event.ingested": [
      "2024-12-09T07:04:36.000Z"
    ],
    "@timestamp": [
      "2024-12-09T07:04:26.879Z"
    ],
    "event.created": [
      "2024-12-09T07:04:26.879Z"
    ],
    "netflow.exporter.timestamp": [
      "2024-12-09T07:03:53.000Z"
    ]
  }
}

{
  "_index": ".ds-logs-netflow.log-default-2024.12.09-001458",
  "_id": "pAE9qpMBV6TtyLZfAiAb",
  "_version": 1,
  "_score": 0,
  "_source": {
    "agent": {
      "name": "logstash02",
      "id": "82cca-0b9-4c5-ab7-72e68b",
      "type": "filebeat",
      "ephemeral_id": "1ce4-f3cb-401-a657-4c9d",
      "version": "8.7.0"
    },
    "destination": {
      "geo": {
        "country_iso_code": "SA",
        "timezone": "Asia/Riyadh",
        "country_name": "Saudi Arabia",
        "continent_code": "AS",
        "location": {
          "lon": 45,
          "lat": 25
        }
      },
      "port": 443,
      "ip": "185.145.130.86",
      "locality": "external"
    },
    "source": {
      "geo": {
        "region_iso_code": "SA-01",
        "city_name": "Riyadh",
        "country_iso_code": "SA",
        "timezone": "Asia/Riyadh",
        "country_name": "Saudi Arabia",
        "continent_code": "AS",
        "region_name": "Ar Riyāḑ",
        "location": {
          "lon": 6.752,
          "lat": 4.637
        }
      },
      "port": 2908,
      "bytes": 3182,
      "ip": "83.169.121.37",
      "locality": "external",
      "packets": 23
    },
    "fileset": {
      "name": "log"
    },
    "network": {
      "community_id": "1:KI5MzOuDociiuWaNV5cKd60pCFM=",
      "bytes": 3182,
      "transport": "tcp",
      "type": "ipv4",
      "iana_number": "6",
      "packets": 23,
      "direction": "external"
    },
    "tags": [
      "forwarded",
      "beats_input_raw_event",
      "_geoip_database_unavailable_GeoLite2-ASN.mmdb",
      "_geoip_database_unavailable_GeoLite2-ASN.mmdb"
    ],
    "observer": {
      "ip": "10.10.122.2"
    },
    "input": {
      "type": "netflow"
    },
    "netflow": {
      "packet_delta_count": 23,
      "forwarding_status": 64,
      "post_nat_destination_ipv4_address": "192.178.59.86",
      "type": "netflow_flow",
      "source_ipv4_address": "83.169.121.37",
      "flow_end_reason": 3,
      "exporter": {
        "uptime_millis": 390195528,
        "address": "10.10.122.2:3502",
        "ip": "10.10.122.2",
        "source_id": 2,
        "version": 9,
        "timestamp": "2024-12-09T07:09:10.000Z"
      },
      "post_packet_delta_count": 23,
      "post_napt_source_transport_port": 0,
      "post_ip_diff_serv_code_point": 0,
      "destination_transport_port": 443,
      "protocol_identifier": 6,
      "post_octet_delta_count": 3182,
      "flow_start_sys_up_time": 390063348,
      "octet_delta_count": 3182,
      "egress_interface": 91,
      "post_napt_destination_transport_port": 443,
      "application_id": [
        20,
        0,
        0,
        38,
        58,
        0,
        0,
        128,
        160
      ],
      "destination_ipv4_address": "185.145.130.86",
      "post_ip_class_of_service": 0,
      "ip_class_of_service": 0,
      "post_nat_source_ipv4_address": "0.0.0.0",
      "ingress_interface": 90,
      "flow_end_sys_up_time": 390188838,
      "source_transport_port": 2908
    },
    "@timestamp": "2024-12-09T07:05:39.531Z",
    "related": {
      "ip": [
        "83.169.121.37",
        "185.145.130.86"
      ]
    },
    "ecs": {
      "version": "8.6.0"
    },
    "service": {
      "type": "netflow"
    },
    "data_stream": {
      "namespace": "default",
      "type": "logs",
      "dataset": "netflow.log"
    },
    "@version": "1",
    "event": {
      "duration": 125490000000,
      "agent_id_status": "auth_metadata_missing",
      "ingested": "2024-12-09T07:05:44Z",
      "created": "2024-12-09T07:05:39.531Z",
      "kind": "event",
      "module": "netflow",
      "start": "2024-12-09T07:05:39.531Z",
      "action": "netflow_flow",
      "end": "2024-12-09T07:09:03.310Z",
      "category": [
        "network"
      ],
      "type": [
        "connection"
      ],
      "dataset": "netflow.log"
    },
    "device": {
      "next_step": "WAF",
      "name": "Perimeter FW"
    },
    "flow": {
      "locality": "external",
      "id": "NNAUJjIxvYI"
    }
  },
  "fields": {
    "event.end": [
      "2024-12-09T07:09:03.310Z"
    ],
    "event.ingested": [
      "2024-12-09T07:05:44.000Z"
    ],
    "@timestamp": [
      "2024-12-09T07:05:39.531Z"
    ],
    "event.start": [
      "2024-12-09T07:05:39.531Z"
    ],
    "event.created": [
      "2024-12-09T07:05:39.531Z"
    ],
    "netflow.exporter.timestamp": [
      "2024-12-09T07:09:10.000Z"
    ]
  }
}

There is no mapping conflict at all. Elasticsearch version is 8.9.2.

I think you will need to share your enrich policy public-ip-service-enrich-policy-updated as well.

You shared just the enrich processor, not how the policy is configured.

Here is the policy:

{
  "config": {
    "match": {
      "name": "public-ip-service-enrich-policy-updated",
      "indices": [
        "enrichment-source"
      ],
      "match_field": "service.public.ip",
      "enrich_fields": [
        "service.name",
        "service.public.next_step",
        "service.public.current_step"
      ]
    }
  }
}

Not sure about that condition on the enrichment, but I do not think that is the problem; you could take it out...

So hmmmmmm

Everything looks pretty good...

Are you running dedicated ingest nodes or combo data/ingest nodes?

Have you tried deleting and recreating the enrich policy?
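
Something like this (a sketch built from the policy you shared; note you cannot delete a policy while an ingest pipeline still references it, and a recreated policy must be executed before its enrich index exists):

# Check what the current enrich index actually contains
GET .enrich-public-ip-service-enrich-policy-updated*/_search

DELETE /_enrich/policy/public-ip-service-enrich-policy-updated

PUT /_enrich/policy/public-ip-service-enrich-policy-updated
{
  "match": {
    "indices": "enrichment-source",
    "match_field": "service.public.ip",
    "enrich_fields": [
      "service.name",
      "service.public.next_step",
      "service.public.current_step"
    ]
  }
}

POST /_enrich/policy/public-ip-service-enrich-policy-updated/_execute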

Could your cluster be overloaded?

We have 3 ingest + coordinating nodes of 16 GB each.

Our cluster is also stable.

We haven't tried delete and recreate. Where can we track enrichment failures? The cluster logs?

      {
        "enrich": {
          "field": "destination.ip",
          "policy_name": "public-ip-service-enrich-policy-updated",
          "target_field": "enriched",
          "ignore_missing": true,              <<< Set to false
          "if": "null != ctx.destination?.ip",
          "ignore_failure": true               <<< Set to false and add failure handling...
        }
      },

Add failure handling...

I think this can be particularly helpful.

The following example uses the metadata fields to include information about pipeline failures in documents.

PUT _ingest/pipeline/my-pipeline
{
  "processors": [ ... ],
  "on_failure": [
    {
      "set": {
        "description": "Record error information",
        "field": "error_information",
        "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
      }
    }
  ]
}

I attempted to create a new policy instead of deleting the old one, and I also applied the on_failure tag in the pipeline. Here is the error.message we are receiving for our backend_ip_range_policy:

error.message:
Processor conditional with tag backend_app_enrichment in pipeline failed with message

There is no other information about why it is failing. Where can I find that information?

The proper condition is the following; try that:

ctx.destination?.ip != null

It looks like you only put in the catch-all error handler; you can add more detailed ones for each processor.
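
For example, a per-processor handler on the enrich processor itself (a sketch; the tag and the error.enrich_message field name are placeholders):

{
  "enrich": {
    "field": "destination.ip",
    "policy_name": "public-ip-service-enrich-policy-updated",
    "target_field": "enriched",
    "tag": "public_ip_enrich",
    "on_failure": [
      {
        "set": {
          "field": "error.enrich_message",
          "value": "{{ _ingest.on_failure_message }}"
        }
      }
    ]
  }
}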

The good news is something is failing... the bad news is it can be hard to find...

You could also try turning up the debug logging for the ingest module.
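
For example (a sketch; set the logger back to null when you are done):

PUT _cluster/settings
{
  "persistent": {
    "logger.org.elasticsearch.ingest": "DEBUG"
  }
}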


I have tried both conditions, but it does not make any difference.

Logically it has to fail: not all netflows are the same, the destination IP differs, and if it does not match then it will fail if we make ignore_failure false.

I will also try the debug, but I think this is an Elasticsearch issue; the enrich processor is not good enough to handle this. It is also increasing per-document time to 2000ms with only 4 enrichment policies.

Maybe I have to look into it more.

Something does not seem right.

How many IPs are in your lookup data?

Enrich should be pretty fast. I have other customers doing 50K EPS with enrich on IPs...

You say you have coordinating nodes; it turns out that complicates enrich... and can make it slower...

This is because coordinating nodes do not hold data. The enrich indices, which are data, live only on data nodes. So the coordinating nodes actually have to call a data node for every enrichment.

Do you have stack monitoring?

You can also add the Elastic Stack integration assets from the integrations page on the cluster where you are doing stack monitoring... This adds dashboards for in-depth analysis of ingest pipelines, so you can see where you're spending your time.

No, you don't actually have to deploy Elastic Agent and the integration. The dashboards will be valid just with stack monitoring data.
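
If you want the raw numbers without the dashboards, per-pipeline and per-processor ingest timings are also available from the node stats API, e.g.:

GET _nodes/stats/ingest?filter_path=nodes.*.ingest.pipelines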