📌 [Filebeat HTTPJSON] Preserve Last Page Cursor for Next Run When Pagination Ends

Hi Elastic Team,

I'm using Filebeat’s httpjson input to integrate with a paginated API that uses a next field for pagination. Here's what I have working so far:

✅ Current Working Setup

  • response.pagination is configured to follow the next page value, but it doesn't work:
"response.pagination": [
  {
    "set": {
      "target": "url.value",
      "value": "[[ .last_response.body.next ]]"
    },
    "if": "[[ .last_response.body.next ]] != null"
  },
  {
    "stop": {},
    "if": "[[ .last_response.body.next ]] == null"
  }
]
The following parts are working fine:
  • response.split works correctly on body.results.
  • I’m using document_id: [[ .id ]] to avoid duplicates.

❌ The Problem

After the last page is reached ("next": null), the pipeline stops as expected. But on the next run, Filebeat starts again from the first page because there’s no mechanism to store and reuse the last page URL.

🎯 What I Want

I want Filebeat to:

  • Store the last known page URL (before reaching null), and
  • Start from that stored URL in the next interval run (rather than going back to the first page).

🔍 Attempted Solutions

  • I tried using a second set on .last_response.body.previous if next is null — but that’s only for the current run and doesn’t persist.
  • I looked at custom request cursor, but there’s no way to store a fallback value in it on stop.

🙏 Feature Request / Help Needed

Is there a way to:

  • Persist the cursor (URL or ID) when pagination ends?
  • Set a fallback cursor on stop condition?
  • Or enhance cursor handling to cache the last next value across runs?

Thanks for any insights or guidance!

We recommend the CEL input (Custom API using Common Expression Language) for all new API integrations because it is more flexible.

To persist state between intervals, you will need to configure the "custom request cursor" section (see "HTTP JSON input" in the Filebeat Reference [8.18]). It might also be helpful to look at some of the many examples in our integrations repo (GitHub code search results).

I'm configuring a Custom API integration using Common Expression Language (CEL) in Elastic. The aim is to paginate through API responses until the next field is null, then save the previous page in state for the next run. However, I'm encountering the following error during program compilation:

Failed

failed to check program: failed compilation:
ERROR: <input>:1:11: Syntax error: token recognition error at: '= '
 | respBytes = get(state.page).Body
 | ..........^
ERROR: <input>:1:13: Syntax error: mismatched input 'get' expecting <EOF>
 | respBytes = get(state.page).Body
 | ............^
ERROR: <input>:2:9: Syntax error: token recognition error at: '= '
 | decoded = body.decode_json(respBytes)
 | ........^
accessing config

Here's the API response I'm working with:

{
    "count": 243112,
    "next": "http://192.168.1.1:3400/feed/indicator/?page=2",
    "previous": null,
    "results": [
        {},
        {}
    ]
}

And this is the CEL program I'm using, which was generated by AI:

respBytes := get(state.page).Body
decoded := body.decode_json(respBytes)
{
    events: decoded.results,
    state: { page: if decoded.next != null && decoded.next != "" then decoded.next else state.page }
}

I'm not sure what's causing these syntax errors. Could someone help me identify and resolve the issue?

Thank you in advance for your assistance!

Do you have a specification for this API that you can share? Here's something that will fetch the data and turn each result into an event. It is adapted from an existing integration (linked below). But without knowing more about the API behavior, I cannot say if this is correct.

state.with(
        get(state.?next.orValue(state.url)).as(resp, (resp.StatusCode == 200) ?
                bytes(resp.Body).decode_json().as(body,
                        {
                                "events": body.results.map(e,
                                        {
                                                "message": e.encode_json(),
                                        }
                                ),
                                ?"next": body.?next,
                                "want_more": has(body.next),
                        }
                )
        :
                {
                        "events": {
                                "error": {
                                        "code": string(resp.StatusCode),
                                        "id": string(resp.Status),
                                        "message": "GET:" +
                                        (
                                                (size(resp.Body) != 0) ?
                                                        string(resp.Body)
                                                :
                                                        string(resp.Status) + " (" + string(resp.StatusCode) + ")"
                                        ),
                                },
                        },
                        "want_more": false,
                }
        )
)
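
For the goal stated at the top of the thread (resuming from the last page once next becomes null), one option may be to keep the page URL under cursor, which the CEL input persists across restarts, rather than in a top-level state field. The following is only a minimal sketch adapted from the program above, not a tested integration. It assumes that next is either a URL string or null, as in the sample response, that state.url holds the first page URL, and that re-fetching the last page on each run is acceptable. The cursor field name page is hypothetical.

```
state.with(
        // Resume from the stored page if there is one, otherwise use the configured URL.
        get(state.?cursor.page.orValue(state.url)).as(resp, (resp.StatusCode == 200) ?
                bytes(resp.Body).decode_json().as(body,
                        {
                                "events": body.results.map(e,
                                        {
                                                "message": e.encode_json(),
                                        }
                                ),
                                "cursor": {
                                        // Assumption: "page" is a made-up field name.
                                        // Advance to next when it is set; otherwise keep the page just fetched.
                                        "page": (has(body.next) && body.next != null) ?
                                                body.next
                                        :
                                                state.?cursor.page.orValue(state.url),
                                },
                                // Ask for another evaluation only while there is a next page.
                                "want_more": has(body.next) && body.next != null,
                        }
                )
        :
                {
                        "events": {
                                "error": {
                                        "code": string(resp.StatusCode),
                                        "message": "GET: " + string(resp.Status),
                                },
                        },
                        "want_more": false,
                }
        )
)
```

Because the stored URL is the last page itself, each run would fetch that page again before following any new next link, so duplicate events would still need to be handled downstream (for example with a document ID, as in the original httpjson setup).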

References