Ingesting PCAP dataset into ELK via Zeek and Zeek scripting

Hello, I am having trouble with what I think are my Filebeat and Logstash configurations.

What I am trying to do is send a DDoS pcap dataset (saved on my Ubuntu machine) to ELK through Zeek, applying Zeek scripts to it along the way.

I have followed the configuration in this guide: Building your first SIEM with the Elastic Stack | cronocide.com
because I liked its ElastAlert configuration, which I also want.

I tried Packetbeat, which worked, but I need Zeek to be involved in the process for scripting reasons.

The problem I'm facing: no index with today's date is being created in Elasticsearch. I would like to run zeek -r on the pcap and have the resulting logs ingested. I may be using the wrong commands, or missing a few lines in a YAML file.
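
For reference, a minimal sketch of one way to replay a capture, under a few assumptions. Zeek writes its logs into the directory it is started from, so starting it inside the directory that the Filebeat module paths point at puts the logs where Filebeat looks. The helper name `replay_pcap`, the `ZEEK_LOG_DIR` variable, and the example pcap and script paths are placeholders, not part of the setup described here. `LogAscii::use_json=T` is passed because, as far as I know, the Filebeat Zeek module expects JSON logs rather than Zeek's default tab-separated format.

```shell
#!/usr/bin/env bash
# Replay a pcap through Zeek so the logs land where Filebeat is watching.
# Usage: replay_pcap <absolute pcap path> [zeek script ...]
# Use absolute paths: the function changes directory before running Zeek.
replay_pcap() {
  local pcap=$1
  shift
  # Assumption: same directory as var.paths in modules.d/zeek.yml.
  local log_dir=${ZEEK_LOG_DIR:-/opt/zeek/logs/current}

  mkdir -p "$log_dir" || return 1
  # Zeek writes conn.log, dns.log, notice.log, ... into its working directory.
  # -C ignores invalid checksums, which are common in captured traffic.
  # Remaining arguments are scripts to load (e.g. "local" or a custom .zeek file).
  ( cd "$log_dir" && zeek -C -r "$pcap" "$@" LogAscii::use_json=T )
}

# Example (hypothetical paths):
# replay_pcap /home/user/ddos.pcap local /home/user/scripts/detect-ddos.zeek
```

Filebeat must be able to read the resulting files, so the function may need to run as a user with write access to the log directory.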

I am attaching my configurations here to see what feedback I get from the community.

/etc/filebeat/filebeat.yml

filebeat.inputs:
filebeat.config.modules:
  path: /etc/filebeat/modules.d/*.yml
  reload.enabled: true
setup.template.settings:
  index.number_of_shards: 1
setup.kibana:
  host: "localhost:5601"
output.logstash:
  hosts: ["localhost:5044"]
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644
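
Filebeat ships subcommands that can check a configuration like the one above before the service is restarted. The sketch below only wraps them; `check_filebeat` is a hypothetical helper name, and the default path is the file listed above.

```shell
# Run Filebeat's built-in checks against a config file.
check_filebeat() {
  local cfg=${1:-/etc/filebeat/filebeat.yml}
  # Parse the YAML and report syntax or option errors.
  filebeat test config -c "$cfg" || return 1
  # Try to connect to the configured output (Logstash on localhost:5044 here).
  filebeat test output -c "$cfg" || return 1
  # Show enabled and disabled modules; zeek should appear under "Enabled".
  filebeat modules list -c "$cfg"
}

# Example:
# sudo bash -c "$(declare -f check_filebeat); check_filebeat"
```

If the output test fails, Logstash is probably not listening on port 5044, which points at the Logstash side rather than at Filebeat.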

/etc/logstash/logstash.yml

# Settings file in YAML
#
# Settings can be specified either in hierarchical form, e.g.:
#
#   pipeline:
#     batch:
#       size: 125
#       delay: 5
#
# Or as flat keys:
#
#   pipeline.batch.size: 125
#   pipeline.batch.delay: 5
#
# ------------  Node identity ------------
#
# Use a descriptive name for the node:
#
# node.name: test
#
# If omitted the node name will default to the machine's host name
#
# ------------ Data path ------------------
#
# Which directory should be used by logstash and its plugins
# for any persistent needs. Defaults to LOGSTASH_HOME/data
#
path.data: /var/lib/logstash
#
# ------------ Pipeline Settings --------------
#
# The ID of the pipeline.
#
# pipeline.id: main
#
# Set the number of workers that will, in parallel, execute the filters+outputs
# stage of the pipeline.
#
# This defaults to the number of the host's CPU cores.
#
# pipeline.workers: 2
#
# How many events to retrieve from inputs before sending to filters+workers
#
# pipeline.batch.size: 125
#
# How long to wait in milliseconds while polling for the next event
# before dispatching an undersized batch to filters+outputs
#
# pipeline.batch.delay: 50
#
# Force Logstash to exit during shutdown even if there are still inflight
# events in memory. By default, logstash will refuse to quit until all
# received events have been pushed to the outputs.
#
# WARNING: enabling this can lead to data loss during shutdown
#
# pipeline.unsafe_shutdown: false
#
# Set the pipeline event ordering. Options are "auto" (the default), "true" or "false".
# "auto" will  automatically enable ordering if the 'pipeline.workers' setting
# is also set to '1'.
# "true" will enforce ordering on the pipeline and prevent logstash from starting
# if there are multiple workers.
# "false" will disable any extra processing necessary for preserving ordering.
#
pipeline.ordered: auto
#
# ------------ Pipeline Configuration Settings --------------
#
# Where to fetch the pipeline configuration for the main pipeline
#
# path.config:
#
# Pipeline configuration string for the main pipeline
#
# config.string:
#
# At startup, test if the configuration is valid and exit (dry run)
#
# config.test_and_exit: false
#
# Periodically check if the configuration has changed and reload the pipeline
# This can also be triggered manually through the SIGHUP signal
#
 config.reload.automatic: true
#
# How often to check if the pipeline configuration has changed (in seconds)
# Note that the unit value (s) is required. Values without a qualifier (e.g. 60) 
# are treated as nanoseconds.
# Setting the interval this way is not recommended and might change in later versions.
#
# config.reload.interval: 3s
#
# Show fully compiled configuration as debug log message
# NOTE: --log.level must be 'debug'
#
# config.debug: false
#
# When enabled, process escaped characters such as \n and \" in strings in the
# pipeline configuration files.
#
# config.support_escapes: false
#
# ------------ HTTP API Settings -------------
# Define settings related to the HTTP API here.
#
# The HTTP API is enabled by default. It can be disabled, but features that rely
# on it will not work as intended.
# http.enabled: true
#
# By default, the HTTP API is bound to only the host's local loopback interface,
# ensuring that it is not accessible to the rest of the network. Because the API
# includes neither authentication nor authorization and has not been hardened or
# tested for use as a publicly-reachable API, binding to publicly accessible IPs
# should be avoided where possible.
#
# http.host: 127.0.0.1
#
# The HTTP API web server will listen on an available port from the given range.
# Values can be specified as a single port (e.g., `9600`), or an inclusive range
# of ports (e.g., `9600-9700`).
#
# http.port: 9600-9700
#
# ------------ Module Settings ---------------
# Define modules here.  Modules definitions must be defined as an array.
# The simple way to see this is to prepend each `name` with a `-`, and keep
# all associated variables under the `name` they are associated with, and
# above the next, like this:
#
# modules:
#   - name: MODULE_NAME
#     var.PLUGINTYPE1.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE1.PLUGINNAME1.KEY2: VALUE
#     var.PLUGINTYPE2.PLUGINNAME1.KEY1: VALUE
#     var.PLUGINTYPE3.PLUGINNAME3.KEY1: VALUE
#
# Module variable names must be in the format of
#
# var.PLUGIN_TYPE.PLUGIN_NAME.KEY
#
# modules:
#
# ------------ Cloud Settings ---------------
# Define Elastic Cloud settings here.
# Format of cloud.id is a base64 value e.g. dXMtZWFzdC0xLmF3cy5mb3VuZC5pbyRub3RhcmVhbCRpZGVudGlmaWVy
# and it may have an label prefix e.g. staging:dXMtZ...
# This will overwrite 'var.elasticsearch.hosts' and 'var.kibana.host'
# cloud.id: <identifier>
#
# Format of cloud.auth is: <user>:<pass>
# This is optional
# If supplied this will overwrite 'var.elasticsearch.username' and 'var.elasticsearch.password'
# If supplied this will overwrite 'var.kibana.username' and 'var.kibana.password'
# cloud.auth: elastic:<password>
#
# ------------ Queuing Settings --------------
#
# Internal queuing model, "memory" for legacy in-memory based queuing and
# "persisted" for disk-based acked queueing. Defaults is memory
#
# queue.type: memory
#
# If using queue.type: persisted, the directory path where the data files will be stored.
# Default is path.data/queue
#
# path.queue:
#
# If using queue.type: persisted, the page data files size. The queue data consists of
# append-only data files separated into pages. Default is 64mb
#
# queue.page_capacity: 64mb
#
# If using queue.type: persisted, the maximum number of unread events in the queue.
# Default is 0 (unlimited)
#
# queue.max_events: 0
#
# If using queue.type: persisted, the total capacity of the queue in number of bytes.
# If you would like more unacked events to be buffered in Logstash, you can increase the
# capacity using this setting. Please make sure your disk drive has capacity greater than
# the size specified here. If both max_bytes and max_events are specified, Logstash will pick
# whichever criteria is reached first
# Default is 1024mb or 1gb
#
# queue.max_bytes: 1024mb
#
# If using queue.type: persisted, the maximum number of acked events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.acks: 1024
#
# If using queue.type: persisted, the maximum number of written events before forcing a checkpoint
# Default is 1024, 0 for unlimited
#
# queue.checkpoint.writes: 1024
#
# If using queue.type: persisted, the interval in milliseconds when a checkpoint is forced on the head page
# Default is 1000, 0 for no periodic checkpoint.
#
# queue.checkpoint.interval: 1000
#
# ------------ Dead-Letter Queue Settings --------------
# Flag to turn on dead-letter queue.
#
# dead_letter_queue.enable: false

# If using dead_letter_queue.enable: true, the maximum size of each dead letter queue. Entries
# will be dropped if they would increase the size of the dead letter queue beyond this setting.
# Default is 1024mb
# dead_letter_queue.max_bytes: 1024mb

# If using dead_letter_queue.enable: true, the interval in milliseconds where if no further events eligible for the DLQ
# have been created, a dead letter queue file will be written. A low value here will mean that more, smaller, queue files
# may be written, while a larger value will introduce more latency between items being "written" to the dead letter queue, and
# being available to be read by the dead_letter_queue input when items are are written infrequently.
# Default is 5000.
#
# dead_letter_queue.flush_interval: 5000

# If using dead_letter_queue.enable: true, the directory path where the data files will be stored.
# Default is path.data/dead_letter_queue
#
# path.dead_letter_queue:
#
# ------------ Metrics Settings --------------
#
# Bind address for the metrics REST endpoint
#
# http.host: "127.0.0.1"
#
# Bind port for the metrics REST endpoint, this option also accept a range
# (9600-9700) and logstash will pick up the first available ports.
#
# http.port: 9600-9700
#
# ------------ Debugging Settings --------------
#
# Options for log.level:
#   * fatal
#   * error
#   * warn
#   * info (default)
#   * debug
#   * trace
#
# log.level: info
path.logs: /var/log/logstash
#
# ------------ Other Settings --------------
#
# Where to find custom plugins
# path.plugins: []
#
# Flag to output log lines of each pipeline in its separate log file. Each log filename contains the pipeline.name
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.proxy: ["http://proxy:port"]
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.monitoring.elasticsearch.api_key: "id:api_key"
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
#xpack.management.enabled: false
#xpack.management.pipeline.id: ["main", "apache_logs"]
#xpack.management.elasticsearch.username: logstash_admin_user
#xpack.management.elasticsearch.password: password
#xpack.management.elasticsearch.proxy: ["http://proxy:port"]
#xpack.management.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.management.elasticsearch.cloud_id: management_cluster_id:xxxxxxxxxx
#xpack.management.elasticsearch.cloud_auth: logstash_admin_user:password
# another authentication alternative is to use an Elasticsearch API key
#xpack.management.elasticsearch.api_key: "id:api_key"
#xpack.management.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.management.elasticsearch.ssl.truststore.path: /path/to/file
#xpack.management.elasticsearch.ssl.truststore.password: password
#xpack.management.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.management.elasticsearch.ssl.keystore.password: password
#xpack.management.elasticsearch.ssl.verification_mode: certificate
#xpack.management.elasticsearch.sniffing: false
#xpack.management.logstash.poll_interval: 5s

/etc/elasticsearch/elasticsearch.yml

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
#cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
#node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /var/lib/elasticsearch
#
# Path to log files:
#
path.logs: /var/log/elasticsearch
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# By default Elasticsearch is only accessible on localhost. Set a different
# address here to expose this node on the network:
#
network.host: localhost
#
# By default Elasticsearch listens for HTTP traffic on the first free port it
# finds starting at 9200. Set a specific HTTP port here:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
#cluster.initial_master_nodes: ["node-1", "node-2"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

xpack.security.enabled: false

/etc/filebeat/modules.d/zeek.yml

<pre><font color="#06989A"> Module: zeek</font>
<font color="#06989A"># Docs: https://www.elastic.co/guide/en/beats/filebeat/7.x/filebeat-module-zeek.html</font>

- module: zeek
  capture_loss:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/capture_loss.log&quot;]
  connection:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/conn.log&quot;]
  dce_rpc:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/dce_rpc.log&quot;]
  dhcp:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/dhcp.log&quot;]
  dnp3:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/dnp3.log&quot;]
  dns:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/dns.log&quot;]
  dpd:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/dpd.log&quot;]
  files:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/files.log&quot;]
  ftp:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/ftp.log&quot;]
  http:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/http.log&quot;]
  intel:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/intel.log&quot;]
  irc:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/irc.log&quot;]
  kerberos:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/kerberos.log&quot;]
  modbus:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/modbus.log&quot;]
  mysql:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/mysql.log&quot;]
  notice:
    enabled: true
    var.paths: [&quot;/opt/zeek/logs/current/notice.log&quot;]
</pre>
  ntlm:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/ntlm.log"]
  ntp:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/ntp.log"]
  ocsp:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/oscp.log"]
  pe:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/pe.log"]
  radius:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/radius.log"]
  rdp:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/rdp.log"]
  rfb:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/rfb.log"]
  signature:
    enabled: false
    var.paths: ["/opt/zeek/logs/current/signature.log"]
  sip:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/sip.log"]
  smb_cmd:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/smb_cmd.log"]
  smb_files:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/smb_files.log"]
  smb_mapping:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/smb_mapping.log"]
  smtp:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/smtp.log"]
  snmp:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/snmp.log"]
  socks:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/socks.log"]
  ssh:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/ssh.log"]
  ssl:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/ssl.log"]
  stats:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/stats.log"]
  syslog:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/syslog.log"]
  traceroute:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/traceroute.log"]
  tunnel:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/tunnel.log"]
  weird:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/weird.log"]
  x509:
    enabled: true
    var.paths: ["/opt/zeek/logs/current/x509.log"]

    # Set custom paths for the log files. If left empty,
    # Filebeat will choose the paths depending on your OS.
    #var.paths:

First, your Zeek module config shouldn't be HTML-encoded (`&quot;` in place of quotes, `<pre>` and `<font>` tags), so if the file actually looks like that, it needs to be fixed. Second, can you please post your Logstash pipeline(s) so we can see the inputs, filters, and outputs?
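
A quick way to tell whether the file on disk really contains that markup, or whether only the forum paste does, is to search it for entities and tags. A minimal sketch, assuming the module file path given earlier in the thread; `find_html_debris` is a hypothetical helper name.

```shell
# Print every line (with its line number) that contains HTML entities or
# <pre>/<font> tags. Exit status is 0 if anything was found, 1 if the file is clean.
find_html_debris() {
  local file=${1:-/etc/filebeat/modules.d/zeek.yml}
  grep -nE '&(quot|amp|lt|gt);|</?(pre|font)[ >]' "$file"
}

# Example:
# find_html_debris && echo "file contains HTML markup"
```

No output means the YAML itself is fine and the markup only came from copying terminal output as HTML.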

Alex, thanks for the reply.

How does this Filebeat module config look?

```
- module: zeek
  # All logs
  connection:
    enabled: true
    var.paths:
    - /opt/zeek/logs/current/conn.log
  dns:
    enabled: true
    var.paths:
    - /opt/zeek/logs/current/dns.log
  http:
    enabled: true
    var.paths:
    - /opt/zeek/logs/current/http.log
  files:
    enabled: true
    var.paths:
    - /opt/zeek/logs/current/files.log
  ssl:
    enabled: true
    var.paths:
    - /opt/zeek/logs/current/ssl.log
  notice:
    enabled: true
    var.paths:
    - /opt/zeek/logs/current/capture_loss.log
    - /opt/zeek/logs/current/known_services.log
    - /opt/zeek/logs/current/loaded_scripts.log
    - /opt/zeek/logs/current/packet_filter.log
    - /opt/zeek/logs/current/reporter.log
    - /opt/zeek/logs/current/stats.log
    - /opt/zeek/logs/current/stderr.log
    - /opt/zeek/logs/current/stdout.log
    - /opt/zeek/logs/current/weird.log
```

/etc/logstash/pipelines.yml

- pipeline.id: zeek_and_host_logs
  path.config: "/etc/logstash/conf.d/zeek_and_host.conf"

/etc/logstash/conf.d/zeek_and_host.conf

input {
  beats {
    host => "localhost"
    port => 5044
  }
}

filter {
  # Lookup source and destination addresses against our list of Tor nodes
  if ("" in [source][address]) {
    translate {
      dictionary_path => "/etc/logstash/tables/tor_nodes.json"
      field => "[source][address]"
      destination => "[@metadata][torsrc]"
    }
    mutate {convert => { "[@metadata][torsrc]" => "integer" }}
  }
  if ("" in [destination][address]) {
    translate {
      dictionary_path => "/etc/logstash/tables/tor_nodes.json"
      field => "[destination][address]"
      destination => "[@metadata][tordst]"
    }
    mutate {convert => { "[@metadata][tordst]" => "integer" }}
  }
  if [@metadata][torsrc] == [source][port] {
    mutate {add_field => { "[source][tor]" => "true"}}
    mutate {convert => { "[source][tor]" => "boolean" }}
  }
  if [@metadata][tordst] == [destination][port] {
    mutate {add_field => { "[destination][tor]" => "true"}}
    mutate {convert => { "[destination][tor]" => "boolean"}}
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    ssl => false
    ssl_certificate_verification => false
    sniffing => true
    manage_template => true
    index => "%{[event][module]}-%{+yyyy.MM.dd}"
    setup.template.name: "filebeat"
    setup.template.pattern: "filebeat-*"
    setup.ilm.enabled: false
  }
}
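
Two checks that may help narrow down why no index shows up, sketched under assumptions. The first asks Logstash to parse the pipeline file and exit, which reports any option or syntax it does not accept. The second lists the indices Elasticsearch actually holds. The `%{+yyyy.MM.dd}` part of the index name is filled in from each event's `@timestamp` in UTC rather than from the wall clock, so the helper builds the name the same way. The function names and the `LOGSTASH_BIN` variable are hypothetical, the Logstash binary path is the usual one for package installs and may differ, and `zeek` as the value of `[event][module]` is an assumption about what the Filebeat module sets.

```shell
# Parse the pipeline file and exit without starting the pipeline.
check_pipeline() {
  local conf=${1:-/etc/logstash/conf.d/zeek_and_host.conf}
  local bin=${LOGSTASH_BIN:-/usr/share/logstash/bin/logstash}
  "$bin" --path.settings /etc/logstash -f "$conf" --config.test_and_exit
}

# Build the index name for a module and a UTC date (default: today, UTC).
index_name() {
  local module=$1
  local day=${2:-$(date -u +%Y.%m.%d)}
  printf '%s-%s\n' "$module" "$day"
}

# List indices matching a name or pattern (default: all) on the local node.
list_indices() {
  curl -s "http://localhost:9200/_cat/indices/${1:-*}?v"
}

# Example:
# check_pipeline
# list_indices "$(index_name zeek)"
# list_indices 'zeek-*'
```

If `@timestamp` is ever parsed from Zeek's own `ts` field, events from a replayed pcap would go to an index named for the capture date, so listing `zeek-*` is a safer check than looking only for today's date.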
