Error parsing json in Logstash

I have been trying to set up Cowrie and link it to my ELK stack, but I keep getting an "Error parsing json" warning in Logstash with the configuration below.

My ELK setup follows this link.

This is the error shown in my logstash-plain.log:

[2024-06-10T16:52:32,899][WARN ][logstash.filters.json    ][main][74d2e7affe0e8332b5be876c2741cbbf9cbcb2a9af46ab40f79c0776dcfdd167] Error parsing json {:source=>"message", :raw=>["Jun 10 10:45:04 ubuntu-server dockerd[1205]: time=\"2024-06-10T10:45:04.672278722+08:00\" level=info msg=\"API listen on /run/docker.sock\"", "time=\"2024-06-10T10:45:04.672278722+08:00\" level=info msg=\"API listen on /run/docker.sock\""], :exception=>#<Java::JavaLang::ClassCastException: class org.jruby.RubyArray cannot be cast to class org.jruby.RubyIO (org.jruby.RubyArray and org.jruby.RubyIO are in unnamed module of loader 'app')>}

elasticsearch.yml file

network.host: 0.0.0.0
cluster.name: elastic-demo
node.name: demo-node
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
http.port: 9200
xpack.security.enabled: true
xpack.security.enrollment.enabled: true
xpack.security.http.ssl:
  enabled: true
  keystore.path: certs/http.p12
xpack.security.transport.ssl:
  enabled: true
  verification_mode: certificate
  keystore.path: certs/transport.p12
  truststore.path: certs/transport.p12
cluster.initial_master_nodes: ["demo-node"]
http.host: 0.0.0.0

kibana.yml file

server.port: 5601
server.host: 0.0.0.0
server.name: demo-server
logging.root.level: info
logging:
  appenders:
    file:
      type: file
      fileName: /var/log/kibana/kibana.log
      layout:
        type: json
  root:
    appenders:
      - default
      - file
pid.file: /run/kibana/kibana.pid

elasticsearch.hosts: ['https://<ELK IP>:9200']
elasticsearch.serviceAccountToken: xxxx
elasticsearch.ssl.certificateAuthorities: [/var/lib/kibana/ca_xxx.crt]
xpack.fleet.outputs: [{id: fleet-default-output, name: default, is_default: true, is_default_monitoring: true}]
xpack.encryptedSavedObjects.encryptionKey: xxx
xpack.reporting.encryptionKey: xxx
xpack.security.encryptionKey: xxx

logstash.yml file

node.name: demo-node
path.data: /var/lib/logstash
api.enabled: false
path.logs: /var/log/logstash
log.level: info

logstash-cowrie.conf

input {
    # filebeat
    beats {
        port => 5044
        ssl => true
        ssl_certificate_authorities => ["/etc/logstash/certs/http_ca.crt"]
        ssl_certificate => "/etc/logstash/certs/logstash.crt"
        ssl_key => "/etc/logstash/certs/logstash.key"
        ssl_verify_mode => "force_peer"
        type => "cowrie"
    }

    # if you don't want to use filebeat: this is the actual live log file to monitor
    #file {
    #    path => ["/home/cowrie/cowrie-git/log/cowrie.json"]
    #    codec => json
    #    type => "cowrie"
    #}
}

filter {
    if [type] == "cowrie" {
        json {
            source => "message"
            target => "honeypot"
        }

        date {
            match => [ "timestamp", "ISO8601" ]
        }

        if [src_ip] {

            mutate {
                add_field => { "src_host" => "%{src_ip}" }
            }

            dns {
                reverse => [ "src_host" ]
                nameserver => [ "8.8.8.8", "8.8.4.4" ]
                action => "replace"
                hit_cache_size => 4096
                hit_cache_ttl => 900
                failed_cache_size => 512
                failed_cache_ttl => 900
            }

            geoip {
                source => "src_ip"
                target => "geoip"
                database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
            }
        }

        mutate {
            # cut out useless tags/fields
            remove_tag => [ "beats_input_codec_plain_applied" ]
            remove_field => [ "[log][file][path]", "[log][offset]" ]
        }
    }
}

output {
    if [type] == "cowrie" {
        elasticsearch {
            ssl => true
            hosts => ["https://<ELK IP>:9200"]
            cacert => "/etc/logstash/certs/http_ca.crt"
            user => "logstash_internal"
            password => "demo-password"
            ilm_enabled => "auto"
            ilm_rollover_alias => "cowrie-logstash"
        }
        #file {
        #    path => "/tmp/cowrie-logstash.log"
        #    codec => json
        #}
        stdout {
            codec => rubydebug
        }
    }
}

filebeat.yml file

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input-specific configurations.

# filestream is an input for collecting log messages from files.
- type: filestream

  # Unique ID among all inputs, an ID is required.
  id: syslog

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/log/syslog
    - /home/cowrie/cowrie/var/log/cowrie/cowrie.json*
    #- c:\programdata\elasticsearch\logs\*

setup.template.settings:
  index.number_of_shards: 1

logging.level: info

output.logstash:
  hosts: ["<ELK IP>:5044"]
  ssl.enabled: true
  ssl.certificate_authorities: ["/etc/filebeat/certs/http_ca.crt"]
  ssl.certificate: "/etc/filebeat/certs/client.crt"
  ssl.key: "/etc/filebeat/certs/client.key"
  pipelining: 4

The problem is that your [message] is not JSON. The configuration below creates a [message] field that is an array of strings, and it reproduces exactly the error that you see.

input { generator { count => 1 lines => [ '' ] } }

output { stdout { codec => rubydebug { metadata => false } } }
filter {
    mutate { remove_field => [ "message" ] }

    mutate { add_field => { "message" => 'Jun 10 10:45:04 ubuntu-server dockerd[1205]: time="2024-06-10T10:45:04.672278722+08:00" level=info msg="API listen on /run/docker.sock"' } }
    mutate { add_field => { "message" => 'time="2024-06-10T10:45:04.672278722+08:00" level=info msg="API listen on /run/docker.sock"' } }

    json { source => "message" remove_field => [ "message" ] }
}

Why would [message] be an array? Perhaps you do something like this:

input { generator { count => 1 lines => [ 'Jun 10 10:45:04 ubuntu-server dockerd[1205]: time="2024-06-10T10:45:04.672278722+08:00" level=info msg="API listen on /run/docker.sock"' ] } }

output { stdout { codec => rubydebug { metadata => false } } }
filter {
    mutate { remove_field => [ "event", "host", "log" ] }

    grok { match => { "message" => "^%{SYSLOGTIMESTAMP} %{NOTSPACE} %{WORD}\[%{NUMBER}\]: %{DATA:message}$" } }
}

which does not overwrite [message], but instead turns it into an array.
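If you do need a grok like that, one way to keep [message] a single string is grok's overwrite option, which replaces the field instead of appending a second value to it. A minimal sketch:

filter {
    grok {
        match => { "message" => "^%{SYSLOGTIMESTAMP} %{NOTSPACE} %{WORD}\[%{NUMBER}\]: %{DATA:message}$" }
        # replace [message] rather than adding another entry to it
        overwrite => [ "message" ]
    }
}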
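Separately, your filebeat filestream input reads both /var/log/syslog and cowrie.json, and the beats input stamps everything with type => "cowrie", so plain syslog lines (such as that dockerd line) also go through the json filter. One option is to gate the json filter on the source file. This is only a sketch, and it assumes [log][file][path] is still present at that point (your mutate removes it later in the filter block):

filter {
    # only attempt JSON parsing on events read from the cowrie JSON log
    if [log][file][path] =~ /cowrie\.json/ {
        json {
            source => "message"
            target => "honeypot"
        }
    }
}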

Thank you. I am not very familiar with JSON as I am still learning. I think I have other issues now, since I removed the SSL details from my logstash-cowrie.conf file.