Dec 18th 2022: [EN] Data Ingestion using GraphQL & Filebeat

Introduction

At Elastic InfoSec we leverage our products as much as possible, and we’ve published a number of blogs on how we use them internally. In one of those blogs, the Elastic InfoSec Security Engineering team highlighted how we deploy our infrastructure using Elastic Cloud on Kubernetes (ECK) and Helm; our responsibility is to deploy and manage InfoSec’s infrastructure and tools. The goal of this short post is to show how you can configure Filebeat to ingest data using GraphQL.

Filebeat might not be the first solution that comes to mind to use together with GraphQL. However, as we’ll see below, it’s a powerful way to query data and ingest it into an Elasticsearch deployment.

What is GraphQL?

GraphQL started as an internal project at Meta to solve a particular problem. Meta engineers later open-sourced it, and it saw immediate adoption across the industry. GraphQL is a query language that enables developers to request exactly the data they need from an API.
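
As a minimal sketch against a hypothetical schema (the organization and members fields here are illustrative, not part of any real API), a GraphQL query names exactly the fields the client wants, and the server returns only those fields:

query {
  organization(slug: "example") {
    name
    members(first: 10) {
      edges {
        node {
          name
          email
        }
      }
    }
  }
}

APIs that follow this edges/node connection pattern paginate with cursors, which is what the Filebeat configuration below relies on.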

Filebeat Overview

Filebeat is a lightweight data shipper that makes it easy to ingest data into an Elasticsearch cluster. It ships with various modules out of the box that help you jump-start data ingestion. At SecEng we leverage Filebeat quite heavily and run different deployments, each ingesting specific data. When a module is not available, we use one of Filebeat’s generic inputs to achieve the same result.
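
For reference, enabling a module takes only a few lines in filebeat.yml; the sketch below uses the nginx module purely as an example. BuildKite has no module, which is why the rest of this post relies on a generic input instead.

filebeat.modules:
  - module: nginx
    # each fileset of the module can be toggled individually
    access:
      enabled: true
    error:
      enabled: true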

Configuring Filebeat for GraphQL

The input we’ll configure here is the HTTP JSON (httpjson) input, which reads data from APIs that return JSON payloads.

As a use case, we’re going to ingest BuildKite Audit logs into an Elasticsearch cluster.

Our input configuration is divided into four parts:

The first part of the configuration sets up the httpjson input with the GraphQL endpoint URL and a request body containing an empty query. The query is left empty because it will be set in part two.

filebeat.inputs:
- type: httpjson
  enabled: true
  config_version: 2
  interval: 1h
  request.url: https://graphql.buildkite.com/v1
  request.method: POST
  request.body:
      "query": ""

In the second part, request transforms set the Authorization header (with the token read from an environment variable) and replace the request body with the GraphQL query that fetches the data. The query references the cursor variable, which is defined further below in part four.

request.transforms:
  - set:
      target: header.Authorization
      value: 'Bearer ${BUILDKITE_ACCESS_TOKEN}'
  - set:
      target: body.query
      value: "query {\n  organization(slug: \"elastic\") {\n    auditEvents (last: 100[[if .cursor]], before:\"[[.cursor.last_requested_at]]\"[[end]])  {\n      edges {\n        cursor\n        node {\n          uuid\n          type\n          occurredAt \n          actor {\n            name\n            id\n            type\n            uuid\n          }\n          subject {\n            name\n            type\n          }\n          data \n        }\n      }\n      pageInfo {\n        endCursor\n        hasNextPage\n      }\n    }\n  }\n}"

The third part prevents Filebeat from ingesting duplicate data every hour. response.split turns each audit log edge into its own event, while response.request_body_on_pagination and response.pagination allow us to paginate through the data; the pagination query is the same as above, except that it always includes the before cursor argument.

response.split:
  target: body.data.organization.auditEvents.edges
  type: Array
response.request_body_on_pagination: true
response.pagination:
  - set:
      target: body.query
      value: "query {\n  organization(slug: \"elastic\") {\n    auditEvents (last: 100, before:\"[[.cursor.last_requested_at]]\")  {\n      edges {\n        cursor\n        node {\n          uuid\n          type\n          occurredAt \n          actor {\n            name\n            id\n            type\n            uuid\n          }\n          subject {\n            name\n            type\n          }\n          data \n        }\n      }\n      pageInfo {\n        endCursor\n        hasNextPage\n      }\n    }\n  }\n}" 

In GraphQL, the cursor is the value used to keep track of the offset into the data. In Filebeat, the cursor keeps state between requests and is updated once all the data from a request has been published. Each request and response reads the cursor defined here to paginate through the data.

cursor:
  last_requested_at:
    value: '[[(index .last_response.body.data.organization.auditEvents.edges 0).cursor]]'

The output of this configuration is the Elasticsearch cluster we ingest the data into. The host, username, and password are defined as environment variables.

setup.template.enabled: false
setup.ilm.enabled: false
 
output.elasticsearch:
    hosts: 'https://${ELASTICSEARCH_ADDRESS}'
    username: ${ELASTICSEARCH_USERNAME}
    password: ${ELASTICSEARCH_PASSWORD}
    index: buildkite_auditlogs
    ssl:
      enabled: true
      supported_protocols: TLSv1.2

logging.level: debug

Ingest Pipeline

The data ingested by Filebeat does not follow the Elastic Common Schema (ECS) out of the box. Ingest pipelines let us transform the data just before it is indexed. The following ingest pipeline covers the relevant fields and maps them to ECS-compliant fields.

{
 "description": "Ingest pipeline used for Buildkite auditlogs",
 "processors": [
   {
     "json": {
       "field": "message",
       "ignore_failure": true
     }
   },
   {
     "json": {
       "field": "message.node.data",
       "target_field": "data",
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "cursor",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true,
       "copy_from": "message.cursor"
     }
   },
   {
     "set": {
       "field": "actor.id",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true,
       "copy_from": "message.node.actor.id"
     }
   },
   {
     "set": {
       "field": "actor.type",
       "copy_from": "message.node.actor.type",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "user.name",
       "copy_from": "message.node.actor.name",
       "override": false,
       "ignore_empty_value": true,
       "if": "ctx.actor != null && ctx.actor.type == 'USER'",
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "actor.uuid",
       "copy_from": "message.node.actor.uuid",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true
     }
   },
   {
     "date": {
       "field": "message.node.occurredAt",
       "formats": [
         "ISO8601"
       ],
       "target_field": "event.created",
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "subject.name",
       "copy_from": "message.node.subject.name",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "subject.type",
       "copy_from": "message.node.subject.type",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "event.action",
       "copy_from": "message.node.type",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "uuid",
       "copy_from": "message.node.actor.uuid",
       "override": false,
       "ignore_empty_value": true,
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "repository.name",
       "copy_from": "data.provider_settings.repository",
       "if": "ctx.data.provider_settings != null && ctx.data.provider_settings.repository != null",
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "repository.url",
       "copy_from": "data.repository",
       "if": "ctx.data.repository != null",
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "user.target.name",
       "copy_from": "data.user.name",
       "if": "ctx.data.user != null && ctx.data.user.name != null",
       "ignore_failure": true
     }
   },
   {
     "set": {
       "field": "user.target.email",
       "value": " ",
       "ignore_failure": true,
       "description": "This will be populated by the script processor"
     }
   },
   {
     "script": {
       "lang": "painless",
       "source": "if (ctx.data.email != null) {\n    ctx.user.target.email = ctx.data.email;\n}\nelse if (ctx.data.user.email != null) {\n    ctx.user.target.email = ctx.data.user.email;\n}",
       "if": "ctx.data != null",
       "ignore_failure": true
     }
   },
   {
     "remove": {
       "field": "message",
       "ignore_failure": true
     }
   }
 ]
}
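
Once the pipeline has been created (for example via Stack Management > Ingest Pipelines, or with PUT _ingest/pipeline/buildkite_auditlogs_ecs and the JSON above as the body; the name matches the default_pipeline setting in the index template below), it can be tested from Dev Tools with the simulate API. The document in this sketch is made up for illustration rather than taken from real BuildKite data:

POST _ingest/pipeline/buildkite_auditlogs_ecs/_simulate
{
  "docs": [
    {
      "_source": {
        "message": "{\"cursor\":\"abc123\",\"node\":{\"uuid\":\"11111111-2222-3333-4444-555555555555\",\"type\":\"ORGANIZATION_TEAMS_ENABLED\",\"occurredAt\":\"2022-11-21T14:31:04Z\",\"actor\":{\"name\":\"Jane Doe\",\"id\":\"QXVkaXRBY3Rvcg==\",\"type\":\"USER\",\"uuid\":\"66666666-7777-8888-9999-000000000000\"},\"subject\":{\"name\":\"elastic\",\"type\":\"ORGANIZATION\"},\"data\":\"{}\"}}"
      }
    }
  ]
}

The simulated document in the response should come back with event.action, event.created, user.name, and the other ECS fields populated.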

Mapping

The next step is to prepare an index template that defines the settings and mappings for the indices that will hold the data. This can be created via the Kibana UI under Stack Management > Index Management > Index Templates > Create template.

{
 "template": {
   "settings": {
     "index": {
       "lifecycle": {
         "name": "buildkite_auditlogs",
         "rollover_alias": "buildkite_auditlogs"
       },
       "default_pipeline": "buildkite_auditlogs_ecs"
     }
   },
   "mappings": {
     "dynamic": true,
     "numeric_detection": false,
     "date_detection": true,
     "dynamic_date_formats": [
       "strict_date_optional_time",
       "yyyy/MM/dd HH:mm:ss Z||yyyy/MM/dd Z"
     ],
     "_source": {
       "enabled": true,
       "includes": [],
       "excludes": []
     },
     "_routing": {
       "required": false
     }
   }
 },
 "index_patterns": [
   "buildkite_auditlogs*"
 ],
 "composed_of": [
   "ecs-1.12.1"
 ]
}
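
If you prefer Dev Tools over the UI, a minimal equivalent of the template above can be created with the index template API (this sketch assumes the ecs-1.12.1 component template and the buildkite_auditlogs ILM policy already exist in the cluster):

PUT _index_template/buildkite_auditlogs
{
  "index_patterns": ["buildkite_auditlogs*"],
  "composed_of": ["ecs-1.12.1"],
  "template": {
    "settings": {
      "index": {
        "lifecycle": {
          "name": "buildkite_auditlogs",
          "rollover_alias": "buildkite_auditlogs"
        },
        "default_pipeline": "buildkite_auditlogs_ecs"
      }
    }
  }
}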

Initiate Index

The final part before switching on Filebeat is to bootstrap the initial index via Kibana Dev Tools. The URL-encoded index name below decodes to <buildkite_auditlogs-{now/d}-00000> and uses date math to create a dated index behind the buildkite_auditlogs write alias.

PUT %3Cbuildkite_auditlogs-%7Bnow%2Fd%7D-00000%3E
{
  "aliases": {
    "buildkite_auditlogs": {
      "is_write_index": true
    }
  }
}

Sample Output

As soon as Filebeat is started, we should see the first data arriving in Kibana. Below is one of the many types of events found in the BuildKite Audit logs.

{
   "cursor": "ktkhMjAyMi0xMS0yMSAxNDozMTowNC4zMzg1NjEwMDAgVVRD2SQwMTg0OWE5OS04OTk0LTQ0ZWYtOTNhOS1hMDdlMTQ0NTZiODk=",
   "agent": {
     "hostname": "filebeat-buildkite-statefulset-0",
     "name": "filebeat-buildkite-statefulset-0",
     "id": "5e35b5d7-3a1e-464b-ad5d-f54420877f1b",
     "type": "filebeat",
     "ephemeral_id": "3d4f0504-8bcb-4ead-b4c3-790329f3f57e",
     "version": "7.17.7"
   },
   "data": {
     "account_uuids": [
       "e0f3970e-3a75-4621-919f-e6c773e2bb12"
     ],
     "description": "default token",
     "scopes": [
       "read_agents",
       "read_teams",
       "read_artifacts",
       "read_builds",
       "read_job_env",
       "read_organizations",
       "read_pipelines"
     ]
   },
   "subject": {
     "name": "default token",
     "type": "API_ACCESS_TOKEN"
   },
   "uuid": "9b2b0115-7de3-4e3c-881d-66e5d4aa1b8a",
   "actor": {
     "id": "QXVkaXRBY3Rvci0tLTAxODQ5YTk5LTg5OTQtNDRlZi05M2E5LWEwN2UxNDQ1NmI4OQ==",
     "type": "USER",
     "uuid": "9b2b0115-7de3-4e3c-881d-66e5d4aa1b8a"
   },
   "input": {
     "type": "httpjson"
   },
   "@timestamp": "2022-11-21T14:44:01.820Z",
   "ecs": {
     "version": "1.12.0"
   },
   "host": {
     "name": "filebeat-buildkite-statefulset-0"
   },
   "event": {
     "created": "2022-11-21T14:31:04.338Z",
     "action": "USER_API_ACCESS_TOKEN_ORGANIZATION_ACCESS_ADDED"
   },
   "user": {
     "name": "Chris Cutajar",
     "target": {
       "email": " "
     }
   }
 }

Conclusion

Above we’ve seen how Filebeat can be configured to run GraphQL queries and ingest the results into Elasticsearch, using BuildKite Audit logs as our use case. The same process can be applied to other platforms that expose a GraphQL API, bringing their data into the stack as well.
