_jsonparsefailure with filebeat and logstash

Hello,

I use Filebeat to fetch data from Wazuh (HIDS) and send alerts to Logstash.
Then Logstash sends its data to ES and everything usually works fine.

However, sometimes after being away for a few days, I look in Kibana and see parsing errors tagged '_jsonparsefailure'.

Here is the corresponding message:

{
  "_index": "wazuh-alerts-",
  "_id": "YZ8ufttsrh",
  "_version": 1,
  "_score": null,
  "_source": {
    "@version": "1",
    "message": "Jan  3 05:51:14 wazuh-manager filebeat[769437]: 2023-01-03T05:51:14.358Z#011INFO#011[input.harvester]#011log/harvester.go:340#011File is inactive. Closing because close_inactive of 5m0s reached.#011{\"input_id\": \"c5e42179-0a6f-4988-b4f3-c0edcfb1b6dc\", \"source\": \"/var/log/syslog\", \"state_id\": \"native::525914-1804\", \"finished\": false, \"os_id\": \"525914-1804\", \"old_source\": \"/var/log/syslog\", \"old_finished\": true, \"old_os_id\": \"525914-1804\", \"harvester_id\": \"25d29d53-8432-431a-b4af-12579e7f8549\"}Jan  3 05:51:15 wazuh-manager filebeat[769437]: 2023-01-03T05:51:15.369Z#011INFO#011[input.harvester]#011log/harvester.go:309#011Harvester started for paths: [/var/log/messages* /var/log/syslog*]#011{\"input_id\": \"c5e42179-0a6f-4988-b4f3-c0dc\", \"source\": \"/var/log/syslog\", \"state_id\": \"native::525914-1804\", \"finished\": false, \"os_id\": \"525914-1804\", \"old_source\": \"/var/log/syslog\", \"old_finished\": true, \"old_os_id\": \"525914-1804\", \"harvester_id\": \"bbdbb8a9-f915-4587-aae5-ef2b76fd49f0\"}Jan  3 05:51:17 wazuh-manager filebeat[769437]: 2023-01-03T05:51:17.372Z#011ERROR#011[logstash]#011logstash/async.go:280#011Failed to publish events caused by: write tcp 192.168.1.18:35676->192.168.1.23:5044: write: connection reset by peerJan  3 05:51:17 wazuh-manager filebeat[769437]: 2023-01-03T05:51:17.372Z#011INFO#011[publisher]#011pipeline/retry.go:219#011retryer: send unwait signal to consumerJan  3 05:51:17 wazuh-manager filebeat[769437]: 2023-01-03T05:51:17.372Z#011INFO#011[publisher]#011pipeline/retry.go:223#011  doneJan  3 05:51:18 wazuh-manager filebeat[769437]: 2023-01-03T05:51:18.946Z#011ERROR#011[publisher_pipeline_output]#011pipeline/output.go:180#011failed to publish events: write tcp 192.168.1.18:35676->192.168.1.16:5044: write: connection reset by peerJan  3 05:51:18 
wazuh-manager filebeat[769437]: 2023-01-03T05:51:18.946Z#011INFO#011[publisher_pipeline_output]#011pipeline/output.go:143#011Connecting to backoff(async(tcp://192.168.1.16:5044))Jan  3 05:51:18 wazuh-manager filebeat[769437]: 2023-01-03T05:51:18.946Z#011INFO#011[publisher]#011pipeline/retry.go:219#011retryer: send unwait signal to consumerJan  3 05:51:18 wazuh-manager filebeat[769437]: 2023-01-03T05:51:18.946Z#011INFO#011[publisher]#011pipeline/retry.go:223#011  doneJan  3 05:51:18 wazuh-manager filebeat[769437]: 2023-01-03T05:51:18.946Z#011INFO#011[publisher_pipeline_output]#011pipeline/output.go:151#011Connection to backoff(async(tcp://192.168.1.16:5044)) establishedJan  3 05:52:01 wazuh-manager CRON[963977]: pam_unix(cron:session): session opened for user root by (uid=0)Jan  3 05:52:01 wazuh-manager CRON[963978]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)Jan  3 05:52:01 
wazuh-manager CRON[963977]: pam_unix(cron:session): session closed for user rootJan  3 05:52:03 wazuh-manager filebeat[769437]: 2023-01-03T05:52:03.046Z#011INFO#011[input.harvester]#011log/harvester.go:309#011Harvester started for paths: [/var/log/auth.log* /var/log/secure*]#011{\"input_id\": \"f9c11fja-825c-5606-a9aa-782d29f4d8de\", \"source\": \"/var/log/auth.log\", \"state_id\": \"native::527100-1804\", \"finished\": false, \"os_id\": \"65050-5855\", \"old_source\": \"/var/log/auth.log\", \"old_finished\": true, \"old_os_id\": \"527100-1804\", \"harvester_id\": \"d546427-4ggd-4ey8-9927-3a69cydbd3\"}",
    "@timestamp": "2023-01-03T05:53:10.377572603Z",
    "tags": [
      "_jsonparsefailure",
      "wazuh"
    ],
    "type": "wazuh-alerts"
  },
  "fields": {
    "@timestamp": [
      "2023-01-03T05:53:10.377Z"
    ]
  },
  "sort": [
    1672725190377
  ]
}

I have received similar messages about a hundred times. I would first like to know how to fix the parsing problem before addressing the connection errors shown in the message. I don't understand why the parsing fails, what should I do?

Here is my Filebeat configuration file:

###################### Filebeat Configuration Example #########################

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

# The log input collects log messages from files.
- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - "/var/ossec/logs/alerts/alerts.json"
  fields_under_root: true
  document_type: json
  json.message_key: log
  json.keys_under_root: true
  json.overwrite_keys: true
  fields:
    beat.type: wazuh_alerts
    #- c:\programdata\elasticsearch\logs\*


# ============================== Filebeat modules ==============================

filebeat.config.modules:
  # Glob pattern for configuration loading
  path: ${path.config}/modules.d/*.yml

  # Set to true to enable config reloading
  reload.enabled: true

  # Period on which files under path should be checked for changes
  reload.period: 15s

# ------------------------------ Logstash Output -------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.1.16:5044"]
  username: "logstash"
  password: "xx"

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~

# modules

filebeat.modules:
   - module: wazuh
     alerts:
       enabled: true

   - module: system
     syslog:
       enabled: true

setup.template.json.enabled: true
setup.template.json.path: '/etc/filebeat/wazuh-template.json'
setup.template.json.name: 'wazuh'
setup.template.overwrite: true
setup.ilm.enabled: false

Hi,

Could you post an example of a working message and also the content of your Logstash pipeline?

Best regards
Wolfram

What does your Logstash pipeline look like?

You are collecting Wazuh messages, which are JSON messages, and also syslog messages with the system module; those messages are not JSON.

Are you filtering this in your logstash pipeline and only using the json filter for the wazuh messages?

Ok, here is an example of well-parsed alerts:

{
              "beat" => {
        "type" => "wazuh_alerts"
    },
    "Agent hostname" => "wazuh-manager",
              "data" => {
        "agent_status_name" => "agent_test",
          "agent_status_ip" => "192.168.1.6",
          "agent_status_id" => "024",
                   "status" => "active"
    },
        "@timestamp" => 2023-01-03T12:40:02.059Z,
               "log" => "",
           "decoder" => {
        "name" => "status_agent"
    },
           "Content" => "Agent status has changed.",
              "tags" => [
        [0] "wazuh",
        [1] "beats_input_raw_event"
    ],
              "rule" => {
              "mail" => false,
            "groups" => [
            [0] "monitoring"
        ],
                "id" => "100008",
        "firedtimes" => 6645
    },
          "@version" => "1",
             "input" => {
        "type" => "log"
    },
           "manager" => {
        "name" => "wazuh-manager"
    },
    "Level of alert" => 3,
             "agent" => {
        "ephemeral_id" => "0c3gg632-dd1c-5590-8cd3-13228",
             "version" => "7.17.7",
                "type" => "filebeat",
            "hostname" => "wazuh-manager"
    },
          "Agent IP" => "192.168.1.18",
          "full_log" => "2023-01-03 12:40:00,340 wazuh-agent: id:024 ip:192.168.1.6 name:agent_test status:active",
        "predecoder" => {
        "program_name" => "wazuh-agent",
           "timestamp" => "2023-01-03 12:40:00,34"
    },
          "location" => "/var/log/status.log",
              "type" => "wazuh-alerts",
              "host" => {
                   "ip" => [
            [0] "xxx"
        ],
        "containerized" => false,
         "architecture" => "x86_64",
                  "mac" => [
            [0] "xxx"
        ],
                   "id" => "fe2dcc385663",
                   "os" => {
            "codename" => "buster",
              "family" => "debian",
              "kernel" => "5.15.60-1-pve",
             "version" => "10 (buster)",
                "type" => "linux",
                "name" => "Debian GNU/Linux",
            "platform" => "debian"
        },
                 "name" => "wazuh-manager",
             "hostname" => "wazuh-manager"
    },
          "Agent ID" => "000",
                "id" => "1672749602.18051892",
               "ecs" => {
        "version" => "1.12.0"
    },
         "timestamp" => "2023-01-03T12:40:02.059+0000"
}

It's a simple alert that gives the status of a Wazuh agent.

Here is the input pipeline:

        beats {
                port => 5044
                type => "wazuh-alerts"
                codec => "json_lines"
                tags => ["wazuh"]
        }

Here is the filter part of my pipeline:

input {
  pipeline {
    address => wazuh
  }
}

filter {

  if [agent][name] == "test" {

    if [data][srcip] {
      mutate {
        add_field => [ "IP Source", "%{[data][srcip]}" ]
      }
    }

    if [data][aws][sourceIPAddress] {
      mutate {
        add_field => [ "IP source", "%{[data][aws][sourceIPAddress]}" ]
      }
    }
    mutate {
      rename => ["[data][abuseipdb][source][srcip]","IP Source"]
    }

    if [data][srcip] {
      geoip {
        source => "[data][srcip]"
        ecs_compatibility => disabled
      }
    }
    date {
      match => ["timestamp", "ISO8601"]
      target => "@timestamp"
    }
  }
}

and the output file :

output {
   opensearch {
     hosts => ["https://FQDN_OPENSEARCH:9200"]
     index => "logstash-"
     user => "logstash"
     password => "xxx"
     ssl => true
     ssl_certificate_verification => true
     cacert => "/opt/logstash/config/certs/root-ca.pem"
   }

   pipeline {
     send_to => logs # pipes/logs_output.conf
   }
}

OpenSearch/OpenDistro are AWS run products and differ from the original Elasticsearch and Kibana products that Elastic builds and maintains. You may need to contact them directly for further assistance.

(This is an automated response from your friendly Elastic bot. Please report this post if you have any suggestions or concerns :elasticheart: )

You need to share your entire pipeline: the pipeline with the beats input and also the pipeline that sends the data to the wazuh index.

Your issue is that you tried to parse a non-JSON message with the json filter; you need to check your conditionals to make sure this doesn't happen.

I posted above the input, filter and output parts of the pipeline (separated because they don't belong to the same files in my architecture).

Indeed, the message is not JSON, but why is that?

There is nothing in the posted pipeline that uses a json filter, so the relevant parts of your pipeline were not shared.

You have the system module enabled, which collects logs from your system, probably from /var/log/*. The message you shared is from your syslog file, /var/log/messages or /var/log/syslog depending on the distribution.

   - module: system
     syslog:
       enabled: true

As I said in the previous answer:

You are collecting Wazuh messages, which are JSON messages, and also syslog messages with the system module; those messages are not JSON.

You need to have conditionals to filter those two kind of messages and only apply the json filter to the json messages.
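For example, a minimal sketch of such a conditional, assuming the beat.type field you set in your Filebeat input (with fields_under_root: true) is present on the event:

        filter {
          # Only parse events from the Wazuh alerts input as JSON;
          # syslog events from the system module are left alone.
          if [beat][type] == "wazuh_alerts" {
            json {
              source => "message"
            }
          }
        }

The exact field to test depends on what your Filebeat config actually puts on the event, so check a sample document first.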

I'm using the "json_lines" codec, as shown above. Could it be related to that?

Oh, I see now.

Yes, if you are using the json_lines codec and sending non-JSON messages from Filebeat, which you are doing with the system module, you will get this error.
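One way to avoid it would be to drop the json_lines codec from the beats input and do the JSON parsing conditionally in a filter instead, for example:

        beats {
                port => 5044
                type => "wazuh-alerts"
                tags => ["wazuh"]
        }

With the default codec, non-JSON syslog events from the system module pass through untouched instead of being tagged with _jsonparsefailure; only the events you explicitly match in a filter get parsed as JSON.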

Ok no problem :slight_smile:

So, I can try to add these lines in Filebeat:

   - module: system
     syslog:
       enabled: true
       json.message_key: log
       json.keys_under_root: true
       json.overwrite_keys: true