Hello,
I have a question about tracking state in CEL programs. Why do I need to publish my state values at the end of my program, or wrap the program with state.with(), to reuse values defined in my state?
state.with() Example:
Let's look at the integration for AbuseCH.
You can see that the only State values published at the end are events and url, yet future executions can still access state.auth_key. After researching, it seems that wrapping the entire program with the state.with() method ensures the state is preserved between executions. This works great when possible.
program: |
  state.with(
    request("GET", state.url).with({
      "Header": {
        "Content-Type": ["application/json"],
        ?"Auth-Key": has(state.auth_key) ?
          optional.of([state.auth_key])
        :
          optional.none(),
      }
    }).as(req, req.do_request().as(resp,
      bytes(resp.Body).decode_json().as(body, {
        "events": body.payloads.map(payload, {
          "message": payload.encode_json()
        }),
        "url": state.url
      })
    ))
  )
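To make my current understanding concrete, here is a small Python model (not CEL) of what the wrapper seems to do. It rests on an assumption I have not confirmed: that the map returned by the program becomes `state` for the next execution. `with_` is a hypothetical stand-in for `state.with()`, modeled as a shallow merge, and the URL and key values are made up.

```python
# Python model (not CEL) of how state seems to flow between executions.
# Assumption: the map returned by the program becomes the next `state`.

def with_(state: dict, fields: dict) -> dict:
    """Hypothetical stand-in for state.with(): shallow merge, new fields win."""
    return {**state, **fields}

def program_wrapped(state: dict) -> dict:
    # Mirrors state.with(<map with events and url>).
    result = {"events": [{"message": "payload"}], "url": state["url"]}
    return with_(state, result)

def program_unwrapped(state: dict) -> dict:
    # Same result map, returned without merging it into state.
    return {"events": [{"message": "payload"}], "url": state["url"]}

initial = {"url": "https://example.invalid/api", "auth_key": "secret"}

next_wrapped = program_wrapped(initial)
next_unwrapped = program_unwrapped(initial)

print("auth_key" in next_wrapped)
print("auth_key" in next_unwrapped)
```

If that assumption holds, auth_key survives only in the wrapped variant, which would match what I see with the AbuseCH program.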
Publish State Values at End Example:
Now take a look at the OpenAI integration. Even in the error branch at the end, the state values needed by future executions are saved. The program is not wrapped with state.with(), so it ends by writing these values to State explicitly. But I thought that the state was saved and accessible during each run, so why do these values need to be written?
program: |
  (
    !has(state.initial_start_time) ?
      state.with({
        "initial_start_time": int(timestamp(now - duration(state.initial_interval)))
      })
    :
      state
  ).as(state,
    (
      state.?want_more.orValue(false) ?
        state
      :
        state.with({
          "page": null,
          "initial_start_time": state.?cursor.last_bucket_start.orValue(int(timestamp(now - duration(state.initial_interval))))
        })
    ).as(state,
      request(
        "GET",
        state.url + "?" + {
          "start_time": [string(int(state.initial_start_time))],
          "page": state.page != null ? [state.page] : [],
          "bucket_width": ["{{ bucket_width }}"],
          "group_by": ["project_id,user_id,api_key_id,model"]
        }.format_query()
      ).with({
        "Header": {
          "Authorization": ["Bearer " + state.access_token],
          "Content-Type": ["application/json"]
        }
      }).do_request().as(resp, resp.StatusCode == 200 ?
        bytes(resp.Body).decode_json().as(body,
          {
            "events": body.data.filter(bucket,
              !(body.has_more == false && bucket.start_time >= body.data.map(bucket, bucket.start_time).max()))
              .map(bucket, size(bucket.results) > 0 ? bucket.results.map(result, { "message": result.with({"start_time": bucket.start_time, "end_time": bucket.end_time }).encode_json() }) : [{}]).flatten()
            ,
            "cursor": {
              "last_bucket_start": size(body.data) > 0 ? body.data.map(bucket, bucket.start_time).max() : state.?cursor.last_bucket_start,
              "last_bucket_end": size(body.data) > 0 ? body.data.map(bucket, bucket.end_time).max() : state.?cursor.last_bucket_end
            },
            "want_more": body.has_more,
            "page": body.has_more ? body.next_page : null,
            "access_token": state.access_token,
            "initial_start_time": state.initial_start_time,
            "url": state.url
          }
        )
      :
        {
          "events": {
            "error": {
              "code": string(resp.StatusCode),
              "message": "GET: " + (size(resp.Body) != 0 ? string(resp.Body) : string(resp.Status) + " (" + string(resp.StatusCode) + ")")
            }
          },
          "want_more": false,
          "access_token": state.access_token,
          "initial_start_time": state.initial_start_time
        }
      )
    )
  )
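Assuming the map returned by the program becomes `state` for the next execution (my reading, not something I have confirmed), the explicit republishing in both branches would be what carries values like access_token forward. Here is a rough Python model of that (not CEL). The function name, token, timestamps, and URL are invented for illustration, and the program is reduced to its state handling.

```python
# Python model (not CEL). Assumption: the returned map replaces `state`.

def program(state: dict, status_code: int) -> dict:
    """Hypothetical reduction of the program above to the keys it returns."""
    if status_code == 200:
        return {
            "events": [{"message": "bucket"}],
            "want_more": False,
            "access_token": state["access_token"],
            "initial_start_time": state["initial_start_time"],
            "url": state["url"],
        }
    # Error branch: an error event plus the keys it republishes explicitly.
    return {
        "events": {"error": {"code": str(status_code)}},
        "want_more": False,
        "access_token": state["access_token"],
        "initial_start_time": state["initial_start_time"],
    }

state = {
    "url": "https://example.invalid/usage",
    "access_token": "token",
    "initial_start_time": 1700000000,
}

after_success = program(state, 200)         # first execution succeeds
after_error = program(after_success, 500)   # second execution hits an error

print(after_success["access_token"], after_error["access_token"])
```

In this model the token is still present after the failed execution only because the error branch writes it back.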
In summary, there seem to be two primary State strategies in CEL integration programs.
- Wrap the entire program with state.with(), and it will keep any initialized state values for future executions unless they are explicitly overwritten.
- Publish the values to State at the end, on both success and failure, to ensure the state values are accessible during future executions when the logic is more complex and the program cannot simply be wrapped with state.with().
I am confused about why these options are needed at all. If you initialize a program with state values and never overwrite them, why are they not accessible during future executions?
For example, this code will fail because I did not publish account_id or any of the auth parameters at the end, but they were initialized in State at one point and I don't see where I could be clearing them out. At the beginning I keep the state and only add to it with state.with(), or persist it with .as(state, ... ). So any State values from the previous run, like account_id, should still be there, but it eventually fails, meaning they are somehow getting dropped.
If you have any experience with CEL and can help me wrap my head around this, that would be much appreciated. Thanks!
My Broken Code Example:
program: |
  (
    has(state.want_more) && state.want_more ?
      state
    :
      state.with({
        "cursor" : {
          "page": 1,
          "last_timestamp": state.?cursor.last_timestamp.orValue((now - duration(state.?initial_interval.orValue("720h"))).format(time_layout.RFC3339)),
        },
        "per_page" : has(state.per_page) && state.per_page > 0 ? state.per_page : 100,
      })
  ).as(state, state.with(get_request(
    state.url.trim_right("/") + "/client/v4/accounts/" + state.account_id + "/audit_logs?" + {
      "since": [state.cursor.last_timestamp],
      "page": [string(state.cursor.page)],
      "per_page": [string(state.per_page)],
      "direction": ["asc"],
    }.format_query()
  )).with({
    "Header": {
      "Content-Type": ["application/json"],
      ?"Authorization": has(state.auth_token) && size(state.auth_token) > 0 ?
        optional.of(["Bearer " + state.auth_token])
      :
        optional.none(),
      ?"X-Auth-Email": has(state.auth_email) && size(state.auth_email) > 0 && !(has(state.auth_token) && size(state.auth_token) > 0 ) ?
        optional.of([state.auth_email])
      :
        optional.none(),
      ?"X-Auth-Key": has(state.auth_key) && size(state.auth_key) > 0 && !(has(state.auth_token) && size(state.auth_token) > 0 ) ?
        optional.of([state.auth_key])
      :
        optional.none(),
    }
  }).do_request().as(resp, resp.StatusCode == 200 ?
    bytes(resp.Body).decode_json().as(body, {
      "events": body.result.map(e, {
        "message": e.encode_json(),
      }),
      "want_more": (has(body.result) && size(body.result) == state.per_page ? true : false),
      "cursor": (
        (has(body.result) && size(body.result) == state.per_page ?
          {
            "page": int(state.cursor.page) + 1,
            "last_timestamp": state.cursor.last_timestamp,
          }
        :
          has(body.result) && size(body.result) > 0 ?
            ({
              "page": 1,
              "last_timestamp": (timestamp(body.result.map(e, e.when).map(t, timestamp(t)).max().as(batch_last_timestamp, (
                batch_last_timestamp > timestamp(state.cursor.last_timestamp) ?
                  batch_last_timestamp.format(time_layout.RFC3339)
                :
                  state.cursor.last_timestamp
              ))) + duration("1s")).format(time_layout.RFC3339),
            })
          :
            state.cursor.last_timestamp
        )
      ),
    })
  :
    {
      "events": {
        "error": {
          "code": string(resp.StatusCode),
          "id": string(resp.Status),
          "message": "GET:"+(
            size(resp.Body) != 0 ?
              string(resp.Body)
            :
              string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
          ),
        },
      },
      "want_more": false,
    }
  )
  )
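One thing that stands out on a closer read of the program above: the `state.with(` that opens on the `.as(state, ...` line is closed by the `))` right after `.format_query()`, so it seems to wrap only the `get_request(...)` value and not the response handling. If the map returned by the program becomes the next `state` (an assumption on my part), then the map built inside `.as(resp, ...)` is what gets kept, and it only contains events, want_more, and cursor. A Python model of that difference (not CEL; `with_` is a hypothetical stand-in for `state.with()` as a shallow merge, and all values are invented):

```python
# Python model (not CEL) of where the state.with( ... ) parenthesis closes.
# Assumption: the map returned by the program becomes the next `state`.

def with_(state: dict, fields: dict) -> dict:
    """Hypothetical stand-in for state.with(): shallow merge, new fields win."""
    return {**state, **fields}

state = {
    "url": "https://example.invalid",
    "account_id": "abc123",
    "per_page": 100,
}
request = {"Method": "GET", "URL": state["url"] + "/audit_logs"}
response_map = {
    "events": [],
    "want_more": False,
    "cursor": {"page": 1, "last_timestamp": "2024-01-01T00:00:00Z"},
}

# As written: state.with(get_request(...)) merges state into the request only,
# and the map from .as(resp, {...}) is returned on its own.
merged_request = with_(state, request)
next_state_as_written = response_map

# Alternative: state.with( get_request(...) ... .as(resp, {...}) ) would merge
# the response map into state before it is returned.
next_state_wrapped = with_(state, response_map)

print("account_id" in next_state_as_written)
print("account_id" in next_state_wrapped)
```

If this reading is right, moving that closing parenthesis to the very end of the program might make it behave like the wrapped AbuseCH example, though I have not tested that.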