Ingest pipeline multiplies processed documents (resulting in three copies)

I use an ingest pipeline with a grok processor to add some extra fields to my documents.

I have noticed, however, that when the pipeline is active, three copies of each document end up in Elasticsearch, all identical except for the _id field:

All three copies have the same fields (including the extra fields added by the pipeline).
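One way to confirm that the copies really differ only in _id is to group search hits by a fingerprint of their _source. The following is a minimal sketch, not part of the original setup: the helper names `source_fingerprint` and `find_duplicates` and the sample hits are hypothetical, and the hits are assumed to have the usual `_id` / `_source` shape of a search response.

```python
import hashlib
import json
from collections import defaultdict


def source_fingerprint(source: dict) -> str:
    """Hash a canonical JSON rendering of _source (keys sorted)."""
    canonical = json.dumps(source, sort_keys=True, separators=(",", ":"))
    return hashlib.sha1(canonical.encode("utf-8")).hexdigest()


def find_duplicates(hits: list) -> list:
    """Return lists of _id values whose _source content is identical."""
    groups = defaultdict(list)
    for hit in hits:
        groups[source_fingerprint(hit["_source"])].append(hit["_id"])
    # Keep only fingerprints shared by more than one document.
    return [ids for ids in groups.values() if len(ids) > 1]


# Hypothetical hits: three copies of one document plus one unrelated document.
hits = [
    {"_id": "a1", "_source": {"callid": "x", "rolle": "2"}},
    {"_id": "b2", "_source": {"rolle": "2", "callid": "x"}},
    {"_id": "c3", "_source": {"callid": "x", "rolle": "2"}},
    {"_id": "d4", "_source": {"callid": "y", "rolle": "1"}},
]
print(find_duplicates(hits))
```

Sorting the keys before hashing means that two copies compare equal even if their fields were serialized in a different order.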

In detail, every document looks like this (many, but not all, fields can be seen):

The pipeline I am using looks like this:

"processors" : [
  {
    "grok" : {
      "field" : "deviceInfo.inventarTag",
      "patterns" : [
        "%{DATA:inventarTag1},%{DATA:inventarTag2},%{DATA:inventarTag3};%{DATA:inventarTag4};%{NUMBER:inventarTag5};(%{DATA:deviceInfo.workProfile};%{DATA:deviceInfo.hubInstalled};%{DATA:deviceInfo.authType};)?"
      ]
    }
  }
]

So it creates the new fields inventarTag1 ... inventarTag5 and deviceInfo.workProfile, deviceInfo.hubInstalled, deviceInfo.authType.
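To illustrate what the pattern extracts, here is a rough Python emulation applied to the inventarTag value from the sample document later in this thread. It is only a sketch under stated assumptions: grok's DATA is treated as a lazy `.*?`, NUMBER is approximated by a plain decimal number, and the dotted field names are written with underscores because Python group names cannot contain dots. The helper `extract_fields` is hypothetical and is not how Elasticsearch evaluates grok internally.

```python
import re

# Simplified stand-ins for the grok patterns used in the pipeline (assumption:
# DATA is a lazy ".*?" and NUMBER is approximated by a plain decimal number).
DATA = r".*?"
NUMBER = r"[+-]?(?:\d+(?:\.\d+)?|\.\d+)"

# Python group names cannot contain dots, so "deviceInfo.workProfile" is
# written as "deviceInfo_workProfile" here; the real pipeline keeps the dots.
PATTERN = re.compile(
    rf"(?P<inventarTag1>{DATA}),(?P<inventarTag2>{DATA}),(?P<inventarTag3>{DATA});"
    rf"(?P<inventarTag4>{DATA});(?P<inventarTag5>{NUMBER});"
    rf"(?:(?P<deviceInfo_workProfile>{DATA});"
    rf"(?P<deviceInfo_hubInstalled>{DATA});"
    rf"(?P<deviceInfo_authType>{DATA});)?"
)


def extract_fields(inventar_tag: str) -> dict:
    """Return the captured fields, or an empty dict if nothing matches."""
    match = PATTERN.search(inventar_tag)
    if match is None:
        return {}
    # Drop groups that did not participate (the optional trailing part).
    return {k: v for k, v in match.groupdict().items() if v is not None}


fields = extract_fields("RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;")
for name, value in fields.items():
    print(f"{name} = {value}")
```

Because the last three fields sit inside an optional group, a tag without the trailing `no;yes;cert;` part still matches and simply yields inventarTag1 to inventarTag5 only.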

As I wrote, if I deactivate the pipeline the problem with the 3 copies of the same document disappears. Why does the ingest pipeline multiply the documents?

A pipeline cannot index a document more than once (bugs excluded :) ). I have never seen any such behaviour.

Can you share your Elastic Stack version, the full pipeline, the index mappings, and sample documents for a full recreation on a local system? That would be extremely helpful.

Thank you for your answer. To answer your questions in order:

Elasticsearch version 6.8.6.

GET /_ingest/pipeline/666666

{
  "666666" : {
    "description" : "A",
    "processors" : [
      {
        "grok" : {
          "field" : "deviceInfo.inventarTag",
          "patterns" : [
            "%{DATA:inventarTag1},%{DATA:inventarTag2},%{DATA:inventarTag3};%{DATA:inventarTag4};%{NUMBER:inventarTag5};(%{DATA:deviceInfo.workProfile};%{DATA:deviceInfo.hubInstalled};%{DATA:deviceInfo.authType};)?"
          ]
        }
      }
    ]
  }
}

The index mappings are as follows (the pipeline was active earlier that day, so the mapping also contains the extra fields; I have not defined any fields myself, but let Elasticsearch determine the types at indexing time):

GET fluentbit-acc-einbuchung-2020.10.16

{
  "fluentbit-acc-einbuchung-2020.10.16" : {
    "aliases" : { },
    "mappings" : {
      "acc-einbuchung" : {
        "properties" : {
          "@timestamp" : {
            "type" : "date"
          },
          "anbieter" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "callid" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "deviceInfo" : {
            "properties" : {
              "appVersion" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "authType" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "deviceOS" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "deviceType" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "fahrplanVersion" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "hubInstalled" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "inventarTag" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "workProfile" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          },
          "deviceUUID" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "fahrtabschnitt" : {
            "properties" : {
              "bis" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "bisZeit" : {
                "type" : "date"
              },
              "von" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "vonZeit" : {
                "type" : "date"
              }
            }
          },
          "hostname" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "inventarTag1" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "inventarTag2" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "inventarTag3" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "inventarTag4" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "inventarTag5" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "log" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "loglevel" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "mobileNr" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "msgjson" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "path" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "rolle" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "stream" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "time" : {
            "type" : "date"
          },
          "zugId" : {
            "properties" : {
              "startBahnhof" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              },
              "startZeit" : {
                "type" : "date"
              },
              "zugNr" : {
                "type" : "text",
                "fields" : {
                  "keyword" : {
                    "type" : "keyword",
                    "ignore_above" : 256
                  }
                }
              }
            }
          }
        }
      }
    },
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "provided_name" : "fluentbit-acc-einbuchung-2020.10.16",
        "default_pipeline" : "_none",
        "creation_date" : "1602828864100",
        "number_of_replicas" : "1",
        "uuid" : "1OixMJXwRxmERrhaUgRaLw",
        "version" : {
          "created" : "6080699"
        }
      }
    }
  }
}

Here is a sample document, which I got from Kibana at a time when the pipeline was not active (so only one copy of it exists). As I wrote before, my experience is that if I set the above pipeline as the default for the above index, I get not only the extra fields from grok, but also two more copies of the document, which differ only in the _id field.

{
  "_index": "fluentbit-acc-einbuchung-2020.10.16",
  "_type": "acc-einbuchung",
  "_id": "oin4MXUBnFBbs6sFPjPS",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2020-10-16T15:12:32.289Z",
    "hostname": "einbuchung-6648d8fcc7-sk8f5",
    "mobileNr": "000000000000",
    "deviceUUID": "6647817622e654ef",
    "rolle": "2",
    "zugId": {
      "zugNr": "13597",
      "startBahnhof": "RM",
      "startZeit": "2020-10-16T16:54:00+02:00"
    },
    "anbieter": "R",
    "deviceInfo": {
      "deviceType": "SM-J730F",
      "deviceOS": "9",
      "appVersion": "2.92.14-279",
      "inventarTag": "RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;"
    },
    "loglevel": "INFO ",
    "callid": "ee230e6d-706a-4c94-832a-f96bcc6df9e9",
    "msgjson": "{\"mobileNr\":\"000000000000\",\"deviceUUID\":\"6647817622e654ef\",\"rolle\":\"2\",\"zugId\":{\"zugNr\":\"13597\",\"startBahnhof\":\"RM\",\"startZeit\":\"2020-10-16T16:54:00+02:00\"},\"anbieter\":\"R\",\"deviceInfo\":{\"deviceType\":\"SM-J730F\",\"deviceOS\":\"9\",\"appVersion\":\"2.92.14-279\",\"inventarTag\":\"RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;\"}}",
    "path": "/var/log/containers/einbuchung-6648d8fcc7-sk8f5_zowsdna-acc_einbuchung-cdfa3cded36c4ed72431a0ca1333e080d58bd92d58415943de6e86286b2d89dc.log",
    "log": "2020-10-16 17:12:32.289|http-nio-8080-exec-9|INFO |ee230e6d-706a-4c94-832a-f96bcc6df9e9|d.d.r.e.i.i.r.ServiceRestController|Received Einbuchung: {\"mobileNr\":\"000000000000\",\"deviceUUID\":\"6647817622e654ef\",\"rolle\":\"2\",\"zugId\":{\"zugNr\":\"13597\",\"startBahnhof\":\"RM\",\"startZeit\":\"2020-10-16T16:54:00+02:00\"},\"anbieter\":\"R\",\"deviceInfo\":{\"deviceType\":\"SM-J730F\",\"deviceOS\":\"9\",\"appVersion\":\"2.92.14-279\",\"inventarTag\":\"RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;\"}}\n",
    "stream": "stdout",
    "time": "2020-10-16T15:12:32.289281602Z"
  },
  "fields": {
    "@timestamp": [
      "2020-10-16T15:12:32.289Z"
    ],
    "time": [
      "2020-10-16T15:12:32.289Z"
    ],
    "zugId.startZeit": [
      "2020-10-16T14:54:00.000Z"
    ]
  },
  "sort": [
    1602861152289
  ]
}

This is not so much a full recreation as a dump of your current state. Please provide all the commands you ran in order to reproduce the problem. I am missing an index creation request, an indexing request, and the corresponding search requests showing the duplication.

I guess you are then looking for something like the following, where I try to index the document (as I find it in _source) into a new index named duplication. (As I wrote above, I use no index creation request; I just post documents to an index, and it gets created if it does not already exist.)

However, I could not reproduce the problem this way: in this case I see only one copy of the indexed document, even with the pipeline active.

PUT /_ingest/pipeline/666666
{
    "description" : "A",
    "processors" : [
      {
        "grok" : {
          "field" : "deviceInfo.inventarTag",
          "patterns" : [ "%{DATA:inventarTag1},%{DATA:inventarTag2},%{DATA:inventarTag3};%{DATA:inventarTag4};%{NUMBER:inventarTag5};(%{DATA:deviceInfo.workProfile};%{DATA:deviceInfo.hubInstalled};%{DATA:deviceInfo.authType};)?"
          ]
        }
      }
    ]
}

PUT /_template/duplication-ingest
{
  "index_patterns": ["duplication"],
  "settings": {
    "index": {
      "default_pipeline": "666666"
    }
  }
}

POST duplication/_doc
{
    "@timestamp": "2020-10-16T15:12:32.289Z",
    "hostname": "einbuchung-6648d8fcc7-sk8f5",
    "mobileNr": "000000000000",
    "deviceUUID": "6647817622e654ef",
    "rolle": "2",
    "zugId": {
      "zugNr": "13597",
      "startBahnhof": "RM",
      "startZeit": "2020-10-16T16:54:00+02:00"
    },
    "anbieter": "R",
    "deviceInfo": {
      "deviceType": "SM-J730F",
      "deviceOS": "9",
      "appVersion": "2.92.14-279",
      "inventarTag": "RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;"
    },
    "loglevel": "INFO ",
    "callid": "ee230e6d-706a-4c94-832a-f96bcc6df9e9",
    "msgjson": "{\"mobileNr\":\"000000000000\",\"deviceUUID\":\"6647817622e654ef\",\"rolle\":\"2\",\"zugId\":{\"zugNr\":\"13597\",\"startBahnhof\":\"RM\",\"startZeit\":\"2020-10-16T16:54:00+02:00\"},\"anbieter\":\"R\",\"deviceInfo\":{\"deviceType\":\"SM-J730F\",\"deviceOS\":\"9\",\"appVersion\":\"2.92.14-279\",\"inventarTag\":\"RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;\"}}",
    "path": "/var/log/containers/einbuchung-6648d8fcc7-sk8f5_zowsdna-acc_einbuchung-cdfa3cded36c4ed72431a0ca1333e080d58bd92d58415943de6e86286b2d89dc.log",
    "log": "2020-10-16 17:12:32.289|http-nio-8080-exec-9|INFO |ee230e6d-706a-4c94-832a-f96bcc6df9e9|d.d.r.e.i.i.r.ServiceRestController|Received Einbuchung: {\"mobileNr\":\"000000000000\",\"deviceUUID\":\"6647817622e654ef\",\"rolle\":\"2\",\"zugId\":{\"zugNr\":\"13597\",\"startBahnhof\":\"RM\",\"startZeit\":\"2020-10-16T16:54:00+02:00\"},\"anbieter\":\"R\",\"deviceInfo\":{\"deviceType\":\"SM-J730F\",\"deviceOS\":\"9\",\"appVersion\":\"2.92.14-279\",\"inventarTag\":\"RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;\"}}\n",
    "stream": "stdout",
    "time": "2020-10-16T15:12:32.289281602Z"
}

GET duplication/_search
{
    "query": {
        "match_all": {}
    }
}

DELETE duplication

The problem appears only in the indices to which Fluent Bit sends documents for indexing, and not when I send them manually to this new index.
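The manual test above indexes a single document with POST duplication/_doc. Assuming that the Fluent Bit Elasticsearch output delivers documents through the _bulk API instead (an assumption that should be checked against the Fluent Bit configuration in use), a reproduction closer to that path could send a bulk body. The sketch below only builds such a body; the helper name `build_bulk_body` and the shortened sample document are hypothetical.

```python
import json


def build_bulk_body(index: str, doc_type: str, docs: list) -> str:
    """Build an NDJSON body for the _bulk API: one action line per document."""
    lines = []
    for doc in docs:
        # No _id is set, so Elasticsearch would auto-generate one per request.
        lines.append(json.dumps({"index": {"_index": index, "_type": doc_type}}))
        lines.append(json.dumps(doc))
    # The bulk body must end with a newline.
    return "\n".join(lines) + "\n"


# Shortened, hypothetical version of the sample document.
sample = {
    "callid": "ee230e6d-706a-4c94-832a-f96bcc6df9e9",
    "deviceInfo": {
        "inventarTag": "RIS-ME_GS,RIS-ME_NV,_RIS-ME;SM-J730F;9;no;yes;cert;"
    },
}
body = build_bulk_body("duplication", "_doc", [sample])
print(body, end="")
```

The resulting string would be sent as the body of POST /_bulk with the Content-Type application/x-ndjson. Since no _id is supplied, sending the same body more than once would create a new document each time, which is one way identical documents with different _id values can arise.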
