Filebeat CEL input reports a timeout while the debug trace shows a proper response

Hi community,
I'm trying to fetch the Symantec Event Stream (Broadcom Enterprise Security Group - API Documentation) with Filebeat.
First, to check that the Symantec side works properly, I wrote this script to fetch the stream with curl:

#!/bin/sh

HOST=https://api.sep.eu.securitycloud.symantec.com
OAUTH="Basic Tz..."

# Exchange the client credentials for a bearer token.
BTOKEN=$(curl -q -sS -X POST "$HOST/v1/oauth2/tokens" \
  -H "accept: application/json" \
  -H "authorization: $OAUTH" \
  -H "content-type: application/x-www-form-urlencoded" | jq -r .access_token)

STREAM="v1/event-export/stream/1db8a6c9-7b60-4451-a698-f4d35a6c0271/0"

# Open the event stream and print events to stdout as they arrive.
curl --location-trusted --request POST "$HOST/$STREAM" \
  --header "Content-Type: application/json" \
  --header "Accept: application/x-ndjson" \
  --header "Accept-Encoding: gzip" \
  --header "authorization: $BTOKEN" \
  --data "{}" \
  --compressed \
  --output -

It works properly: it gets the token, then connects and starts receiving the event stream.

To make it more robust, I want to use Filebeat.
As a starting point, I took the Symantec Endpoint Security integration for cloud.elastic.co (Symantec Endpoint Security | Elastic integrations | Elastic) and made the necessary adjustments (URL, POST data, etc.).

Here's my filebeat.yml:

filebeat.inputs:
  - type: cel
    auth:
      oauth2:
        client:
          id: O2ID*****
          secret: ******
        endpoint_params:
          grant_type: client_credentials
        token_url: https://api.sep.eu.securitycloud.symantec.com/v1/oauth2/tokens
    data_stream:
      dataset: some_dummy_value
      type: logs
    id: id_a
    interval: 10m
    resource:
      ssl: null
      timeout: 4s
      tracer:
        filename: ./logs/cel/trace-*.ndjson
        maxbackups: 3
        maxsize: 4
      url: https://api.sep.eu.securitycloud.symantec.com
    state:
      initial_interval: 24h
      limit: 2000
      next: 0
      want_more: false
    tags:
      - preserve_original_event
      - forwarded
      - rl-cv161

    program: |
      (
        state.want_more ?
          state
         :
          state.with({
            "limit": state.limit,
            "start_date": state.?cursor.last_timestamp.orValue(string(now - duration(state.initial_interval))),
            "end_date": now,
            "next": state.next,
          })
      ).as(state,
        post_request(
          state.url.trim_right("/") + "/v1/event-export/stream/1db8a6c9-7b60-4451-a698-f4d35a6c0271/0",
          "application/json",
          {}.encode_json()
        ).do_request().as(resp, resp.StatusCode == 200 ?
            bytes(resp.Body).decode_json().as(body, 
              (body.?next.orValue(body.total) != body.total).as(want_more, {
                "events": body.events.map(e, {
                  "message": e.encode_json(),
                }),
                "next": want_more ? body.next : 0,
                "want_more": want_more,
                "limit": state.limit,
                "start_date": string(state.start_date),
                "end_date": string(state.end_date),
                "cursor": {
                  ?"last_timestamp": want_more ?
                      state.?cursor.last_timestamp
                    :
                      optional.of(state.end_date),
                }
              })
            )
          :
            {
              "events": {
                "error": {
                  "code": string(resp.StatusCode),
                  "id": string(resp.Status),
                  "message": "POST:"+(
                    size(resp.Body) != 0 ?
                      string(resp.Body)
                    :
                      string(resp.Status) + ' (' + string(resp.StatusCode) + ')'
                  ),
                },
              },
              "want_more": false,
            }
        )
      )

output:
  console:
    pretty: true

logging:
  to_syslog: false
  level: warning

logging.metrics.enabled: false

Once started, it shows me log entries like this:

{"message":"Post \"https://api.sep.eu.securitycloud.symantec.com/v1/event-export/stream/1db8a6c9-7b60-4451-a698-f4d35a6c0271/0\": net/http: request canceled (Client.Timeout exceeded while awaiting headers)"}

However, while checking the resource.tracer file, I see these records:

  1. HTTP request with authentication info to token URL
  2. HTTP response with bearer token
  3. HTTP request to stream url with bearer token
  4. HTTP response with "http.response.status_code": 200, "http.response.body.content": "{"events": [ {"actor..., "http.response.body.truncated": false, "http.response.body.bytes": 359761, some other fields, and "error.message": "failed to read response body: net/http: request canceled".

This confuses me.

Could you please tell me what can be done to identify the issue?

Best,
Roman Levitsky,
Systems Administrator, Exadel.

I'd recommend breaking this into smaller testing steps. From the error alone, we can't be sure that Filebeat is connecting properly. Try to simplify your CEL program as much as possible to confirm you are connecting; see the stripped-down sketch below.
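
For example, a minimal program (this reuses only the stream URL and the empty POST body from your config, nothing else) that issues the request and returns the raw body as a single event, so you can see whether the request itself ever completes:

program: |
  // Issue the POST and wrap the raw body in a single event; no paging, no cursor.
  post_request(
    state.url.trim_right("/") + "/v1/event-export/stream/1db8a6c9-7b60-4451-a698-f4d35a6c0271/0",
    "application/json",
    {}.encode_json()
  ).do_request().as(resp, {
    "events": [{"message": string(resp.Body)}],
    "want_more": false,
  })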

Here's a tool to test your CEL program outside of Filebeat: GitHub - elastic/mito: message stream processing engine based on CEL.
You can save the output from your curl command and run your CEL program against it with mito.
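
For instance (the flag here is from memory and may be wrong, so check mito -help for the exact interface):

# Run program.cel with the saved JSON exposed to the program as "state".
mito -data state.json program.cel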

Thank you Trevor for your reply!

I'll give mito a go once I get familiar with it.

However, didn't the trace log I mentioned show a proper connection?

% head -n4 logs/cel/trace-id_a.ndjson | jq -c | cut -c1-300
{"log.level":"debug","@timestamp":"2025-02-19T21:41:30.530Z","message":"HTTP request","transaction.id":"JACQKRNKN4IHG-1","url.original":"https://api.sep.eu.securitycloud.symantec.com/v1/oauth2/tokens","url.scheme":"https","url.path":"/v1/oauth2/tokens","url.domain":"api.sep.eu.securitycloud.symantec
{"log.level":"debug","@timestamp":"2025-02-19T21:41:31.135Z","message":"HTTP response","transaction.id":"JACQKRNKN4IHG-1","http.response.status_code":200,"http.response.body.content":"{\"access_token\":\"eyJraWQiOiI3b2UxNHBKZVRZU0dqcmJULWVTM01BIiwidHlwIjoiSldUIiwiYWxnIjoiUlMyNTYifQ.eyJzdWIiOiJ7XCJkb
{"log.level":"debug","@timestamp":"2025-02-19T21:41:31.135Z","message":"HTTP request","transaction.id":"JACQKRNKN4IHG-2","url.original":"https://api.sep.eu.securitycloud.symantec.com/v1/event-export/stream/1db8a6c9-7b60-4451-a698-f4d35a6c0271/0","url.scheme":"https","url.path":"/v1/event-export/stre
{"log.level":"debug","@timestamp":"2025-02-19T21:41:35.136Z","message":"HTTP response","transaction.id":"JACQKRNKN4IHG-2","http.response.status_code":200,"http.response.body.content":"{\"events\": [ {\"actor\":{\"file\":{\"folder\":\"\",\"name\":\"\",\"path\":\"\"}},\"category_id\":1,\"connection\":

Hmm...
I noticed that the fourth record always arrives exactly one timeout interval after the third record.
I.e., with the timeout set to 4s it lags by 4s; with the timeout set to 8s, the fourth record is 8s behind the third and the sixth record is 8s behind the fifth:

% cat logs/cel/trace-id_a.ndjson | jq -c | cut -c1-160
{"log.level":"debug","@timestamp":"2025-02-19T22:13:37.888Z","message":"HTTP request","transaction.id":"FLPIABLLNCIHG-1","url.original":"https://api.sep.eu.secu
{"log.level":"debug","@timestamp":"2025-02-19T22:13:38.034Z","message":"HTTP response","transaction.id":"FLPIABLLNCIHG-1","http.response.status_code":200,"http.
{"log.level":"debug","@timestamp":"2025-02-19T22:13:38.035Z","message":"HTTP request","transaction.id":"FLPIABLLNCIHG-2","url.original":"https://api.sep.eu.secu
{"log.level":"debug","@timestamp":"2025-02-19T22:13:46.035Z","message":"HTTP response","transaction.id":"FLPIABLLNCIHG-2","http.response.status_code":200,"http.
{"log.level":"debug","@timestamp":"2025-02-19T22:13:47.044Z","message":"HTTP request","transaction.id":"FLPIABLLNCIHG-3","url.original":"https://api.sep.eu.secu
{"log.level":"debug","@timestamp":"2025-02-19T22:13:55.046Z","message":"HTTP response","transaction.id":"FLPIABLLNCIHG-3","http.response.status_code":200,"http.

But then why does the trace show "http.response.status_code": 200, a body, response headers, etc.?

If I point it at an invalid URL, it handles the error properly:

% cat logs/cel/trace-id_a.ndjson | jq -c | cut -c1-170
{"log.level":"debug","@timestamp":"2025-02-19T22:31:27.912Z","message":"HTTP request","transaction.id":"3D398K5ENGIHG-1","url.original":"https://api.sep.eu.securitycloud.
{"log.level":"debug","@timestamp":"2025-02-19T22:31:28.067Z","message":"HTTP response","transaction.id":"3D398K5ENGIHG-1","http.response.status_code":200,"http.response.b
{"log.level":"debug","@timestamp":"2025-02-19T22:31:28.067Z","message":"HTTP request","transaction.id":"3D398K5ENGIHG-2","url.original":"https://api.sep.eu.securitycloud.
{"log.level":"debug","@timestamp":"2025-02-19T22:31:28.099Z","message":"HTTP response","transaction.id":"3D398K5ENGIHG-2","http.response.status_code":400,"http.response.b

And the corresponding log entry:

{"log.level":"error","@timestamp":"2025-02-19T22:31:28.100Z","log.logger":"input.cel","log.origin":{"function":"github.com/elastic/beats/v7/x-pack/filebeat/input/cel.input.run.func1","file.name":"cel/input.go","file.line":383},"message":"single event object returned by evaluation","service.name":"filebeat","id":"id_a","input_source":"https://api.sep.eu.securitycloud.symantec.com","input_url":"https://api.sep.eu.securitycloud.symantec.com","event":{"error":{"code":"400","id":"400 Bad Request","message":"POST:{\"message\":\"Invalid stream id : 1db8a6c9-7b60-4451-a698-f4d35a6c0271 or shard id : 1\"}"}},"ecs.version":"1.6.0"}

I think the issue might be the timeout setting. Looking at the Broadcom documentation, the default timeout there is 120 seconds; it seems worth trying that.
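
That would just be a change to the resource block you already have:

resource:
  timeout: 120s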

I started with the 30-second timeout set by the Elastic team in their SES integration, and the issue was the same.
Setting it to 120 seconds makes no difference either.

I suspect "request" is not fit to the streaming, because we don't expect our request finish soon. We are making connect and just waiting while server pushing events through it...

I've also noticed that if I set the timeout to several minutes, every two minutes it says "unexpected EOF", which matches the API description stating "The default response timeout is 120 seconds. If there are no new events generated in 120 seconds, the stream connection is closed."

It is unclear to me how this can be handled properly. On the command line I can press Ctrl-C to stop curl, but how would I implement the equivalent with a Filebeat input, and should I?

Have you tried using the integration directly? Do you want to send this data to Elasticsearch?

No, that integration loads data from a different API, so I just used it as an example.

I also found that a Symantec Event Stream integration is under development (Integrations with Symantec Endpoint Security Event Stream with Elastic · Issue #8972 · elastic/integrations · GitHub), so it is probably too soon to expect it to work.

I am considering using the Filebeat httpjson input against Symantec Event Search instead.
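
Something along these lines (a rough sketch only: the /v1/event-search endpoint is my assumption, and the request body and pagination still need to be filled in from the API documentation):

filebeat.inputs:
  - type: httpjson
    id: ses-event-search
    interval: 1m
    auth.oauth2:
      client.id: O2ID*****
      client.secret: ******
      token_url: https://api.sep.eu.securitycloud.symantec.com/v1/oauth2/tokens
    request.url: https://api.sep.eu.securitycloud.symantec.com/v1/event-search
    request.method: POST
    # Emit one Filebeat event per element of the response's events array.
    response.split:
      target: body.events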

I'll follow up on that issue, but you may have luck with the streaming input - Streaming Input | Filebeat Reference [8.17] | Elastic
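
An untested sketch of what that might look like (the wss:// URL is a placeholder; the input speaks WebSocket, so the plain-HTTP stream endpoint may not fit):

filebeat.inputs:
  - type: streaming
    id: ses-event-stream
    url: wss://api.sep.eu.securitycloud.symantec.com/v1/event-export/stream/1db8a6c9-7b60-4451-a698-f4d35a6c0271/0
    program: |
      // Each received message is exposed to the program as state.response.
      bytes(state.response).decode_json().as(body, {
        "events": [{"message": body.encode_json()}],
      })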

Thank you Trevor,
I've tried it but was unable to make it work over plain HTTP; it wants a WebSocket.

Thanks for testing this, Roman. I've raised this with the security integrations team to review.

I've gotten feedback that the CEL input is intrinsically batched. I've noted the enhancement request, and hopefully we can support generic HTTP streaming with the streaming input.
I believe your best bet in the short term is to wire this up through object storage so you can take advantage of the existing integration.