Issue with Hourly Data Collection When Initial Interval Has No Data

I am trying to collect data from an API using the Filebeat CEL input. The API returns data in hourly chunks: it expects a parameter in the format YYYY-MM-DD:HH and, when queried, returns the data generated during that specific hour.

Data Collection Approach

  • At the start of data collection, I determine the initial_time as (current time - initial interval).

    • For example, if the current time is 2025-01-31 13:00 and the initial interval is 5 hours, then the initial_time will be 2025-01-31 08:00, and data collection will start from that time.
  • I collect data by iterating over hourly time windows, incrementing curr_time by one hour per request, until I reach the current time.
    In the example above, requests are made for:

    • 2025-01-31 08:00
    • 2025-01-31 09:00
    • 2025-01-31 10:00
    • 2025-01-31 11:00
    • 2025-01-31 12:00

    The timestamp is passed as a path parameter.

  • The cursor is updated only when the API responds with a 200 status and the data is collected successfully. It then stores the next hour to fetch (curr_time + 1 hour). This prevents data loss, because an hour is never marked as done unless it was actually fetched.
    Assuming every request succeeds, the cursor will store, in turn:

    • 2025-01-31 09:00
    • 2025-01-31 10:00
    • 2025-01-31 11:00
    • 2025-01-31 12:00
    • 2025-01-31 13:00

    After the next interval, data collection resumes from 2025-01-31 13:00.

Issue

  • The catch: if there is no data at initial_time (2025-01-31 08:00 in the example above), the cursor is not updated.
  • As a result, data collection stops prematurely instead of continuing to fetch hourly data up to the current time.
  • This is not the expected behavior, because data may be available in the next hour (2025-01-31 09:00).
  • Expected behavior: data collection should continue hourly, whether or not data exists at the initial interval.

Question

How can I modify my approach to ensure that data collection does not stop when no data is available at the initial interval, while still maintaining data loss prevention?

I would appreciate any insights or best practices for handling this scenario efficiently.

Here is my sample Filebeat CEL configuration:

filebeat.inputs:
  - type: cel
    config_version: 2
    interval: 120m
    resource.timeout: 1m
    resource.tracer.filename: request/tracer/path
    resource.url: BASE_URL
    state:
      access_token: API_KEY
      initial_interval: 24h
    redact:
      fields:
        - access_token
    program: |-
      (
        state.?want_more.orValue(false) ?
          state
        :
          state.with({
            "start_time": state.?cursor.last_timestamp.orValue((now - duration(state.initial_interval)).format("2006010215")),
            "end_time": now.format("2006010215"),
          })
      ).as(state, state.with(
        request(
          "GET",
          state.url.trim_right("/") + "/api/endpoint"+state.start_time
        ).with({
          "Header": {
            "api_key": [string(state.access_token)]
          }
        }).do_request().as(resp, resp.StatusCode == 200 ?
          bytes(resp.Body).decode_json().as(body, {
            "events": body.?data.orValue([]).map(e, {
              "message": e.encode_json()
            }),
            "cursor": {
              "last_timestamp": (state.start_time.parse_time("2006010215") + duration("1h")).format("2006010215"),
            },
            "start_time": (state.start_time.parse_time("2006010215") + duration("1h")).format("2006010215"),
            "want_more": state.start_time.parse_time("2006010215") + duration("1h") < state.end_time.parse_time("2006010215")
          })
        :
          {
            "events": {
              "error": {
                "code": string(resp.StatusCode),
                "id": string(resp.Status),
                "message": "GET:"+(
                  size(resp.Body) != 0 ?
                    string(resp.Body)
                  :
                    string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                ),
              },
            },
            "want_more": false,
          }
        )
      ))
output.file:
  path: /path/to/output/file
  filename: file_name
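A minimal sketch of one possible fix, assuming the API signals an empty hour with a non-200 status. The program above only advances the cursor on a 200. So if an hour with no data comes back as, say, 204 or 404, the error branch sets want_more to false and the loop ends at the first empty hour. The sketch treats those two codes as a completed hour with no events. 204 and 404 are guesses, not taken from the API's documentation. Every other status is still a real failure that leaves the cursor untouched, so that hour is retried at the next interval.

Two further safeguards are assumptions as well:

- If the API instead returns 200 with data set to null, the original .map would fail on null, so a non-list data value is treated as empty.
- If the input runs while the cursor's hour is still in progress (for example, after a restart at 13:20 with the cursor at 13:00), the program makes no request. This avoids fetching a partial hour and moving the cursor past it.

The tracer and output settings are omitted.

```yaml
filebeat.inputs:
  - type: cel
    interval: 120m
    resource.timeout: 1m
    resource.url: BASE_URL
    state:
      access_token: API_KEY
      initial_interval: 24h
    redact:
      fields:
        - access_token
    program: |-
      (
        state.?want_more.orValue(false) ?
          state
        :
          state.with({
            "start_time": state.?cursor.last_timestamp.orValue((now - duration(state.initial_interval)).format("2006010215")),
            "end_time": now.format("2006010215"),
          })
      ).as(state,
        state.start_time.parse_time("2006010215") >= state.end_time.parse_time("2006010215") ?
          // No completed hour to fetch yet (e.g. a restart during the cursor's hour).
          state.with({"events": [], "want_more": false})
        :
          (state.start_time.parse_time("2006010215") + duration("1h")).as(next, state.with(
            request("GET", state.url.trim_right("/") + "/api/endpoint" + state.start_time).with({
              "Header": {"api_key": [string(state.access_token)]},
            }).do_request().as(resp,
              // Assumption: 204 and 404 mean "no data for this hour"; check what your API returns.
              resp.StatusCode in [200, 204, 404] ?
                {
                  // An empty hour still counts as done: advance the cursor and keep going.
                  "events": resp.StatusCode == 200 ?
                    bytes(resp.Body).decode_json().?data.orValue([]).as(d, type(d) == list ? d : [])
                      .map(e, {"message": e.encode_json()})
                  :
                    [],
                  "cursor": {"last_timestamp": next.format("2006010215")},
                  "start_time": next.format("2006010215"),
                  "want_more": next < state.end_time.parse_time("2006010215"),
                }
              :
                {
                  // Real failure: leave the cursor alone so this hour is retried next interval.
                  "events": {"error": {
                    "code": string(resp.StatusCode),
                    "id": string(resp.Status),
                    "message": "GET:" + (size(resp.Body) != 0 ? string(resp.Body) : string(resp.Status)),
                  }},
                  "want_more": false,
                }
            )
          ))
      )
```

With the example times, a 404 for 08:00 would now move the cursor to 09:00 and continue through 12:00. Data loss prevention still holds for genuine errors such as 401 or 500.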

Why not split it into two parts? First, set up an ingest to load the historical data; then add a second ingest to keep up with the current data. That will be much simpler than trying to combine both kinds of logic in one ingest.

@catn0b0t If the user selects an initial interval of 7 days (168 hours), with 5,000 data points per hour, storing 840,000 events in a single state variable may not be the best approach due to potential performance and memory issues.
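A minimal sketch of the split, under these assumptions:

- The backfill runs as a separate input with a hypothetical id (api-backfill).
- The range is set by hypothetical fixed bounds, backfill_start and backfill_end. These are YYYYMMDDHH strings, quoted so YAML does not parse them as integers.

Like the original program, this input requests one hour per execution and relies on want_more to run again, so only the cursor and the two bounds are carried between executions. As I understand the CEL input, each execution's events (about 5,000 in the example) are published when that execution returns, instead of all 840,000 being held in state.

A 7-day backfill is 168 executions within one interval. That fits inside the CEL input's default max_executions budget of 1000, which is set explicitly here.

The ongoing input would stay as in the question, with its initial_interval covering the time from backfill_end onward.

```yaml
filebeat.inputs:
  - type: cel
    id: api-backfill            # hypothetical; give each input a unique id so their cursors stay separate
    interval: 24h
    max_executions: 1000        # 7 days = 168 hourly executions, within this budget
    resource.timeout: 1m
    resource.url: BASE_URL
    state:
      access_token: API_KEY
      backfill_start: "2025012413"   # hypothetical range [start, end), YYYYMMDDHH
      backfill_end: "2025013113"
    redact:
      fields:
        - access_token
    program: |-
      state.with({
        "start_time": state.?cursor.last_timestamp.orValue(state.backfill_start),
      }).as(state,
        state.start_time.parse_time("2006010215") >= state.backfill_end.parse_time("2006010215") ?
          // Backfill finished (or empty range): make no request.
          state.with({"events": [], "want_more": false})
        :
          (state.start_time.parse_time("2006010215") + duration("1h")).as(next, state.with(
            request("GET", state.url.trim_right("/") + "/api/endpoint" + state.start_time).with({
              "Header": {"api_key": [string(state.access_token)]},
            }).do_request().as(resp,
              // Only 200 advances the cursor; add the API's "no data" status here if it has one.
              resp.StatusCode == 200 ?
                {
                  "events": bytes(resp.Body).decode_json().?data.orValue([]).map(e, {"message": e.encode_json()}),
                  "cursor": {"last_timestamp": next.format("2006010215")},
                  "want_more": next < state.backfill_end.parse_time("2006010215"),
                }
              :
                {
                  // Failure: keep the cursor so this hour is retried at the next interval.
                  "events": {"error": {"code": string(resp.StatusCode), "message": string(resp.Body)}},
                  "want_more": false,
                }
            )
          ))
      )
```

Once the cursor reaches backfill_end, later intervals make no requests, so the backfill input can be left in place or removed.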