Filebeat sophos module error: Provided Grok expressions do not match field value

Hello Everyone,

I am trying to set up the Filebeat sophos module for a Sophos XG firewall. The Filebeat output is sent directly to Elasticsearch.

I enabled the sophos module and then initialized it successfully with

filebeat setup -e -d --pipelines -modules="sophos"

My module config is as follows:

- module: sophos
  xg:
    enabled: true

    # Set which input to use between tcp, udp (default) or file.
    var.input: tcp

    # The interface to listen to syslog traffic. Defaults to
    # localhost. Set to 0.0.0.0 to bind to all available interfaces.
    var.syslog_host: 0.0.0.0

    # The port to listen for syslog traffic. Defaults to 9004.
    var.syslog_port: 9004

    # firewall default hostname
    var.default_host_name: firewall.localgroup.local

    # known firewalls
    var.known_devices:
      - serial_number: "FW Serial"
        hostname: "FW Hostname"
      #- serial_number: "1234234590678557"
      #  hostname: "b.host.local"


  utm:
    enabled: false

And I receive logs like this:

{
  "_index": "filebeat-7.17.3-2022.05.12-000001",
  "_type": "_doc",
  "_id": "XYM9vYAB9OV1mX8rU7yo",
  "_version": 1,
  "_score": 1,
  "_source": {
    "agent": {
      "hostname": "Elastic Stack Hostname",
      "name": "Elastic Stack Hostname",
      "id": "059ca9b8-745a-43b4-9ea1-d4bba45cb164",
      "ephemeral_id": "d3fe32d3-77c6-4676-ad74-61afbcd2928c",
      "type": "filebeat",
      "version": "7.17.3"
    },
    "log": {
      "source": {
        "address": "FW private IP:18876"
      }
    },
    "_conf": {
      "mappings": [
        {
          "hostname": "FW Hostname",
          "serial": "FW Serial"
        }
      ],
      "default": "firewall.localgroup.local"
    },
    "fileset": {
      "name": "xg"
    },
    "message": "\u0000\u0010\u0000\u000e\u0000\u0017\u0000\u0019\u0000\u001c\u0000\u001b\u0000\u0018\u0000\u001a\u0000\u0016\u0000#\u0000\u0000\u0000\r\u0000 \u0000\u001e\u0006\u0001\u0006\u0002\u0006\u0003\u0005\u0001\u0005\u0002\u0005\u0003\u0004\u0001\u0004\u0002\u0004\u0003\u0003\u0001\u0003\u0002\u0003\u0003\u0002\u0001\u0002\u0002\u0002\u0003",
    "error": {
      "message": "Provided Grok expressions do not match field value: [\\u0000\\u0010\\u0000\\u000E\\u0000\\u0017\\u0000\\u0019\\u0000\\u001C\\u0000\\u001B\\u0000\\u0018\\u0000\\u001A\\u0000\\u0016\\u0000#\\u0000\\u0000\\u0000\\r\\u0000 \\u0000\\u001E\\u0006\\u0001\\u0006\\u0002\\u0006\\u0003\\u0005\\u0001\\u0005\\u0002\\u0005\\u0003\\u0004\\u0001\\u0004\\u0002\\u0004\\u0003\\u0003\\u0001\\u0003\\u0002\\u0003\\u0003\\u0002\\u0001\\u0002\\u0002\\u0002\\u0003]"
    },
    "tags": [
      "sophos-xg",
      "forwarded"
    ],
    "input": {
      "type": "tcp"
    },
    "@timestamp": "2022-05-13T11:45:53.630Z",
    "ecs": {
      "version": "1.12.0"
    },
    "service": {
      "type": "sophos"
    },
    "host": {
      "name": "Elastic Stack Hostname"
    },
    "event": {
      "ingested": "2022-05-13T11:45:54.589904775Z",
      "timezone": "+02:00",
      "module": "sophos",
      "dataset": "sophos.xg"
    }
  },
  "fields": {
    "_conf.mappings.hostname": [
      "FW Hostname"
    ],
    "_conf.mappings.serial": [
      "FW Serial"
    ],
    "fileset.name": [
      "xg"
    ],
    "input.type": [
      "tcp"
    ],
    "agent.hostname": [
      "Elastic Stack Hostname"
    ],
    "message": [
      "\u0000\u0010\u0000\u000e\u0000\u0017\u0000\u0019\u0000\u001c\u0000\u001b\u0000\u0018\u0000\u001a\u0000\u0016\u0000#\u0000\u0000\u0000\r\u0000 \u0000\u001e\u0006\u0001\u0006\u0002\u0006\u0003\u0005\u0001\u0005\u0002\u0005\u0003\u0004\u0001\u0004\u0002\u0004\u0003\u0003\u0001\u0003\u0002\u0003\u0003\u0002\u0001\u0002\u0002\u0002\u0003"
    ],
    "_conf.default": [
      "firewall.localgroup.local"
    ],
    "tags": [
      "sophos-xg",
      "forwarded"
    ],
    "service.type": [
      "sophos"
    ],
    "agent.type": [
      "filebeat"
    ],
    "event.ingested": [
      "2022-05-13T11:45:54.589Z"
    ],
    "@timestamp": [
      "2022-05-13T11:45:53.630Z"
    ],
    "agent.id": [
      "059ca9b8-745a-43b4-9ea1-d4bba45cb164"
    ],
    "event.module": [
      "sophos"
    ],
    "ecs.version": [
      "1.12.0"
    ],
    "log.source.address": [
      "private FW IP:18876"
    ],
    "error.message": [
      "Provided Grok expressions do not match field value: [\\u0000\\u0010\\u0000\\u000E\\u0000\\u0017\\u0000\\u0019\\u0000\\u001C\\u0000\\u001B\\u0000\\u0018\\u0000\\u001A\\u0000\\u0016\\u0000#\\u0000\\u0000\\u0000\\r\\u0000 \\u0000\\u001E\\u0006\\u0001\\u0006\\u0002\\u0006\\u0003\\u0005\\u0001\\u0005\\u0002\\u0005\\u0003\\u0004\\u0001\\u0004\\u0002\\u0004\\u0003\\u0003\\u0001\\u0003\\u0002\\u0003\\u0003\\u0002\\u0001\\u0002\\u0002\\u0002\\u0003]"
    ],
    "agent.ephemeral_id": [
      "d3fe32d3-77c6-4676-ad74-61afbcd2928c"
    ],
    "agent.version": [
      "7.17.3"
    ],
    "agent.name": [
      "Elastic Stack Hostname"
    ],
    "host.name": [
      "Elastic Stack Hostname"
    ],
    "event.dataset": [
      "sophos.xg"
    ],
    "event.timezone": [
      "+02:00"
    ]
  }
}

Elasticsearch and Filebeat are version 7.17.3; the Sophos FW is an XG330 (SFOS 18.5.2 MR-2-Build380).

Thanks in advance.

Looks like Sophos is sending Unicode.

I am not sure if

encoding: utf-8

is a valid setting for the sophos module. I started Filebeat up with that setting and it did not complain, but I don't have a Sophos FW to test.

If not, you may need to change it on the FW config side.

BTW, you should really just run the following, as there is more to load than just the pipelines, such as index templates, dashboards, etc. I don't think there are dashboards for Sophos, but just FYI for future reference.

./filebeat setup -e -modules="sophos"

Sadly, configuring the Filebeat sophos module with

encoding: utf-8

did not have an effect. Changing the format on the Sophos side between Device standard and Central Reporting did not solve it either.

Are the messages still coming in encoded as Unicode? You need to figure out how to get the Sophos logs sent so that the message field looks like normal text.

They are, yes. I am not sure if there is a CLI option or switch on the Sophos XG for how it sends the logs. If I send to my syslog collector on port 514 and then to Logstash it works fine, but I would like to use the sophos index template with Filebeat.

I will see if I can figure this out.

Thank you.

What does the message field look like when you use Logstash?

There is a way to forward them to the sophos module in Elasticsearch if it is in the correct format.

One log-entry for example:

{
    "_index": "logstash-2022.05.16-000001",
    "_type": "_doc",
    "_id": "XS1RzYAB9OV1mX8rn_VF",
    "_version": 1,
    "_score": 1,
    "_source": {
      "@timestamp": "2022-05-16T14:41:59.984Z",
      "type": "syslog",
      "@version": "1",
      "host": "192.168.x.x",
      "message": "<14>device_name=\"SFW\" timestamp=\"2022-05-16T16:41:59+0200\" device_model=\"XG330\" device_serial_id=\"serial\" log_id=\"010101600001\" log_type=\"Firewall\" log_component=\"Firewall Rule\" log_subtype=\"Allowed\" log_version=1 severity=\"Information\" fw_rule_id=\"10\" nat_rule_id=\"0\" fw_rule_type=\"USER\" ether_type=\"Unknown (0x0000)\" in_interface=\"Port1\" out_interface=\"Port2\" src_mac=\"XX:XX:XX:XX:XX:XX\" dst_mac=\"XX:XX:XX:XX:XX:XX\" src_ip=\"192.168.x.x\" src_country=\"R1\" dst_ip=\"192.168.x.x\" dst_country=\"R1\" protocol=\"TCP\" src_port=43007 dst_port=80 src_zone_type=\"LAN\" src_zone=\"LAN\" dst_zone_type=\"VPN\" dst_zone=\"VPN\" con_event=\"Start\" con_id=\"672667904\" hb_status=\"No Heartbeat\" app_resolved_by=\"Signature\" app_is_cloud=\"FALSE\" qualifier=\"New\" in_display_interface=\"Port1\" out_display_interface=\"Port2\"",
      "tags": [
        "_grokparsefailure"
      ]
    },
    "fields": {
      "@timestamp": [
        "2022-05-16T14:41:59.984Z"
      ],
      "type.keyword": [
        "syslog"
      ],
      "tags.keyword": [
        "_grokparsefailure"
      ],
      "@version": [
        "1"
      ],
      "host": [
        "192.168.x.x"
      ],
      "type": [
        "syslog"
      ],
      "host.keyword": [
        "192.168.x.x"
      ],
      "message": [
        "<14>device_name=\"SFW\" timestamp=\"2022-05-16T16:41:59+0200\" device_model=\"XG330\" device_serial_id=\"serial\" log_id=\"010101600001\" log_type=\"Firewall\" log_component=\"Firewall Rule\" log_subtype=\"Allowed\" log_version=1 severity=\"Information\" fw_rule_id=\"10\" nat_rule_id=\"0\" fw_rule_type=\"USER\" ether_type=\"Unknown (0x0000)\" in_interface=\"Port1\" out_interface=\"Port2\" src_mac=\"XX:XX:XX:XX:XX:XX\" dst_mac=\"XX:XX:XX:XX:XX:XX\" src_ip=\"192.168.x.x\" src_country=\"R1\" dst_ip=\"192.168.x.x\" dst_country=\"R1\" protocol=\"TCP\" src_port=43007 dst_port=80 src_zone_type=\"LAN\" src_zone=\"LAN\" dst_zone_type=\"VPN\" dst_zone=\"VPN\" con_event=\"Start\" con_id=\"672667904\" hb_status=\"No Heartbeat\" app_resolved_by=\"Signature\" app_is_cloud=\"FALSE\" qualifier=\"New\" in_display_interface=\"Port1\" out_display_interface=\"Port2\""
      ],
      "tags": [
        "_grokparsefailure"
      ]
    }
  }

The grok pattern is not correct yet, but I just wanted to check whether it works. How would I forward those logs to the sophos module in Filebeat?

OK we can do this!! :slight_smile:

It would still be best to figure out why Filebeat is getting the strange encoding, but we can try this.

So I just took that message, simulated it through the sophos-xg pipeline, and it worked!

POST /_ingest/pipeline/filebeat-8.2.0-sophos-xg-pipeline/_simulate
{
  "docs": [
    {
      "_index": "m-index",
      "_id": "kMpUTHoBr7SFhhL5-98P",
      "_source": {
        "@timestamp": "2021-11-09T03:17:20.0516241Z",
        "message": "<14>device_name=\"SFW\" timestamp=\"2022-05-16T16:41:59+0200\" device_model=\"XG330\" device_serial_id=\"serial\" log_id=\"010101600001\" log_type=\"Firewall\" log_component=\"Firewall Rule\" log_subtype=\"Allowed\" log_version=1 severity=\"Information\" fw_rule_id=\"10\" nat_rule_id=\"0\" fw_rule_type=\"USER\" ether_type=\"Unknown (0x0000)\" in_interface=\"Port1\" out_interface=\"Port2\" src_mac=\"XX:XX:XX:XX:XX:XX\" dst_mac=\"XX:XX:XX:XX:XX:XX\" src_ip=\"192.168.x.x\" src_country=\"R1\" dst_ip=\"192.168.x.x\" dst_country=\"R1\" protocol=\"TCP\" src_port=43007 dst_port=80 src_zone_type=\"LAN\" src_zone=\"LAN\" dst_zone_type=\"VPN\" dst_zone=\"VPN\" con_event=\"Start\" con_id=\"672667904\" hb_status=\"No Heartbeat\" app_resolved_by=\"Signature\" app_is_cloud=\"FALSE\" qualifier=\"New\" in_display_interface=\"Port1\" out_display_interface=\"Port2\""
      }
    }
  ]
}

So results look good!!

{
  "docs" : [
    {
      "doc" : {
        "_index" : "m-index",
        "_id" : "kMpUTHoBr7SFhhL5-98P",
        "_source" : {
          "server" : {
            "port" : 80,
            "mac" : "XX:XX:XX:XX:XX:XX",
            "ip" : "192.168.x.x"
          },
          "log" : {
            "level" : "informational"
          },
          "destination" : {
            "port" : 80,
            "mac" : "XX:XX:XX:XX:XX:XX",
            "ip" : "192.168.x.x"
          },
          "rule" : {
            "id" : "10"
          },
          "source" : {
            "port" : 43007,
            "mac" : "XX:XX:XX:XX:XX:XX",
            "ip" : "192.168.x.x"
          },
          "error" : {
            "message" : "'192.168.x.x' is not an IP string literal."
          },
          "network" : {
            "transport" : "TCP"
          },
          "observer" : {
            "ingress" : {
              "interface" : {
                "name" : "Port1"
              }
            },
            "product" : "XG",
            "type" : "firewall",
            "vendor" : "Sophos",
            "egress" : {
              "interface" : {
                "name" : "Port2"
              }
            }
          },
          "@timestamp" : "2022-05-16T14:41:59.000Z",
          "sophos" : {
            "xg" : {
              "device_model" : "XG330",
              "con_id" : "672667904",
              "fw_rule_type" : "USER",
              "app_is_cloud" : "FALSE",
              "device_name" : "SFW",
              "log_type" : "Firewall",
              "ether_type" : "Unknown (0x0000)",
              "device_serial_id" : "serial",
              "timestamp" : "2022-05-16T16:41:59+0200",
              "severity" : "Information",
              "dst_country" : "R1",
              "log_component" : "Firewall Rule",
              "log_subtype" : "Allowed",
              "dst_zone_type" : "VPN",
              "hb_status" : "No Heartbeat",
              "message_id" : "00001",
              "dst_zone" : "VPN",
              "src_port" : "43007",
              "src_zone_type" : "LAN",
              "con_event" : "Start",
              "src_country" : "R1",
              "src_zone" : "LAN",
              "app_resolved_by" : "Signature",
              "qualifier" : "New",
              "dst_port" : "80",
              "log_version" : "1"
            }
          },
          "client" : {
            "port" : 43007,
            "mac" : "XX:XX:XX:XX:XX:XX",
            "ip" : "192.168.x.x"
          },
          "event" : {
            "severity" : "6",
            "ingested" : "2022-05-16T17:22:39.662913297Z",
            "original" : "device_name=\"SFW\" timestamp=\"2022-05-16T16:41:59+0200\" device_model=\"XG330\" device_serial_id=\"serial\" log_id=\"010101600001\" log_type=\"Firewall\" log_component=\"Firewall Rule\" log_subtype=\"Allowed\" log_version=1 severity=\"Information\" fw_rule_id=\"10\" nat_rule_id=\"0\" fw_rule_type=\"USER\" ether_type=\"Unknown (0x0000)\" in_interface=\"Port1\" out_interface=\"Port2\" src_mac=\"XX:XX:XX:XX:XX:XX\" dst_mac=\"XX:XX:XX:XX:XX:XX\" src_ip=\"192.168.x.x\" src_country=\"R1\" dst_ip=\"192.168.x.x\" dst_country=\"R1\" protocol=\"TCP\" src_port=43007 dst_port=80 src_zone_type=\"LAN\" src_zone=\"LAN\" dst_zone_type=\"VPN\" dst_zone=\"VPN\" con_event=\"Start\" con_id=\"672667904\" hb_status=\"No Heartbeat\" app_resolved_by=\"Signature\" app_is_cloud=\"FALSE\" qualifier=\"New\" in_display_interface=\"Port1\" out_display_interface=\"Port2\"",
            "code" : "010101600001",
            "kind" : "event",
            "module" : "sophos",
            "action" : "Allowed",
            "category" : [
              "network"
            ],
            "dataset" : "sophos.xg",
            "outcome" : "success"
          }
        },
        "_ingest" : {
          "timestamp" : "2022-05-16T17:22:39.662913297Z"
        }
      }
    }
  ]
}

So let me think about that...

There are a couple of things here...

The module expects the data to be written to the filebeat index and needs to use the pipeline.

This assumes you ran the full setup:

./filebeat setup -e

So your architecture would be

Syslog -> Logstash -> Elasticsearch -> sophos pipeline -> filebeat index

Just pass the docs through.

In logstash.conf, set the output to Elasticsearch, set the index, and set the pipeline to:

pipeline => "filebeat-7.17.3-sophos-xg-pipeline"
index => "filebeat-7.17.3"  # Filebeat write alias
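
Put together, the Elasticsearch output in logstash.conf could look something like this (just a sketch; the hosts, credentials, and cert paths are the placeholders used elsewhere in this thread):

output {
  elasticsearch {
    hosts => ["https://hostname:9200"]
    cacert => '/etc/logstash/config/certs/fullchain1.pem'
    user => 'elastic'
    password => 'secret'
    # Route the raw syslog docs through the module's ingest pipeline
    # and into the Filebeat write alias so the module mappings apply.
    pipeline => "filebeat-7.17.3-sophos-xg-pipeline"
    index => "filebeat-7.17.3"
  }
}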

This is a lot of manual work, but give it a try.

You would still want to figure out why you are getting Unicode (and I am not sure that encoding setting works on the tcp input).

OK, as long as it works I am happy to try. I have one question about the architecture, though: is it possible to ingest only the Sophos logs into the pipeline and leave other syslog messages as-is when they come in on the same input?

Currently I have the following configuration:

An iptables configuration on the single-node host DNATs TCP/UDP 514 to port 5000.
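
For reference, rules like that could be created with something along these lines (a sketch; the interface and target IP are taken from the iptables listing further down):

sudo iptables -t nat -A PREROUTING -i ens160 -p udp --dport 514 -j DNAT --to-destination 192.168.20.35:5000
sudo iptables -t nat -A PREROUTING -i ens160 -p tcp --dport 514 -j DNAT --to-destination 192.168.20.35:5000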

Two separate pipelines, main and syslog, in pipelines.yml:

# This file is where you define your pipelines. You can define multiple.
# For more information on multiple pipelines, see the documentation:
#   https://www.elastic.co/guide/en/logstash/current/multiple-pipelines.html

- pipeline.id: main
  path.config: "/etc/logstash/conf.d/logstash.conf"
- pipeline.id: syslog
  path.config: "/etc/logstash/conf.d/logstash-syslog.conf"

The main pipeline was for Filebeat, but it is no longer active since I changed the Filebeat output to go directly to Elasticsearch. That leaves me with the syslog pipeline and the port 5000 input:

input {
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    if "GS-2352" in [message] {
      mutate { remove_tag => "_grokparsefailure" add_tag => "Lancom" }
    } else if "GS-2326" in [message] {
      mutate { remove_tag => "_grokparsefailure" add_tag => "Lancom" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://hostname:9200"]
    cacert => '/etc/logstash/config/certs/fullchain1.pem'
    index => "logstash-%{+YYYY.MM.dd}-000001"
    user => 'elastic'
    password => 'secret'
  }
  stdout {
    codec => rubydebug
  }
}

If I change the pipeline and index there, other syslog messages would be sent to them too, correct? Is it possible to have different outputs based on tags, or is there another way to configure around that?

Sorry if that is a weird question or I am missing something; I am still very new to the Elastic Stack.

Thank you for your help, I appreciate it.

Short answer: yes, you can separate the outputs. I will give you a couple of options.

But before I answer that I saw something that raised a question.

When you tried the Filebeat module, did your architecture look like a) or b) below? I ask because I was surprised to see that the main pipeline was for Filebeat.

a) Sophos -> Filebeat -> Elasticsearch

b) Sophos -> Filebeat -> Logstash -> Elasticsearch

If it was b) can you share what that filebeat pipeline looked like?

To answer what you asked, for this architecture:

Syslog -> Logstash -> Elasticsearch -> sophos pipeline -> filebeat index

Option A) In short, the easiest/cleanest thing to do is to send the Sophos syslog to another port, create a separate Logstash pipeline that listens on that port, and use the proper Elasticsearch output configuration.
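
A sketch of what Option A implies (the pipeline id, path, and port are placeholders): add a second entry to pipelines.yml, e.g.

- pipeline.id: sophos
  path.config: "/etc/logstash/conf.d/logstash-sophos.conf"

and have logstash-sophos.conf listen on its own port with an Elasticsearch output like the one shown above.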

Option B) Add logic in your main pipeline to set a tag or type, then do conditional output to Elasticsearch.

Pseudocode:

output {
  if [type] == "syslog" {
    elasticsearch { ... }
  } else if [type] == "sophos" {
    elasticsearch { ... }
  }
}

Current architecture is Sophos -> Filebeat -> Elasticsearch

From my filebeat.yml:

# ================================== Outputs ===================================

# Configure what output to use when sending the data collected by the beat.

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["https://hostname:9200"]

  # Protocol - either `http` (default) or `https`.
  protocol: "https"

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: "elastic"
  password: "secret"
  ssl.certificate_authorities: "/etc/filebeat/config/certs/fullchain1.pem"
  ssl.certificate: "/etc/filebeat/config/certs/fullchain1.pem"
  ssl.key: "/etc/filebeat/config/certs/privkey1.pem"
# ------------------------------ Logstash Output -------------------------------
#output.logstash:
  # The Logstash hosts
  # hosts: ["hostname:5044"]

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: "/etc/filebeat/config/certs/fullchain1.pem"

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/filebeat/config/certs/fullchain1.pem"

  # Client Certificate Key
  #ssl.key: "/etc/filebeat/config/certs/privkey1.pem"

# ================================= Processors =================================

Thank you, I will try this out.

That is the filebeat.yml... it looks OK... and that is the setup that leads to the weird characters.

You said you also had a main Logstash pipeline for Filebeat; I was curious and asked what that was... but it is not important.

Sorry, I thought it was not relevant since I am not using it anymore.

The main Logstash pipeline:

input {
  beats {
    port => 5044
    ssl => true
    ssl_key => '/etc/logstash/config/certs/logstash.pkcs8.key'
    ssl_certificate => '/etc/logstash/config/certs/fullchain1.pem'
  }
}

filter {
  if ! [host][name] {
    mutate {
      rename => ["host", "hostname"]
      convert => {"hostname" => "string"}
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://hostname:9200"]
    index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}-000001"
    cacert => '/etc/logstash/config/certs/fullchain1.pem'
    user => 'elastic'
    password => 'secret'
  }
}

Adding logic to my syslog pipeline config did not work; I am not sure why. The pipeline did start but did not process any logs. In case it is of any interest, the config:

input {
  udp {
    port => 5000
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    if "XG330" in [message] {
      mutate { remove_tag => "_grokparsefailure" add_tag => "sophos" }
    } else if "GS-2352" in [message] {
      mutate { remove_tag => "_grokparsefailure" add_tag => "Lancom" }
    } else if "GS-2326" in [message] {
      mutate { remove_tag => "_grokparsefailure" add_tag => "Lancom" }
    }
  }
}

output {
  if [tag] == "sophos" {
    elasticsearch {
      hosts => ["https://hostname:9200"]
      cacert => '/etc/logstash/config/certs/fullchain1.pem'
      pipeline => "filebeat-7.17.3-sophos-xg-pipeline"
      index => "filebeat-7.17.3"
      user => 'elastic'
      password => 'secret'
    }
  } else if [tag] == "Lancom" {
    elasticsearch {
      hosts => ["https://hostname:9200"]
      cacert => '/etc/logstash/config/certs/fullchain1.pem'
      index => "logstash-%{+YYYY.MM.dd}-000001"
      user => 'elastic'
      password => 'secret'
    }
    stdout {
      codec => rubydebug
    }
  }
}

I did start to receive logs on another port and forwarded them to another pipeline, but those logs could not be indexed. The iptables configuration:

sudo iptables -t nat -v -L PREROUTING -n --line-number
Chain PREROUTING (policy ACCEPT 686 packets, 125K bytes)
num   pkts bytes target     prot opt in     out     source               destination
1        6   312 DNAT       tcp  --  ens160 *       0.0.0.0/0            0.0.0.0/0            tcp dpt:9050 to:192.168.20.35:9055
2        4  2248 DNAT       udp  --  ens160 *       0.0.0.0/0            0.0.0.0/0            udp dpt:9050 to:192.168.20.35:9055
3        0     0 DNAT       tcp  --  ens160 *       0.0.0.0/0            0.0.0.0/0            tcp dpt:514 to:192.168.20.35:5000
4     3434  395K DNAT       udp  --  ens160 *       0.0.0.0/0            0.0.0.0/0            udp dpt:514 to:192.168.20.35:5000

The sophos pipeline configuration:

input {
  udp {
    port => 9055
    type => syslog
  }
}

filter {
  if [type] == "syslog" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }
    }
    date {
      match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
      add_field => [ "received_at", "%{@timestamp}" ]
      add_field => [ "received_from", "%{host}" ]
    }
    if "XG330" in [message] {
      mutate { remove_tag => "_grokparsefailure" add_tag => "sophos" }
    }
  }
}

output {
  elasticsearch {
    hosts => ["https://hostname:9200"]
    cacert => '/etc/logstash/config/certs/fullchain1.pem'
    pipeline => "filebeat-7.17.3-sophos-xg-pipeline"
    index => "filebeat-7.17.3"
    user => 'elastic'
    password => 'secret'
  }
}

I receive error logs like this:

"message": [
    "[2022-05-17T18:21:37,095][WARN ][logstash.outputs.elasticsearch][sophos][4f8fcd03753d030fe998bb3b544d7570105d70ac0f9d40c15cdedc9ce78d4133] Could not index event to Elasticsearch. {:status=>400, :action=>[\"index\", {:_id=>nil, :_index=>\"filebeat-7.17.3\", :routing=>nil, :pipeline=>\"filebeat-7.17.3-sophos-xg-pipeline\"}, {\"@version\"=>\"1\", \"type\"=>\"syslog\", \"message\"=>\"<14>device=\\\"SFW\\\" date=2022-05-17 time=18:21:36 timezone=\\\"CEST\\\" device_name=\\\"XG330\\\" device_id=serialnumber log_id=010101600001 log_type=\\\"Firewall\\\" log_component=\\\"Firewall Rule\\\" log_subtype=\\\"Allowed\\\" status=\\\"Allow\\\" priority=Information duration=0 fw_rule_id=2 nat_rule_id=3 policy_type=1 user_name=\\\"\\\" user_gp=\\\"\\\" iap=1 ips_policy_id=5 appfilter_policy_id=6 application=\\\"\\\" application_risk=0 application_technology=\\\"\\\" application_category=\\\"\\\" vlan_id=\\\"\\\" ether_type=Unknown (0x0000) bridge_name=\\\"\\\" bridge_display_name=\\\"\\\" in_interface=\\\"Port1\\\" in_display_interface=\\\"Port1\\\" out_interface=\\\"Port2\\\" out_display_interface=\\\"Port2\\\" src_mac=XX:XX:XX:XX:XX:XX dst_mac=XX:XX:XX:XX:XX:XX src_ip=192.168.X.X src_country_code=R1 dst_ip=X.X.X.X dst_country_code=USA protocol=\\\"TCP\\\" src_port=62626 dst_port=80 sent_pkts=0  recv_pkts=0 sent_bytes=0 recv_bytes=0 tran_src_ip=X.X.X.X tran_src_port=0 tran_dst_ip= tran_dst_port=0 srczonetype=\\\"LAN\\\" srczone=\\\"LAN\\\" dstzonetype=\\\"WAN\\\" dstzone=\\\"WAN\\\" dir_disp=\\\"\\\" connevent=\\\"Start\\\" connid=\\\"1484711808\\\" vconnid=\\\"\\\" hb_health=\\\"No Heartbeat\\\" message=\\\"\\\" appresolvedby=\\\"Signature\\\" app_is_cloud=0\", \"host\"=>\"192.168.X.X\", \"@timestamp\"=>2022-05-17T16:21:36.892Z, \"tags\"=>[\"sophos\"]}], :response=>{\"index\"=>{\"_index\"=>\"filebeat-7.17.3-2022.05.12-000001\", \"_type\"=>\"_doc\", \"_id\"=>\"ssDT0oAB9OV1mX8rLj3F\", \"status\"=>400, \"error\"=>{\"type\"=>\"mapper_parsing_exception\", \"reason\"=>\"failed to parse field [sophos.xg.sent_pkts] of type [long] in document with id 'ssDT0oAB9OV1mX8rLj3F'. Preview of field's value: '0 '\", \"caused_by\"=>{\"type\"=>\"illegal_argument_exception\", \"reason\"=>\"For input string: \\\"0 \\\"\"}}}}}"

The same result occurs if I load the sophos pipeline without the grok filter.

I guess it is still an encoding issue?

Too many moving parts/changes... I am lost. Here are a few answers:

1. Your output section syntax is wrong:

if [tag] == "sophos"

should be the following, since tags are an array and work differently:

if "sophos" in [tags] {

I guess this is with the separate port/pipeline.

No, that is not an encoding issue... it is just a simple parsing issue:

\"failed to parse field [sophos.xg.sent_pkts] of type [long] in document with id 'ssDT0oAB9OV1mX8rLj3F'. Preview of field's value: '0 '\", ... "illegal_argument_exception\", \"reason\"=>\"For input string: \\\"0 \\\"

There is a trailing space in the sophos.xg.sent_pkts field.

Not sure if that affects all events or just a single one, etc.
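
If it turns out to be systematic, one possible Logstash-side workaround (a sketch, untested; it assumes the double space after sent_pkts=<n> in the raw message is the cause) would be to normalize the whitespace before output:

filter {
  # Hypothetical workaround: collapse the extra whitespace after
  # sent_pkts=<n> so the ingest pipeline does not capture a trailing
  # space into the long-typed sophos.xg.sent_pkts field.
  mutate {
    gsub => [ "message", "sent_pkts=(\d+)\s+", "sent_pkts=\1 " ]
  }
}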

You lost me... there is no need for a grok filter if you just send Sophos to its own port...

If just Sophos is going to port 9055, try this:

input {
  udp {
    port => 9055
    type => syslog
  }
}

output {
  elasticsearch {
    hosts => ["https://hostname:9200"]
    cacert => '/etc/logstash/config/certs/fullchain1.pem'
    pipeline => "filebeat-7.17.3-sophos-xg-pipeline"
    index => "filebeat-7.17.3"
    user => 'elastic'
    password => 'secret'
  }
}

Also, you might need to clean up the filebeat indices and re-run the full setup:
./filebeat setup -e

Thanks for clearing that up, still learning...

So I changed my sophos pipeline config as you suggested and re-ran the full Filebeat setup.

Currently it still throws the same parsing errors. Out of 2,963 events that could not be indexed, 2,364 threw this error:

"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"failed to parse field [sophos.xg.sent_pkts] of type [long] in document with id 'Ow3_1YAB9OV1mX8rm_Qw'. Preview of field's value: '0 '", "caused_by"=>{"type"=>"illegal_argument_exception", "reason"=>"For input string: \"0 \""}}}}}

The other 599 had the following error message:

"error"=>{"type"=>"mapper_parsing_exception", "reason"=>"object mapping for [host] tried to parse field [host] as object, but found a concrete value"}}}}

I guess to resolve this I would need to learn about dissect strings as documented here:
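
For the [host] mapping conflict specifically, I suppose a mutate like this could work around it (just a guess, untested), moving the plain-string host value that the udp input sets into host.name, since the Filebeat index template maps host as an object:

filter {
  # The udp input sets [host] to the sender IP as a plain string, but
  # the Filebeat index template maps host as an object, hence the
  # mapping conflict; move the value to host.name instead.
  mutate {
    rename => { "host" => "[host][name]" }
  }
}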

Regardless of those issues, I got it working with Filebeat directly: Sophos -> Filebeat -> Elasticsearch. Sophos logs are now coming in and getting indexed without issues.

I really cannot explain why, though. I just changed the port in the sophos Filebeat module from TCP 9004 to UDP 9055.

When I initially set up the sophos module I had UDP 9005 configured, and no logs came in. I changed it to TCP 9004, since my fortinet module was already listening on UDP 9004, and that resulted in the encoding issue.

UDP 9005 was only bound by the sophos module on the host, so I still have no idea why it did not work in the first place. I guess I made a mistake somewhere.

Thank you for your time and your help.

Glad you got it working!