Avoid duplicates via ingest node pipelines

Hi all,

I need a solution on the Elastic side to handle duplicate logs. I would like to develop an ingest node pipeline that manages duplicates by replacing _id with a UUID; any other suggestions are welcome, but I do need to manage this with Elastic. We would like to avoid using Logstash for this, so I was wondering whether it is possible using ingest nodes with a pipeline. So far I have been unable to find a solution other than Logstash.

Thanks a lot

Hi @omartinez,

I'm not sure I understood your usage scenario correctly. Do you mean that you are ingesting documents with the same _id, but you don't want them to overwrite each other?

Hi Felix,

Our cluster is ingesting logs from fluentd, and when there is a network issue or Elastic is less responsive, fluentd tries to send the same bulk request over and over again. Elastic inserts those documents, automatically generating an _id every time, so when that happens we get many duplicates.

There is a solution on the fluentd side via a plugin that generates a hash from each document and inserts it into the _id field, and that solution works, but I cannot implement it because in our setup Rancher does not allow adding custom filters in either logging v1 or logging v2 of their integration.

So I need to manage that from Elastic. Although I found a solution using Logstash, I would like to avoid deploying an additional piece in our Elastic architecture just for this; I'd rather use Elastic ingest nodes. From my investigation I found that I can set the _id field to a UUID derived from the document, but I don't know whether that is possible using an ingest node pipeline.

I'm a huge believer in giving people what they're asking for before commenting on should-versus-shouldn't, because at this point only you know what you need. For this reason, here is an example of how you can generate a unique ID for your documents, given the payload of a given log:

  1. Create a script that generates the ID based on a field provided as a parameter:
POST _scripts/generate_id
{
  "script": {
    "lang": "painless",
    "source": """
      // Derive a deterministic, name-based UUID from the string content of a field.
      String getUUID(def str) {
        def res = null;
        if (str != null) {
          char[] buffer = str.toCharArray();
          byte[] b = new byte[buffer.length];
          for (int i = 0; i < b.length; i++) {
            b[i] = (byte) buffer[i];
          }
          res = UUID.nameUUIDFromBytes(b).toString();
        } else {
          // Fallback when there is nothing to hash.
          res = Math.random().toString();
        }
        return res;
      }
      // Use the configured field as the source of the document _id.
      if (ctx.containsKey(params.field) && ctx[params.field] != null && !ctx[params.field].isEmpty()) {
        ctx._id = getUUID(ctx[params.field]) + "." + getUUID(ctx[params.field].encodeBase64());
      } else {
        ctx._id = params.field + "_empty_" + getUUID(null);
      }
    """
  }
}
  2. Create an ingest pipeline that uses the script:
PUT _ingest/pipeline/AvoidDuplicatesPipeline
{
  "processors": [
    {
      "script": {
        "ignore_failure": false,
        "id": "generate_id",
        "params": {
          "field": "message"
        }
      }
    }
  ]
}
  3. Use the ingest pipeline to index documents:
POST /yourIndex/_doc?pipeline=AvoidDuplicatesPipeline
{
  "message" : "This is the content of your log"
}

A few comments about the solution above:

  • It can still lead to duplicates; it just makes them as unlikely as possible.
  • Watch the performance of this in high-load scenarios and be ready to beef up your ingest nodes.
  • It depends on the uniqueness of the message field, so you may need to tweak the script.

That being said, it is important to highlight the fact that Elasticsearch always generates an identifier for you (UUID based) if you don't specify one during indexing. So why not let Elasticsearch do this for you?

Also, Elasticsearch supports the concept of versions. Even if you index a document with the same ID, a new version of that document will be created. Does versioning sound like a possible solution for your use case?
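As a rough illustration of that behavior (the index name and _id below are just placeholders), indexing the same payload twice under the same _id produces a single document whose version is bumped rather than two separate documents:
PUT /yourIndex/_doc/log-0001
{
  "message" : "This is the content of your log"
}

The first call returns "result": "created" with "_version": 1; repeating the exact same call returns "result": "updated" with "_version": 2, so a retried document overwrites the earlier copy instead of being duplicated.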

You can design your use case around data streams in Elasticsearch and let the @timestamp field govern the uniqueness of your logging process. This way, you can have different copies of likely "the same log" but at different points in time. This is the design that new versions of Filebeat and the Elastic Agent are using, by the way.
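If you want to explore that route (data streams require Elasticsearch 7.9 or later), a minimal sketch could look like the example below; the template and data stream names are just examples, and documents written to a data stream must contain a @timestamp field:
PUT _index_template/logs-fluentd-template
{
  "index_patterns": ["logs-fluentd-*"],
  "data_stream": {},
  "priority": 500
}

POST /logs-fluentd-default/_doc
{
  "@timestamp": "2021-03-01T12:00:00.000Z",
  "message": "This is the content of your log"
}

The first write matching the template creates the logs-fluentd-default data stream automatically, and each document lands in a backing index managed by the stream.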

@riferrei

Ah, so I indeed misunderstood. Thanks for the clarification. To summarize: You have no control over the _id with which documents are submitted to Elasticsearch and want to de-duplicate identical documents that are re-submitted with a different _id.

In that case, a custom UUID generation script like the one @riferrei just proposed could work, or you could try to use the fingerprint processor on a set of fields of your choice (such as @timestamp and @message) and replace the _id field content with that fingerprint value, as sketched below.
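For reference, a minimal sketch of that fingerprint approach could look like the pipeline below. The pipeline name and the field list are just examples, and the fingerprint processor only exists in more recent Elasticsearch versions, so check that yours has it:
PUT _ingest/pipeline/FingerprintDedupPipeline
{
  "processors": [
    {
      "fingerprint": {
        "fields": ["@timestamp", "message"],
        "target_field": "fingerprint",
        "method": "SHA-256"
      }
    },
    {
      "set": {
        "field": "_id",
        "value": "{{fingerprint}}"
      }
    },
    {
      "remove": {
        "field": "fingerprint",
        "ignore_missing": true
      }
    }
  ]
}

Documents that produce the same fingerprint then share the same _id, so a re-submitted copy overwrites the existing document instead of creating a duplicate.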

This is greatly appreciated information; I will take a deeper look into it. Regarding your questions, I hope I understood you correctly.

The issue is that if fluentd sends a bulk request to Elastic and it fails, it will retry until the request gets ingested. So it can try to ingest the same log record many times when there are connection or responsiveness issues, and Elastic is going to ingest the same log as different documents with different _ids (autogenerated by Elastic), so we end up with many duplicated logs in Elastic. Basically, that leads to an ingestion loop. If I am able to generate an _id for each document that is unique and based on the document itself, then when fluentd tries to ingest the same document again it will just update it instead of creating a new one with a different _id, and that's the goal.

I also took a look at data streams, but I think those are only available from 7.9 up, is that right? I also noticed that Filebeat can control the uniqueness of a document by generating a hash, but for now we need to work with what we have.

The issue is that if fluentd sends a bulk request to Elastic and it fails, it will retry until the request gets ingested. So it can try to ingest the same log record many times when there are connection or responsiveness issues, and Elastic is going to ingest the same log as different documents with different _ids (autogenerated by Elastic), so we end up with many duplicated logs in Elastic.

I totally understand where you're coming from. You're dealing with arguably one of the worst problems there is in distributed environments: network unavailability. You have our empathy :hugs:

Although it is not the focus of this forum, you may want to take a look at this plugin for fluentd:

I have never used it, but according to this blog post it seems to help with the problem you're dealing with. At some point, I recommend talking to your team about upgrading your Elastic deployment so you can leverage some of the goodies we discussed here.

Nevertheless, the Elastic community is always here to help :+1:t2:

@riferrei

That's exactly what I was talking about before. I cannot implement that solution because of Rancher's fluentd integrations in logging v1 and logging v2, and I do think something like this should be the default. Hope someone from Rancher is reading this :slight_smile: but maybe there is a reason for that.

Have a read here if interested. That solution works; it's just not possible for us to implement.
GitHub - uken/fluent-plugin-elasticsearch

Thanks a lot for all the information, I have a lot of stuff to test now, really appreciate it.

I am just updating the thread to let you know that I tested and implemented the above script (only changing the field), and it worked like a charm. So thank you very much for that. Because of this flooding loop, logs from the previous day sometimes get ingested along with today's, which causes performance issues as well. Is it possible to also avoid ingesting logs from previous days?

Thanks in advance
Oscar

Thanks for sharing your success!

You might want to look into the drop processor to prevent documents from being indexed based on their timestamp. There is some risk involved in using time alone as the criterion, though, since it can't guarantee that a given document was indexed successfully before.
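As an illustrative sketch of that idea (not a drop-in solution): the pipeline below copies the ingest timestamp into a temporary field with a set processor and then drops documents whose @timestamp is more than one day older than it. It assumes @timestamp is an ISO-8601 string with a timezone, and the pipeline and field names are just examples, so adjust them and verify the date parsing against your own data:
PUT _ingest/pipeline/DropOldLogsPipeline
{
  "processors": [
    {
      "set": {
        "field": "ingest_time_tmp",
        "value": "{{_ingest.timestamp}}"
      }
    },
    {
      "drop": {
        "if": "ctx.containsKey('@timestamp') && ctx['@timestamp'] != null && ZonedDateTime.parse(ctx['@timestamp']).isBefore(ZonedDateTime.parse(ctx['ingest_time_tmp']).minusDays(1))"
      }
    },
    {
      "remove": {
        "field": "ingest_time_tmp",
        "ignore_missing": true
      }
    }
  ]
}

If you fold this into your existing AvoidDuplicatesPipeline, put the drop processor before the generate_id script so dropped documents never reach the ID generation step.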

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.