Elasticsearch ingest pipeline not extracting data passed from Filebeat

Hi,

I'm slowly teaching myself the Elastic stack. My current project is ingesting and modelling alerts from Snort 3 against the Elastic Common Schema. I've run into an issue where an ingest pipeline is not correctly extracting fields out of a JSON file.

The approach being taken is: Filebeat (reading the alert_json.txt file) -> Elasticsearch (with an index template and ingest pipeline defined).

A snippet from my index template is below:

PUT /_index_template/snort3_template 
{
  "index_patterns": [
    "snort3-*"
  ],
  "template": {
    "mappings": {
      "properties": {
        "destination": {
          "properties": {
            "address": {
              "type": "keyword"
            },
            "ip": {
              "type": "ip"
            },
            "mac": {
              "type": "keyword"
            }
          }
        }
      }
    }
  }
}

A snippet from my ingestion pipeline is below:

PUT /_ingest/pipeline/snort-json-pipeline
{
  "description": "Pipeline for ingesting JSON snort3 data",
  "processors": [
    {
      "convert": {
        "field": "pkt_num",
        "type": "integer",
        "ignore_missing": true
      }
    },
    {
      "convert": {
        "field": "pkt_len",
        "type": "integer",
        "ignore_missing": true
      }
    },
    {
      "convert": {
        "field": "src_port",
        "type": "integer",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "src_port",
        "target_field": "source.port",
        "ignore_missing": true
      }
    },
    {
      "convert": {
        "field": "dst_port",
        "type": "integer",
        "ignore_missing": true
      }
    },
    {
      "convert": {
        "field": "priority",
        "type": "integer",
        "ignore_missing": true
      }
    },
    {
      "rename": {
        "field": "src_addr",
        "target_field": "source.address",
        "if": "ctx._source?.src_addr != '' && ctx._source?.src_addr !=null"
      }
    },

If I simulate the ingestion pipeline I get the expected result:

POST /_ingest/pipeline/snort-json-pipeline/_simulate
{
  "docs": [{"_index":"index","_id":"id","_source":{ "seconds" : 1626683982, "action" : "allow", "class" : "none", "b64_data" : "AAAAAAAAAAAAAAAAAAAAAAAA", "dir" : "UNK", "dst_ap" : ":0", "eth_dst" : "F0:6E:0B:0F:7A:E4", "eth_len" : 60, "eth_src" : "F0:9F:C2:C7:69:1C", "eth_type" : "0x806", "gid" : 112, "iface" : "ens161", "msg" : "(arp_spoof) unicast ARP request", "pkt_gen" : "raw", "pkt_len" : 18, "pkt_num" : 694811210, "priority" : 3, "proto" : "ARP", "rev" : 1, "rule" : "112:1:1", "service" : "unknown", "sid" : 1, "src_ap" : ":0", "vlan" : 0, "timestamp" : "07/19-18:39:42.586277" }}]
}
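
(For reference, adding verbose=true to the _simulate request shows the document after each processor, which helps narrow down which processor is misbehaving. A trimmed-down sketch using a few of the fields from the document above:)

POST /_ingest/pipeline/snort-json-pipeline/_simulate?verbose=true
{
  "docs": [
    {
      "_index": "index",
      "_id": "id",
      "_source": {
        "pkt_num": 694811210,
        "pkt_len": 18,
        "priority": 3
      }
    }
  ]
}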

If I ingest the data via Filebeat, it does not extract any of the fields from the JSON message. It does, however, add all of the fields I append via the ingest pipeline.

I'm stuck at the minute, trying to work out why the field extraction is not working correctly.

cheers,

Michael

Hey,

Have you specified the pipeline in your Filebeat configuration? As it is not specified as a default pipeline in the template, you need to specify somewhere that you would like to make use of that pipeline.

--Alex

You need to set output.elasticsearch.pipeline: snort-json-pipeline in Filebeat. Can you post your config?
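
Something along these lines in filebeat.yml (a sketch; the hosts value is just a placeholder for your setup):

    # filebeat.yml - tell the Elasticsearch output which ingest pipeline to use
    output.elasticsearch:
      hosts: ["localhost:9200"]
      pipeline: snort-json-pipeline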

Hi Alex,

The pipeline is being applied correctly. Below is what I have at the bottom of my index template.

    "settings": {
      "default_pipeline": "snort-json-pipeline"
    }

The approach I'm trying is: Filebeat sends the logs to an index whose name is matched by the index template. Elasticsearch then creates an index using the template and dynamically maps the additional fields not mentioned in the template. As part of setting up the template, the default ingest pipeline is applied to the index.

I am adding a few fields to the indexed document via append processors inside the ingest pipeline. This is working correctly, so I know the ingest pipeline is being applied to the index.

My current theory is that Filebeat is sending the data with extra fields wrapped around it, so the processors can't find the right fields in the data I'm ingesting.

What is the best way of seeing an event as sent by Filebeat, prior to processing via the ingest pipeline? For example, if the passed event stores its data in nested objects, how can I access those from an ingest pipeline processor? I have tried things like message.pkt_num and that hasn't worked.

    {
      "convert": {
        "field": "pkt_num",
        "type": "integer",
        "ignore_missing": true
      }
    },

You can output to a file or stdout from Filebeat, or run it in debug mode and it will output the events sent to Elasticsearch in its log. Also, do I understand correctly that the data is making it to the ingest pipeline but just isn't being processed properly? If so, which processors specifically aren't working?
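
For example, to dump events to stdout (a sketch; note only one output can be enabled at a time):

    # filebeat.yml - print events to stdout instead of sending them to Elasticsearch
    output.console:
      pretty: true

Running filebeat -e -d "publish" should also log the published events.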

Can you share the index settings of that concrete index? The index template is only applied on index creation, so changes to the template are not reflected in existing indices.

What index? I'm asking about the ingest pipeline. What specifically isn't working with the pipeline?

I'm asking about the proper configuration of the ingest pipeline in the index settings, as you do not specify the pipeline for each indexing/bulk operation in the beat. Can you share the index settings of the index you are indexing into?
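
Something like this should return them (assuming the snort3-* pattern from your template):

GET /snort3-*/_settings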

@spinscale Whoops, sorry. I wasn't paying attention, thought the question came from the OP.

Hi,

Thank you for the help.

Settings of the current index are below. This is correctly picking up the default pipeline from the index template and attempting to apply it.

    "settings" : {
      "index" : {
        "routing" : {
          "allocation" : {
            "include" : {
              "_tier_preference" : "data_content"
            }
          }
        },
        "number_of_shards" : "1",
        "provided_name" : "snort3-2021.07.20",
        "default_pipeline" : "snort-json-pipeline",
        "creation_date" : "1626703207180",
        "number_of_replicas" : "1",
        "uuid" : "Wsw3OcyAQROOX6VqZQCeMA",
        "version" : {
          "created" : "7130399"
        }
      }
    }

From experimenting further I feel I'm close to the solution.

When Filebeat passes the event to Elasticsearch it includes a number of other nested objects as part of _source. The details of the original message are passed in the message object.

{
  },
  "ecs": {
    "version": "1.8.0"
  },
  "input": {
    "type": "log"
  },
  "log": {
    "file": {
      "path": "/var/log/snort/alert_json.txt"
    },
    "offset": 93914389
  },
  "message": {
  "seconds": 1626589993,
  "action": "allow",
  "class": "none",
  "b64_data": "AAAAAAAAAAAAAAAAAAAAAAAA",
  "dir": "UNK",
  "dst_ap": ":0",
  "eth_dst": "F0:9F:C2:C7:69:1C",
  "eth_len": 60,
  "eth_src": "BA:30:D5:05:8D:1C",
  "eth_type": "0x806",
  "gid": 112,
  "iface": "ens161",
  "msg": "(arp_spoof) unicast ARP request",
  "pkt_gen": "raw",
  "pkt_len": 18,
  "pkt_num": 615506767,
  "priority": 3,
  "proto": "ARP",
  "rev": 1,
  "rule": "112:1:1",
  "service": "unknown",
  "sid": 1,
  "src_ap": ":0",
  "vlan": 0,
  "timestamp": "07/18-16:33:13.829379"
}
}

The fields the ingest pipeline targets are not at the top level of _source, so the pipeline is not finding them. How can I access data stored in a nested JSON object under _source? I have tried the message.field notation but haven't had any luck so far.

cheers,

Michael

Hi,

I have continued to work at this.

I can extract a whole event and run it through the pipeline with _simulate okay.

The current pipeline I'm working with is below. It uses the message.* notation and, in the simulation, extracts the data okay.

PUT /_ingest/pipeline/snort-json-pipeline
{
  "description": "Pipeline for ingesting JSON snort3 data",
  "processors": [
    { 
      "set": {
        "field" : "pkt_num",
        "copy_from": "message.pkt_num",
        "tag": "set pkt_num"
      }
    },
    {
      "set": {
        "field": "pkt_len",
        "copy_from": "message.pkt_len"
      }
    },
    {
      "set" : {
        "field": "source.port",
        "copy_from": "message.src_port",
        "ignore_empty_value": true
      }
    },
    {
      "set" : {
        "field": "destination.port",
        "copy_from": "message.dst_port",
        "ignore_empty_value": true
      }
    },
    {
      "set": {
        "field": "source.address", 
        "copy_from": "message.src_addr",
        "ignore_empty_value": true
      }
    },
    {
      "set": {
        "field": "source.ip",
        "copy_from": "message.src_addr",
        "ignore_empty_value": true
      }
    },
    {
      "set": {
        "field": "source.mac",
        "copy_from": "message.eth_src"
      }
    },
    {
      "set": {
        "field": "destination.address", 
        "copy_from": "message.dst_addr",
        "ignore_empty_value": true
      }
    },
    {
      "set": {
        "field": "destination.ip",
        "copy_from": "message.dst_addr",
        "ignore_empty_value": true
      }
    },
    {
      "set": {
        "field": "destination.mac",
        "copy_from": "message.eth_dst"
      }
    },
    {
      "set": {
        "field": "network.protocol",
        "copy_from": "message.service"
      }
    },
    {
      "set": {
        "field": "network.transport",
        "copy_from": "message.proto"
      }
    },
    {
      "lowercase": {
        "field": "network.transport"
      }
    },
    {
      "date": {
        "field": "message.seconds",
        "formats": [
          "UNIX"
        ],
        "ignore_failure": true
      }
    },
    {
      "set": {
        "field": "event.severity",
        "copy_from": "message.priority",
        "ignore_empty_value": true
      }
    },
    {
      "append": {
        "field": "event.category",
        "value": [
          "intrusion_detection"
        ]
      }
    },
    {
      "append": {
        "field": "event.kind",
        "value": [
          "alert"
        ]
      }
    },
    {
      "append": {
        "field": "event.module",
        "value": [
          "snort3"
        ]
      }
    },
    {
      "community_id": {
        "seed": "15",
        "ignore_missing": true
      }
    },
    {
      "network_direction": {
        "internal_networks": ["private"]
      }
    }
  ],
  "version": 1
}

When I try to point data at the pipeline via Filebeat I get the error below. I understand this to mean the ingest pipeline cannot resolve the path in the document it is being sent.

If the data is being passed from Filebeat under the message: {} object, how do I access it from the pipeline's processors?

2021-07-20T20:38:35.823+1000    WARN    [elasticsearch] elasticsearch/client.go:408     Cannot index event publisher.Event{Content:beat.Event{Timestamp:time.Time{wall:0xc035c7ca3ac8a866, ext:549832714, loc:(*time.Location)(0x55dde5819dc0)}, Meta:{"raw_index":"snort3-2021.07.20"}, Fields:{"agent":{"ephemeral_id":"301f064e-3102-461f-9541-6c78f85c155c","hostname":"audite","id":"91fdd338-2c8f-40ff-a54d-c37ad962898d","name":"audite","type":"filebeat","version":"7.13.3"},"ecs":{"version":"1.8.0"},"host":{"architecture":"x86_64","containerized":false,"hostname":"audite","id":"3ce8587ede524b83abbc7d20f13212c2","ip":["192.168.10.5","2403:5800:7700:3204:20c:29ff:fe53:ecfe","fe80::20c:29ff:fe53:ecfe"],"mac":["00:0c:29:53:ec:fe","00:0c:29:53:ec:1c","00:0c:29:53:ec:08","00:0c:29:53:ec:12"],"name":"audite","os":{"codename":"focal","family":"debian","kernel":"5.4.0-77-generic","name":"Ubuntu","platform":"ubuntu","type":"linux","version":"20.04.2 LTS (Focal Fossa)"}},"input":{"type":"log"},"log":{"file":{"path":"/var/log/snort/alert_json.txt"},"offset":94499421},"message":"{ \"seconds\" : 1626777398, \"action\" : \"allow\", \"class\" : \"none\", \"b64_data\" : \"AAAAAAAAAAAAAAAAAAAAAAAA\", \"dir\" : \"UNK\", \"dst_ap\" : \":0\", \"eth_dst\" : \"4C:72:B9:41:D5:62\", \"eth_len\" : 60, \"eth_src\" : \"F0:9F:C2:C7:69:1C\", \"eth_type\" : \"0x806\", \"gid\" : 112, \"iface\" : \"ens161\", \"msg\" : \"(arp_spoof) unicast ARP request\", \"pkt_gen\" : \"raw\", \"pkt_len\" : 18, \"pkt_num\" : 790495972, \"priority\" : 3, \"proto\" : \"ARP\", \"rev\" : 1, \"rule\" : \"112:1:1\", \"service\" : \"unknown\", \"sid\" : 1, \"src_ap\" : \":0\", \"vlan\" : 0, \"timestamp\" : \"07/20-20:36:38.165235\" }"}, Private:file.State{Id:"native::1992103-64768", PrevId:"", Finished:false, Fileinfo:(*os.fileStat)(0xc0001c2680), Source:"/var/log/snort/alert_json.txt", Offset:94499963, Timestamp:time.Time{wall:0xc035c7ca3ac2ec46, ext:549456907, loc:(*time.Location)(0x55dde5819dc0)}, TTL:-1, Type:"log", Meta:map[string]string(nil), FileStateOS:file.StateOS{Inode:0x1e65a7, Device:0xfd00}, IdentifierName:"native"}, TimeSeries:false}, Flags:0x1, Cache:publisher.EventCache{m:common.MapStr(nil)}} (status=400): {"type":"illegal_argument_exception","reason":"cannot resolve [pkt_num] from object of type [java.lang.String] as part of path [message.pkt_num]"}

It appears that message is being read as a JSON string and needs to be decoded before it can be referenced as an object: JSON processor | Elasticsearch Guide [7.13] | Elastic
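
A minimal sketch of what that could look like, assuming the parsed object should land back under message so your existing message.* references keep working (only the first set processor is repeated here; the rest of the pipeline stays as you posted it):

PUT /_ingest/pipeline/snort-json-pipeline
{
  "description": "Pipeline for ingesting JSON snort3 data",
  "processors": [
    {
      "json": {
        "field": "message",
        "target_field": "message",
        "tag": "decode snort alert json"
      }
    },
    {
      "set": {
        "field": "pkt_num",
        "copy_from": "message.pkt_num",
        "tag": "set pkt_num"
      }
    }
  ]
}

Alternatively, setting add_to_root: true on the json processor (instead of target_field) lifts the parsed fields to the top level of the document, in which case the copy_from values would drop the message. prefix.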

What Alex wrote above is a valid point. On top of that, I think all your objects are within the message field, so maybe your processors need to specify the correct field, like message.pkt_num?
