Logstash pipeline not finding generated object

#1

Hello! I am new to Logstash pipelining and I am trying my hand at some basic functionality before trying something more complex.

HERE IS MY PROBLEM: I can't seem to access the fields that appear in the log message output from the pipeline. I would like to do further parsing based on conditional statements, such as by the kind of event_type. These values appear in the output, but I can't access the data members. Every conditional in the filter shown below evaluates to false, as indicated by the tags in the message sent.

DETAILS:
My current pipeline is as follows:

Host (eve.json) --> picked up by Filebeat* --> passed to Logstash** --> received by ES

Seems straightforward, right? Well, let's address the asterisks:

* Filebeat -- Rather than use the Filebeat "Suricata" module to ingest the Suricata-generated eve.json, I elected to parse the JSON myself. This takes the JSON content from the log field and parses it into fields: eve.<somefield>. This is done successfully and I can see the fields in Kibana.

** Logstash -- I created the pipeline in two segments: input and filtering. The Logstash input block confirms that the event comes from Beats over the expected port and adds tags. That works fine. The filter then takes the event and, given the tags found, processes it based on the eve.<somefield> content.

Therein lies the problem. It identifies the tags, but cannot identify the eve.<somefield> content. I am likely missing something silly, but I spent the better part of 8 hours tweaking, testing, and googling in my lab to no avail. I am convinced I have been looking at the problem too long.

Obligatory attached file time!

Sample log in JSON formatted log file:

/nsm/sensor_data/near-so-ens19/eve.json
{"timestamp":"2019-05-16T03:05:40.907668+0000","flow_id":1376862689583799,"in_iface":"ens19","event_type":"dns","src_ip":"192.168.1.40","src_port":53993,"dest_ip":"192.168.1.1","dest_port":53,"proto":"UDP","dns":{"type":"query","id":60147,"rrname":"slack.com","rrtype":"A","tx_id":1}}

Appears post pipeline as:

{
  "_index": "near-so:logstash-beats-2019.05.16",
  "_type": "doc",
  "_id": "Cw2avmoB0un8RUUYRE3O",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2019-05-16T03:05:41.239Z",
    "beat": {
      "hostname": "near-so",
      "name": "near-so",
      "version": "6.5.4"
    },
    "offset": 885250729,
    "tags": [
      "beat",
      "beats_input_codec_plain_applied",
      "kbl_candidate",
      "opt1fail",
      "no_target",
      "no_eve",
      "conf_file_9500"
    ],
    "prospector": {
      "type": "log"
    },
    "source": "/nsm/sensor_data/near-so-ens19/eve.json",
    "eve": {
      "src_ip": "192.168.1.40",
      "dest_ip": "192.168.1.1",
      "proto": "UDP",
      "timestamp": "2019-05-16T03:05:40.907668+0000",
      "dns": {
        "rrname": "slack.com",
        "id": 60147,
        "type": "query",
        "rrtype": "A",
        "tx_id": 1
      },
      "src_port": 53993,
      "flow_id": 1376862689583799,
      "event_type": "dns",
      "dest_port": 53,
      "in_iface": "ens19"
    },
    "beat_host": {
      "id": "220367b45a2e2544428637d15b635477",
      "architecture": "x86_64",
      "containerized": false,
      "name": "near-so",
      "os": {
        "codename": "xenial",
        "family": "debian",
        "platform": "ubuntu",
        "version": "16.04.5 LTS (Xenial Xerus)"
      }
    },
    "input": {
      "type": "log"
    },
    "logstash_time": 0.0012030601501464844,
    "@version": "1"
  },
  "fields": {
    "@timestamp": [
      "2019-05-16T03:05:41.239Z"
    ]
  },
  "sort": [
    1557975941239
  ]
}

The pipeline affecting the log progresses through two steps: 1) ingestion 2) filtering.

Ingestion should look familiar! I haven't updated it:

# Author: Justin Henderson
[...]
input {
  beats {
    port => "5044"
    tags => [ "beat" ]
  }
}

The reader will notice that the beat tag was successfully added to the log as it appeared in Logstash.

Filtering is where the problem is and, big surprise, it's mine.

filter {
  if "beat" in [tags] {
    if "eve.json" in [source] {
      json {
        source => "message"
        target => "eve"
        remove_field => [ "message" ]
        add_tag => ["kbl_candidate"]
      }
      if "dns" in [target.event_type] {
        mutate { add_tag => ["opt1success"] }
      } else { mutate { add_tag => ["opt1fail"] } }
      if [target.event_type] { mutate { add_tag => ["target.event_type"] }
      } else { mutate { add_tag => ["no_target"] } }
      if [eve.event_type] { mutate { add_tag => ["eve.event_type"] }
      } else { mutate { add_tag => ["no_eve"] } }
    }
  }
}

My plan: if the event comes from Beats and its source is eve.json, use the json filter to parse the message, store the result under a root field named eve, and add a tag to the event. This part worked fine. The tag appears in the log and all fields appear, though the eve.* values aren't indexed... I will come back to that.

PROBLEM STATEMENT: I can't seem to access the fields that appear in the log message output from the pipeline. I would like to do further parsing based on condition statements, such as by the kind of event_type. These values may appear, but I can't access the data members. Every conditional shown above evaluates to false, as indicated by the tags in the message sent.

Any ideas? What simple thing am I missing here?

#2

In Logstash, [eve.event_type] refers to a single field with a period in its name. If event_type is a field inside the eve object, the reference has to be [eve][event_type].
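
To make the difference concrete, here is a minimal sketch of the two reference styles side by side. The tag names are made up for illustration and are not part of the original config:

```
filter {
  # Nested reference: the event_type key inside the eve object.
  if [eve][event_type] == "dns" {
    mutate { add_tag => ["nested_match"] }          # hypothetical tag
  }

  # Dotted reference: one top-level field literally named "eve.event_type".
  # It does not descend into the eve object, so nested data never matches.
  if [eve.event_type] {
    mutate { add_tag => ["dotted_field_exists"] }   # hypothetical tag
  }
}
```

With the sample event from the first post, only the first conditional should be true, because event_type lives inside eve rather than in a top-level field with a dot in its name.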

#3

That was all it took. I seem to remember having a different parsing syntax error a long time ago and changing away from that syntax as I was testing... and never changing it back.

The sample file that acted as expected is as follows:

filter {
  if "beat" in [tags] {
    if "eve.json" in [source] {
      json {
        source => "message"
        target => "eve"
        remove_field => [ "message" ]
        add_tag => ["kbl_candidate"]
      }
      if "dns" in [eve][event_type] {
        mutate { add_tag => ["opt1success"] }
      } else { mutate { add_tag => ["opt1fail"] } }
      if [eve][event_type] { mutate { add_tag => ["target_event_type"] }
      } else { mutate { add_tag => ["no_target"] } }
      if [eve][event_type] { mutate { add_tag => ["eve_event_type"] }
      } else { mutate { add_tag => ["no_eve"] } }
    }
  }
}

output { file {
  path => "/tmp/debug.txt"
  codec => rubydebug
} }
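
For the per-event_type parsing mentioned in the original question, a rough sketch of the branching might look like the following. It assumes the json filter above has already populated eve; the tag names are hypothetical, and "alert" is only an assumed second event type, since the sample log shows nothing but "dns":

```
filter {
  if [eve][event_type] == "dns" {
    mutate { add_tag => ["eve_dns"] }      # hypothetical tag
  } else if [eve][event_type] == "alert" { # assumed event type, not in the sample
    mutate { add_tag => ["eve_alert"] }    # hypothetical tag
  } else {
    mutate { add_tag => ["eve_other"] }    # hypothetical tag
  }
}
```

This uses == rather than in. When the field is a string, in should act as a substring test, so an exact comparison is the safer choice for picking a single event type.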

Thank you for your help!