Problems with enrichments

Hello,

Working with enrichments has been quite challenging for me, and I've been struggling with it for hours. It's my first time, so I may have done something wrong. I believe I'm close to a solution, but I'm still having difficulty finding something that works.

I have two dynamically generated fields from the MITRE ATT&CK framework:

attack_technique: "T1027.010"
attack_tactic: "TA0005"

I would like to enrich these fields to display their full names for better readability. For example:

TA0005 → "Defense Evasion"
T1027.010 → "Command Obfuscation"

I have already ingested the MITRE framework in JSON format. However, when I check Analytics → Discover in the logs, I cannot find fields like attack_tactic_description or attack_technique_description.

Below is my current configuration. Could you please help me identify what might be missing or what I could improve?

Thanks in advance!

#######################################
Bulk indexing the MITRE framework JSON into Elasticsearch:

jq -c '. | {"index": {"_index": "mitre_mapping"}}, .' tactics.json > tactics_bulk.json
jq -c '. | {"index": {"_index": "mitre_mapping"}}, .' techniques.json > techniques_bulk.json

curl -X POST "https://x:9200/_bulk" -H "Content-Type: application/json" --data-binary "@tactics_bulk.json" -u elastic:xxx -k
curl -X POST "https://x:9200/_bulk" -H "Content-Type: application/json" --data-binary "@techniques_bulk.json" -u elastic:xxx -k
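
A quick sanity check that the lookup documents actually landed (this assumes tactics.json and techniques.json contain one JSON object per line; if each file is a single JSON array, the jq filter would need '.[]' in place of '.'):

GET mitre_mapping/_search
{
  "query": {
    "term": {
      "attack_tactic": "TA0005"
    }
  }
}
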
#######################################

#######################################
Filebeat Configuration:

output.elasticsearch.indices:
  - index: "carbon_black_observations-%{+yyyy.MM.dd}"
    pipeline: "mitre_attack_pipeline"

#######################################

Index Mapping:

PUT mitre_mapping
{
  "mappings": {
    "properties": {
      "attack_tactic": {
        "type": "keyword"
      },
      "attack_tactic_description": {
        "type": "text"
      },
      "attack_technique": {
        "type": "keyword"
      },
      "attack_technique_description": {
        "type": "text"
      }
    }
  }
}
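
Worth double-checking here: a match enrich policy performs an exact term lookup on its match_field, so attack_tactic and attack_technique should remain keyword (as they are above). The applied mapping can be verified with:

GET mitre_mapping/_mapping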

#######################################

#######################################
Ingest pipeline: mitre_attack_pipeline

[
  {
    "enrich": {
      "policy_name": "mitre_tactic_policy",
      "field": "attack_tactic",
      "target_field": "attack_tactic_description",
      "ignore_missing": true
    }
  },
  {
    "enrich": {
      "policy_name": "mitre_technique_policy",
      "field": "attack_technique",
      "target_field": "attack_technique_description",
      "ignore_missing": true
    }
  }
]

#######################################
#######################################
Enrich Policies:

mitre_tactic_policy
{
  "policies": [
    {
      "config": {
        "match": {
          "name": "mitre_tactic_policy",
          "indices": [
            "mitre_mapping"
          ],
          "match_field": "attack_tactic",
          "enrich_fields": [
            "attack_tactic_description"
          ]
        }
      }
    }
  ]
}

mitre_technique_policy
{
  "policies": [
    {
      "config": {
        "match": {
          "name": "mitre_technique_policy",
          "indices": [
            "mitre_mapping"
          ],
          "match_field": "attack_technique",
          "enrich_fields": [
            "attack_technique_description"
          ]
        }
      }
    }
  ]
}
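
For reference, policies shown in this GET output form would have been created with requests along these lines (a sketch; the original PUT requests are not shown in the thread):

PUT _enrich/policy/mitre_tactic_policy
{
  "match": {
    "indices": "mitre_mapping",
    "match_field": "attack_tactic",
    "enrich_fields": ["attack_tactic_description"]
  }
}

PUT _enrich/policy/mitre_technique_policy
{
  "match": {
    "indices": "mitre_mapping",
    "match_field": "attack_technique",
    "enrich_fields": ["attack_technique_description"]
  }
}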

#######################################
#######################################
Execute Enrich Policies
POST _enrich/policy/mitre_tactic_policy/_execute
POST _enrich/policy/mitre_technique_policy/_execute
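
Note that _execute builds a static .enrich-* snapshot index from the source index; if mitre_mapping is re-indexed later, the policies must be executed again to pick up the changes. The snapshot indices can be listed with:

GET _cat/indices/.enrich-*?v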

#######################################
#######################################

Testing the pipeline, which looks like it works:

POST _ingest/pipeline/mitre_attack_pipeline/_simulate
{
  "docs": [
    {
      "_source": {
        "attack_tactic": "TA0005",
        "attack_technique": "T1027.010"
      }
    }
  ]
}



{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_version": "-3",
        "_id": "_id",
        "_source": {
          "attack_tactic": "TA0005",
          "attack_tactic_description": {
            "attack_tactic_description": "Defense Evasion",
            "attack_tactic": "TA0005"
          },
          "attack_technique": "T1027.010",
          "attack_technique_description": {
            "attack_technique_description": "Command Obfuscation",
            "attack_technique": "T1027.010"
          }
        },
        "_ingest": {
          "timestamp": "2025-02-04T13:28:23.091222826Z"
        }
      }
    }
  ]
}
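
One detail worth noting in this output: the enrich processor copies the entire matched document as an object into target_field, so attack_tactic_description ends up as an object rather than the text field declared in the mapping above. A common pattern is to enrich into a temporary field and copy just the value out; a sketch, using a hypothetical tactic_tmp field:

# processors sketch; "tactic_tmp" is an illustrative temporary field name
{
  "enrich": {
    "policy_name": "mitre_tactic_policy",
    "field": "attack_tactic",
    "target_field": "tactic_tmp",
    "ignore_missing": true
  }
},
{
  "set": {
    "field": "attack_tactic_description",
    "copy_from": "tactic_tmp.attack_tactic_description",
    "ignore_empty_value": true
  }
},
{
  "remove": {
    "field": "tactic_tmp",
    "ignore_missing": true
  }
}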

#######################################

Hi @arcsons,

Thanks for sharing the details of your enrichment. Have new documents been ingested after you created and applied your policy, as covered in these steps? I see a simulation example in the provided steps, but no confirmation of a document being ingested once the ingest pipeline was set up.

Let us know!

Hey Carly,
Yes, new logs have come in :confused: but I still can't find the new fields.
I must have forgotten something... Any idea how I can troubleshoot further?

Do you have any errors coming through the ingest pipeline at all?

How do you know your pipeline is even being called / invoked?

You can add a set processor that sets a field like pipeline_name to the pipeline name, so you know the pipeline is actually being called.

How are you invoking the pipeline?
What version of the stack are you on?

I'm not sure how to check the enrichment pipeline logs.

GET _nodes/stats/ingest

gives me something like:

  "mitre_attack_pipeline": {
        "count": 48,
        "time_in_millis": 33,
        "current": 0,
        "failed": 24,
        "ingested_as_first_pipeline_in_bytes": 0,
        "produced_as_first_pipeline_in_bytes": 0,
        "processors": [
          {
            "enrich": {
              "type": "enrich",
              "stats": {
                "count": 22,
                "time_in_millis": 4,
                "current": 0,
                "failed": 0
              }
            }
          },
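
The "failed": 24 here is a real clue: half of the 48 documents hit a processor failure, while the enrich processor itself reports "failed": 0, so some other processor is failing. One way to surface the actual error is an on_failure handler on the pipeline that records the failure message on the document (a sketch; "pipeline_error" is an illustrative field name):

# appended to the pipeline definition; "pipeline_error" is illustrative
"on_failure": [
  {
    "set": {
      "field": "pipeline_error",
      "value": "{{ _ingest.on_failure_message }}"
    }
  }
]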

I'm a beginner with Elastic, so I don't know if it's being called or not...

By adding a processor, do you mean something like this?

PUT _ingest/pipeline/mitre_attack_pipeline
{
  "description": "Pipeline to enrich MITRE ATT&CK fields",
  "processors": [
    {
      "set": {
        "field": "pipeline_name",
        "value": "mitre_attack_pipeline"
      }
    },
    {
      "enrich": {
        "policy_name": "mitre_tactic_policy",
        "field": "attack_tactic",
        "target_field": "attack_tactic_description",
        "ignore_missing": true
      }
    },
    {
      "enrich": {
        "policy_name": "mitre_technique_policy",
        "field": "attack_technique",
        "target_field": "attack_technique_description",
        "ignore_missing": true
      }
    }
  ]
}

By invoking the pipeline, do you mean this in filebeat.yml?

output.elasticsearch.indices:
  - index: "carbon_black_observations-%{+yyyy.MM.dd}"
    pipeline: "mitre_attack_pipeline"

Elastic version is 8.15.3.

So now that you have added the set processor, do you see the field pipeline_name in your logs? If not, the pipeline is not getting called.

Do you have Logstash in the middle, or direct Filebeat -> Elasticsearch?

Can you share the rest of your filebeat.yml?

Perhaps try this... I think pipeline is at the wrong level, so it isn't getting called; pipeline belongs at the top level of output.elasticsearch:

output.elasticsearch.indices:
  index: "carbon_black_observations-%{+yyyy.MM.dd}"
output.elasticsearch.pipeline: "mitre_attack_pipeline"

Filebeat -> Elastic.

This gave me an error when I restarted Filebeat.

output.elasticsearch.indices:
  - index: "carbon_black_observations-%{+yyyy.MM.dd}"
#    pipeline: "mitre_attack_pipeline"
    when.contains:
      event.dataset: "carbon_black.observations"
output.elasticsearch.pipeline: "mitre_attack_pipeline"

Exactly what error? We can't help without details.

Please share the entire filebeat.yml and the entire error, otherwise we are looking at an elephant through a pinhole :slight_smile:

Perhaps review the docs...

Are you trying to set a pipeline per input or per index?
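
For what it's worth, Filebeat also supports conditional pipeline selection through output.elasticsearch.pipelines, an array analogous to indices. A sketch based on the dataset used in this thread:

output.elasticsearch.pipelines:
  # route only carbon_black.observations events through the enrich pipeline
  - pipeline: "mitre_attack_pipeline"
    when.contains:
      event.dataset: "carbon_black.observations"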

sudo systemctl status filebeat
× filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch.
Loaded: loaded (/usr/lib/systemd/system/filebeat.service; disabled; preset: enabled)
Active: failed (Result: exit-code) since Fri 2025-02-07 19:35:04 CET; 37s ago
Duration: 68ms
Docs: Filebeat: Lightweight Log Analysis & Elasticsearch | Elastic
Process: 1791384 ExecStart=/usr/share/filebeat/bin/filebeat --environment systemd $BEAT_LOG_OPTS $BEAT_CONFIG_OPTS $BEAT_PATH_OPTS (code=exited, status=1/FAILURE)
Main PID: 1791384 (code=exited, status=1/FAILURE)
CPU: 77ms

Feb 07 19:35:04 x.arc.local systemd[1]: filebeat.service: Scheduled restart job, restart counter is at 5.
Feb 07 19:35:04 x.arc.local systemd[1]: filebeat.service: Start request repeated too quickly.
Feb 07 19:35:04 x.arc.local systemd[1]: filebeat.service: Failed with result 'exit-code'.
Feb 07 19:35:04 x.arc.local systemd[1]: Failed to start filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch..

cat /etc/filebeat/filebeat.yml
setup.template.name: "carbon_black"
setup.template.pattern: "carbon_black-*"
setup.template.enable: false
setup.ilm.enabled: false

filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /home/arc/cb.json
  json.keys_under_root: true
  json.overwrite_keys: true
  processors:
    - fingerprint:
        fields: ["event_id", "device_timestamp"]
        target_field: "@metadata._id"
    - add_fields:
        target: event
        fields:
          dataset: "carbon_black.observations"

- type: log
  enabled: true
  paths:
   - /home/arc/vuln.json
  json.keys_under_root: true
  json.overwrite_keys: true
  processors:
    - fingerprint:
        fields: ["created_at", "cve_id"]
        target_field: "@metadata._id"
    - add_fields:
        target: event
        fields:
          dataset: "carbon_black.vulnerabilities"

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

output.elasticsearch:
  hosts: ["https://x:9200", "https://x:9200", "https://x:9200"]
  protocol: "https"
  ssl.verification_mode: "none"
  username: "elastic"
  password: "xxxx"


output.elasticsearch.indices:
  - index: "carbon_black_observations-%{+yyyy.MM.dd}"
#    pipeline: "mitre_attack_pipeline"
    when.contains:
      event.dataset: "carbon_black.observations"
output.elasticsearch.pipeline: "mitre_attack_pipeline"

  - index: "carbon_black_vulnerabilities-%{+yyyy.MM.dd}"
    when.contains:
      event.dataset: "carbon_black.vulnerabilities"

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0640

Most likely you have YAML errors.

Run:

/usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml test config

Logs... what's shown is not enough:

journalctl -u filebeat.service

Logs may also be in /var/log/filebeat.

It should look like:

output.elasticsearch:
  hosts: ["https://x:9200", "https://x:9200", "https://x:9200"]
  protocol: "https"
  ssl.verification_mode: "none"
  username: "elastic"
  password: "xxxx"
  pipeline: "mitre_attack_pipeline"

Take out:

output.elasticsearch.pipeline: "mitre_attack_pipeline"

Yes, that was the original config I had before you said I should try:
output.elasticsearch.pipeline: "mitre_attack_pipeline"

output.elasticsearch:
  hosts: ["https://x:9200", "https://x:9200", "https://x:9200"]
  protocol: "https"
  ssl.verification_mode: "none"
  username: "elastic"
  password: "xxxx"
  pipeline: "mitre_attack_pipeline"

Also, if the Filebeat service is not starting, you won't get the error in the Filebeat logs; you need to look in the system logs, /var/log/messages or /var/log/syslog.

I have this now, and it has no errors when I start it.

output.elasticsearch:
  hosts: ["https://x:9200", "https://x:9200", "https://x:9200"]
  protocol: "https"
  ssl.verification_mode: "none"
  username: "elastic"
  password: "xxx"

output.elasticsearch.indices:
  - index: "carbon_black_observations-%{+yyyy.MM.dd}"
    pipeline: "mitre_attack_pipeline"
    when.contains:
      event.dataset: "carbon_black.observations"

I have tested adding pipeline: "mitre_attack_pipeline" under output.elasticsearch, without luck.

Put it back the way I showed you, and run the command I gave you:

/usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml test config

Also run:

/usr/share/filebeat/bin/filebeat -c /etc/filebeat/filebeat.yml test output

Or just run the whole thing in the foreground:

/usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml

You will see the actual error in one of these

I have no errors when starting Filebeat. It's the enrichment I have problems with :slightly_smiling_face:

$ sudo /usr/bin/filebeat -c /etc/filebeat/filebeat.yml test config
Config OK

sudo /usr/bin/filebeat -c /etc/filebeat/filebeat.yml test output
elasticsearch: https://x:9200...
parse url... OK
connection...
parse host... OK
dns lookup... OK
addresses: x
dial up... OK
TLS...
security... WARN server's certificate chain verification is disabled
handshake... OK
TLS version: TLSv1.3
dial up... OK
talk to server... OK
version: 8.15.3
elasticsearch: https://x:9200...
parse url... OK
connection...
parse host... OK
dns lookup... OK
addresses: x
dial up... OK
TLS...
security... WARN server's certificate chain verification is disabled
handshake... OK
TLS version: TLSv1.3
dial up... OK
talk to server... OK
version: 8.15.3
elasticsearch: https://x:9200...
parse url... OK
connection...
parse host... OK
dns lookup... OK
addresses: x
dial up... OK
TLS...
security... WARN server's certificate chain verification is disabled
handshake... OK
TLS version: TLSv1.3
dial up... OK
talk to server... OK
version: 8.15.3

Good, now start it in the foreground.

Look for errors.

If there are no errors, go to Discover and look for documents.

They should at the very least have the field you added with the set processor.
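
A quick way to check in Discover is a KQL filter on the marker field set earlier in the thread:

pipeline_name : "mitre_attack_pipeline"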

Sounds like a good plan; I started it now. Let's wait for a new log entry to come in. Thanks for helping, really nice!

Keep an eye on the Filebeat logs; they should be printing to stdout...