I am trying to collect data from an API using the Filebeat CEL input. The API returns data in hourly chunks: it expects a parameter in the format YYYY-MM-DD:HH and returns the data generated during that specific hour.
Data Collection Approach
- At the start of data collection, I set initial_time to (current time - initial interval).
  - For example, if the current time is 2025-01-31 13:00 and the initial interval is 5 hours, then initial_time is 2025-01-31 08:00, and data collection starts from that time.
- I collect data by iterating over hourly time windows, incrementing curr_time by one hour per request, until I reach the current time.
  For the example above, the API requests are made for:
  - 2025-01-31 08:00
  - 2025-01-31 09:00
  - 2025-01-31 10:00
  - 2025-01-31 11:00
  - 2025-01-31 12:00
  The timestamp is passed as a path parameter.
- The curr_time (already advanced to the next hour) is stored in the cursor only when the API responds with a 200 status and data is successfully collected. This prevents data loss.
  Assuming data is collected successfully at each step, the cursor stores, in order:
  - 2025-01-31 09:00
  - 2025-01-31 10:00
  - 2025-01-31 11:00
  - 2025-01-31 12:00
  - 2025-01-31 13:00
  After the next interval, data collection resumes from 2025-01-31 13:00.
Issue
- The catch: if no data is available at initial_time (2025-01-31 08:00 in the example above), the cursor is not updated.
- As a result, data collection stops prematurely instead of continuing to fetch hourly data until the current time.
- This is not the expected behavior, because data may be available in the next hour (2025-01-31 09:00).
- Expected behavior: Data collection should continue hourly, regardless of whether data exists at the initial interval.
Question
How can I modify my approach to ensure that data collection does not stop when no data is available at the initial interval, while still maintaining data loss prevention?
I would appreciate any insights or best practices for handling this scenario efficiently.
Please refer to the sample Filebeat CEL configuration:
filebeat.inputs:
- type: cel
  config_version: 2
  interval: 120m
  resource.timeout: 1m
  resource.tracer.filename: request/tracer/path
  resource.url: BASE_URL
  state:
    access_token: API_KEY
    initial_interval: 24h
  redact:
    fields:
      - access_token
  program: |-
    (
      state.?want_more.orValue(false) ?
        state
      :
        state.with({
          "start_time": state.?cursor.last_timestamp.orValue((now - duration(state.initial_interval)).format("2006010215")),
          "end_time": now.format("2006010215"),
        })
    ).as(state, state.with(
      request(
        "GET",
        state.url.trim_right("/") + "/api/endpoint"+state.start_time
      ).with({
        "Header": {
          "api_key": [string(state.access_token)]
        }
      }).do_request().as(resp, resp.StatusCode == 200 ?
        bytes(resp.Body).decode_json().as(body, {
          "events": body.?data.orValue([]).map(e, {
            "message": e.encode_json()
          }),
          "cursor": {
            "last_timestamp": (state.start_time.parse_time("2006010215") + duration("1h")).format("2006010215"),
          },
          "start_time": (state.start_time.parse_time("2006010215") + duration("1h")).format("2006010215"),
          "want_more": state.start_time.parse_time("2006010215") + duration("1h") < state.end_time.parse_time("2006010215")
        })
      :
        {
          "events": {
            "error": {
              "code": string(resp.StatusCode),
              "id": string(resp.Status),
              "message": "GET:"+(
                size(resp.Body) != 0 ?
                  string(resp.Body)
                :
                  string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
              ),
            },
          },
          "want_more": false,
        }
      )
    ))
output.file:
  path: /path/to/output/file
  filename: file_name