Pub/Sub integration with JSON Processor: "Cannot index event (status=400): dropping event! Enable debug logs to view the event and cause."

I have created a Pub/Sub integration instance which imports data from a topic and ingests it into Elastic. This part works as expected.

If I then re-configure the integration and specify a custom ingest pipeline (which is currently empty), this also works fine and ingests the data as expected.

I then added a JSON processor which converts the field "message" into a JSON object. As soon as I save the pipeline, I get an error in my Elastic Agent logs:

"Cannot index event (status=400): dropping event! Enable debug logs to view the event and cause."

FYI, I'm using: elastic-agent-8.12.2-darwin-x86_64

To try to enable debug logging, I edited /Library/Elastic/Agent/elastic-agent.yml and set agent.logging.level: debug. However, after restarting the agent, I don't see any logging whatsoever.

I have used the pipeline simulator and can confirm the JSON processor does exactly what it's meant to do, so the processor itself is unlikely to be the problem. However, it stops the elastic-agent from processing anything, and I have no way to diagnose why.

Can anyone provide some help to identify the issue please?

The processor configuration is as follows:

PUT _ingest/pipeline/mcd-transactions-prod
{
  "description": "Production version of the ingestion pipeline for mcd events.",
  "processors": [
    {
      "json": {
        "field": "_source.message",
        "target_field": "_source.livestore",
        "allow_duplicate_keys": true,
        "strict_json_parsing": false
      }
    }
  ]
}

Really struggling with this; any help would be most appreciated. Thanks in advance.

Could it be because the data stream has a specific schema which you need to honour, and applying processors to the pipeline makes these documents non-conformant? Not sure how you would test that theory though?

This is probably the issue. Error 400 means bad request; this normally happens when you have a mapping conflict and Elasticsearch cannot index your document because of it.

What does your message look like? Please share a sample of your JSON message so someone can try to replicate your issue.

Also, change your target field. If I'm not wrong, you cannot make changes to the _source field.

Change this to something else, like livestore.

Try something like this:


Thanks for this! It is still failing with the same error. I had resorted to _source.message as I had originally used message and it didn't work either.

My processor looks like this:

PUT _ingest/pipeline/mcd-transactions-prod
{
  "description": "Production version of the ingestion pipeline for mcd events.",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "livestore",
        "allow_duplicate_keys": true,
        "strict_json_parsing": false
      }
    }
  ]
}

The Elastic Agent has detected the change and restarted, but it is still complaining:

Cannot index event (status=400): dropping event! Enable debug logs to view the event and cause.

Any ideas?

My sample keeps getting automatically deleted by this site, so I've linked below to a gist:

Yeah, not sure what the issue is.

I suggest that you enable failure handling on your processor to capture the error; this should give more information on why the ingest pipeline is failing.

Check the documentation on how to handle pipeline failures.
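
As a rough sketch of what that could look like, the pipeline below adds a pipeline-level on_failure block that copies the failure reason into the document instead of letting it be dropped. The target field error.message is an assumption chosen for illustration; the _ingest.on_failure_* metadata fields are only available inside an on_failure block. Note that such a handler only fires when a processor fails; if the document is rejected afterwards, at index time, it would not be triggered.

```json
PUT _ingest/pipeline/mcd-transactions-prod
{
  "description": "Production version of the ingestion pipeline for mcd events.",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "livestore"
      }
    }
  ],
  "on_failure": [
    {
      "set": {
        "description": "Record why the pipeline failed (field name is illustrative)",
        "field": "error.message",
        "value": "Processor {{ _ingest.on_failure_processor_type }} failed with message: {{ _ingest.on_failure_message }}"
      }
    }
  ]
}
```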

Thanks for the help. Sadly, it's not playing ball. I uninstalled the elastic agent and re-installed it, just in case it was a problem with receiving updated configuration, but I get the same error with the following pipeline:

PUT _ingest/pipeline/mcd-transactions
{
  "description": "Production version of the ingestion pipeline for mcd events.",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "livestore",
        "allow_duplicate_keys": true,
        "strict_json_parsing": false,
        "on_failure": [
          {
            "set": {
              "description": "Record error information",
              "field": "error_information",
              "value": "Processor {{ _ingest.on_failure_processor_type }} with tag {{ _ingest.on_failure_processor_tag }} in pipeline {{ _ingest.on_failure_pipeline }} failed with message {{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}

Hi @tommed

Yup, you have a hard-to-debug mapping issue...

The pipeline is working, but when Elasticsearch tries to write the doc ... the document is invalid.

Go to Kibana Dev Tools

If you run _simulate, it will run fine:

POST _ingest/pipeline/mcd-transactions-prod/_simulate
{
  "docs": [
    {
      "_source": {
    "input": {
      "type": "gcp-pubsub"
    },
...
    "event": {
      "created": "2024-03-05T16:05:05.943Z",
      "id": "457b5de599-9390117397579038",
      "dataset": "gcp_pubsub.notifications"
    },
    "message": "{\"uid\":\"c4556918eda84716b97169e23f91347f\",\"event_time_local\":\"2024-03-05T11:04:48Z\",\"event_time_utc\":\"2024-03-05T16:04:48Z\",\"rule_triggered\":\"Sale\",\"agent_urn\":\"REDACTED.mcd.agents.ca.vipro.online\",\"region\":\"REDACTED\",\"store\":\"REDACTED\",\"store_number\":\"REDACTED\",\"pos\":\"REDACTED\",\"has_media\":true,\"meta\":{\"Booth\":\"0\",\"BusinessDate\":\"638451936000000000\",\"Cashless\":\"REDACTED\",\"CheapestItem\":\"2.35\",\"CrewId\":\"0\",\"CrewName\":\"REDACTED\",\"CustomerId\":\"\",\"FilePath\":\"c4556918eda84716b97169e23f91347f.zip\",\"GrillQuantity\":\"0\",\"HasVoidedItems\":\"False\",\"Id\":\"REDACTED\",\"IniAttachmentWriteMethods\":\"c4556918eda84716b97169e23f91347f-Itl=base64,c4556918eda84716b97169e23f91347f-Receipt=utf8_bytes\",\"IsMobileOrder\":\"False\",\"IsPromo\":\"False\",\"IsRecall\":\"No\",\"IsRefund\":\"False\",\"IsTransactionVoid\":\"0\",\"ItemActions\":\"[RECIPE, True]\",\"ItemCategories\":\"[FOOD, True]\",\"ItemCodes\":\"[11627, True],[89100020, True]\",\"ItemQuantity\":\"3\",\"ItemTypes\":\"[PRODUCT, True],[NON_FOOD_PRODUCT, True]\",\"Key\":\"POS0011:942173294\",\"Kind\":\"Sale\",\"LineItemCount\":\"3\",\"LiveStore.Version\":\"8.0.0.0\",\"LiveStore.Version.File\":\"8.3.269.0\",\"LiveStore.Version.Product\":\"8.3.269.0\",\"LocalTime\":\"638452334880000000\",\"MachineName\":\"REDACTED\",\"Major\":\"1824\",\"Managers\":\"\",\"Minor\":\"0\",\"MobileOrderKey\":\"\",\"MobileOrderType\":\"\",\"MostExpensiveItem\":\"2.35\",\"NodeId\":\"POS0011\",\"NodeStatus\":\"NO_STATUS\",\"NonProductAmount\":\"0\",\"NonProductTax\":\"0\",\"Pod\":\"REDACTED\",\"PosTimingsItemsCount\":\"3\",\"PosTimingsUntilPay\":\"20240305110448888\",\"PosTimingsUntilStore\":\"\",\"PosTimingsUntilTotal\":\"20240305110422862\",\"Products\":\"REDACTED\",\"PromoItemQuantity\":\"0\",\"Reason\":\"Receipt\",\"Receipt\":\"ContentType: Receipt\\r\\nTime:        2024-03-05 11:04:48\\r\\nReceipt:     24\\r\\nCrewId:      0\\r\\nCrewName:    
REDACTED\\r\\nTillNumber:  REDACTED\\r\\nLine Items:  3\\r\\nItem Count:  3\\r\\nPromo Items: 0\\r\\nRecalled?    No\\r\\nStoredAway?  False\\r\\nCashless:    REDACTED\\r\\nReduced By:  0.00\\r\\nManagers:    \\r\\nItems:\\r\\nArchCardRedeemed\\r\\n  REDACTED @ 2.35[TAX=0.31]\\r\\n    3x Milk @ 0.00\\r\\n    2x Sugar @ 0.00\\r\\n  REDACTED @ 2.35[TAX=0.30]\\r\\n    REDACTED @ 0.00\\r\\n    REDACTED @ 0.00\\r\\n  REDACTED @ 0.00\\r\\n==========================\\r\\n Total:       5.31\\r\\n==========================\\r\\n\\r\\nPayment Methods: INTERAC\",\"ReceiptNumber\":\"24\",\"ReductionAmount\":\"0.00\",\"RegId\":\"2447\",\"RoomNumber\":\"\",\"Rule.ListenFor\":\"Sale\",\"Rule.Name\":\"Hydra_Receipts\",\"Rule.OnTrigger\":\"Hydra_Receipts\",\"Rule.Policies\":\"(CatchAll='1')\",\"SalesType\":\"TakeOut\",\"Side\":\" \",\"Status\":\"Paid\",\"SysCustomer\":\"VApps\",\"SysRegion\":\"REDACTED\",\"SysStore\":\"REDACTED\",\"TabNumber\":\"\",\"TenderDiscount\":\"No\",\"Tenders\":\"DEBIT CARD\",\"TillEventStrategy\":\"SaleTillEvent\",\"TotalAmount\":\"4.70\",\"TotalExcTax\":\"4.70\",\"TotalPlusTax\":\"5.31\",\"TotalTax\":\"0.61\",\"UniqueId\":\"1lydyc8h2m\",\"UsedGrill\":\"False\",\"UtcOffset\":\"-5\",\"UtcTimestamp\":\"638452334880000000\",\"UtcTimestampFrom\":\"EvSaleEnd\",\"VoidedItemQuantity\":\"0\",\"WasStored\":\"False\",\"WindowsIdentity\":\"NT AUTHORITY\\\\SYSTEM\",\"WindowsUserName\":\"SYSTEM\",\"WindowsVersion\":\"Microsoft Windows NT 6.2.9200.0\"}}",
    "labels": {
      "agent-serial-number": "94087701183300122020706496654009847034311326751",
      "source-service": "cloud-ingress-mcdonalds"
    },
    "tags": [
      "mcd"
    ]
  }
    }
    ]
}

But if you actually try to write the doc, it will fail:

POST discuss-test/_doc?pipeline=mcd-transactions-prod
{
  "input": {
    "type": "gcp-pubsub"
  },
  "agent": {
...

Here is the actual error

{
  "error": {
    "root_cause": [
      {
        "type": "illegal_argument_exception",
        "reason": "can't merge a non object mapping [livestore.meta.LiveStore.Version] with an object mapping"
      }
    ],
    "type": "illegal_argument_exception",
    "reason": "can't merge a non object mapping [livestore.meta.LiveStore.Version] with an object mapping"
  },
  "status": 400
}

You have "LiveStore.Version" all over the place (on its own and as a prefix of other keys), which is causing mapping conflicts.
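
A minimal way to reproduce the conflict without the pipeline, assuming a scratch index such as discuss-test with no existing mapping for these fields, is to index a document that contains both key shapes. Only the two keys and their values are taken from the sample; the rest of the request is an illustrative sketch.

```json
POST discuss-test/_doc
{
  "livestore": {
    "meta": {
      "LiveStore.Version": "8.0.0.0",
      "LiveStore.Version.File": "8.3.269.0"
    }
  }
}
```

This request should be rejected with a 400 along the lines of the error above, because "LiveStore.Version" would have to be a string and the parent object of "File" at the same time.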

How to fix?
You will need to clean that up before you write the document.
You probably need to de-dot some fields, move some fields around, etc.

BTW ^^^ Pro Tips for debugging Ingest pipelines :slight_smile:


Amazing, thanks for the tip about applying the pipeline in an index request!

Silly question though, how would I go about de-dotting the field:

{
  "livestore": {
    "meta": {
      "LiveStore.Version": "123.0.0.1"
    }
  }
}

I've tried both:

{
  "dot_expander": {
    "field": "livestore.meta.LiveStore.Version"
  }
}

and

{
  "dot_expander": {
    "field": "LiveStore.Version"
  }
}

The problem is clearly how to specify a path that mixes genuine nested path segments with field names that themselves contain dots?!

One alternative that you may try is to use a gsub processor before your json processor to rename the two conflicting fields.

Something like this:

{
  "gsub": {
    "field": "message",
    "pattern": "LiveStore\\.Version\\.File",
    "replacement": "LiveStore.VersionFile"
  }
},
{
  "gsub": {
    "field": "message",
    "pattern": "LiveStore\\.Version\\.Product",
    "replacement": "LiveStore.VersionProduct"
  }
}

As @stephenb mentioned, the conflict that you have is, at least on the sample provided, on the LiveStore.Version field.

You have this field both as an object (with LiveStore.Version.File and LiveStore.Version.Product) and as a string (LiveStore.Version), and a field cannot be both in an Elasticsearch document.

So if you rename the two conflicting fields, it may work.
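
Put together, the suggestion might look like the sketch below: two gsub processors rename the conflicting keys inside the raw message string, and only then does the json processor parse it. The replacement names LiveStore.VersionFile and LiveStore.VersionProduct are just one possible choice, and the dots in the patterns are escaped because gsub patterns are regular expressions.

```json
PUT _ingest/pipeline/mcd-transactions-prod
{
  "description": "Production version of the ingestion pipeline for mcd events.",
  "processors": [
    {
      "gsub": {
        "field": "message",
        "pattern": "LiveStore\\.Version\\.File",
        "replacement": "LiveStore.VersionFile"
      }
    },
    {
      "gsub": {
        "field": "message",
        "pattern": "LiveStore\\.Version\\.Product",
        "replacement": "LiveStore.VersionProduct"
      }
    },
    {
      "json": {
        "field": "message",
        "target_field": "livestore",
        "allow_duplicate_keys": true,
        "strict_json_parsing": false
      }
    }
  ]
}
```

Re-running the earlier POST discuss-test/_doc?pipeline=mcd-transactions-prod request is a quick way to check whether the document is now accepted.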


Amazing I've done that. I've certainly learnt a LOT today! :slight_smile:

