How do I send Cowrie logs to ELK (with SSL) via Logstash?

When I set up my ELK stack without SSL, I was able to get Cowrie logs flowing into Elasticsearch using my logstash-cowrie.conf and filebeat.yml files.

The following describes my ELK setup with SSL configuration:

I have created another ELK instance with SSL configured, but apparently the logs are not getting through to it. In my ELK server's Logstash log I found this line:

[INFO ][logstash.outputs.elasticsearch][main] Not eligible for data streams because config contains one or more settings that are not compatible with data streams: {"ilm_rollover_alias"=>"cowrie-logstash", "ilm_enabled"=>"auto"}

A sample beats pipeline configuration from the guide that I followed:

# Beats input secured with TLS. Filebeat must present a client
# certificate signed by the listed CA ("force_peer" = mutual TLS).
input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate_authorities => ["/etc/logstash/certs/http_ca.crt"]
    ssl_certificate => "/etc/logstash/certs/logstash.crt"
    ssl_key => "/etc/logstash/certs/logstash.key"
    # "force_peer" rejects clients that do not present a valid certificate.
    ssl_verify_mode => "force_peer"
  }
}
# NOTE(review): %{SYSLOGLINE} will not match Cowrie's JSON lines and will
# tag them _grokparsefailure — presumably this filter was written for
# /var/log/syslog only; confirm against the inputs actually shipped.
filter {
  grok {
    match => { "message" => "%{SYSLOGLINE}" }
  }
  date {
    match => ["timestamp", "MMM dd HH:mm:ss"]
  }
}
output {
  elasticsearch {
    ssl => true
    hosts => ["https://<ELK IP>:9200"]
    # CA used to verify the Elasticsearch HTTPS certificate.
    cacert => "/etc/logstash/certs/http_ca.crt"
    user => "logstash_internal"
    password => "demo-password"
  }
}

The original logstash-cowrie.conf file, located in /etc/logstash/conf.d:

input {
    # Filebeat ships Cowrie's JSON log lines to this port.
    beats {
        port => 5044
        # Tag events so the conditionals in filter/output below match.
        type => "cowrie"
    }

    # If you don't want to use Filebeat: monitor the live log file directly.
    #file {
    #    path => ["/home/cowrie/cowrie-git/log/cowrie.json"]
    #    codec => json
    #    type => "cowrie"
    #}
}

filter {
    if [type] == "cowrie" {
        # Parse the raw JSON line into the "honeypot" field.
        # Values are quoted strings: unquoted barewords are deprecated
        # in the Logstash configuration language.
        json {
            source => "message"
            target => "honeypot"
        }

        # Use Cowrie's own ISO8601 timestamp as the event timestamp.
        date {
            match => [ "timestamp", "ISO8601" ]
        }

        if [src_ip] {

            mutate {
                add_field => { "src_host" => "%{src_ip}" }
            }

            # Reverse-resolve the attacker IP; caches keep DNS load low.
            dns {
                reverse => [ "src_host" ]
                nameserver => [ "8.8.8.8", "8.8.4.4" ]
                action => "replace"
                hit_cache_size => 4096
                hit_cache_ttl => 900
                failed_cache_size => 512
                failed_cache_ttl => 900
            }

            # Enrich with GeoIP data from the local GeoLite2 database.
            geoip {
                source => "src_ip"
                target => "geoip"
                database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
            }

        }

        mutate {
            # Cut out useless tags/fields.
            remove_tag => [ "beats_input_codec_plain_applied" ]
            remove_field => [ "[log][file][path]", "[log][offset]" ]
        }
    }
}

output {
    if [type] == "cowrie" {
        elasticsearch {
            hosts => ["<ELK IP>:9200"]
            # Quoted string, not a bareword.
            # NOTE(review): combining ilm_* settings makes the output
            # fall back from data streams to classic indices — that is
            # exactly what the INFO line in the log reports; it is
            # informational, not an error.
            ilm_enabled => "auto"
            ilm_rollover_alias => "cowrie-logstash"
        }
        #file {
        #    path => "/tmp/cowrie-logstash.log"
        #    codec => json
        #}
        # Echo events to stdout for debugging; remove in production.
        stdout {
            codec => rubydebug
        }
    }
}

Filebeat.yml file

# Filebeat inputs: one filestream reading both syslog and Cowrie's
# rotated JSON log files.
filebeat.inputs:
- type: filestream
  id: syslog
  paths:
    - /var/log/syslog
    # NOTE(review): Cowrie JSON lines are shipped through the same input
    # as syslog; the Logstash side must distinguish them — confirm.
    - /cowrie/cowrie/var/log/cowrie/cowrie.json*

setup.template.settings:
  index.number_of_shards: 1

logging.level: info

# Ship to Logstash over TLS; the client certificate/key enable mutual
# TLS (required by ssl_verify_mode => "force_peer" on the Logstash side).
output.logstash:
  hosts: ["<ELK IP>:5044"]
  ssl.enabled: true
  ssl.certificate_authorities: ["/etc/filebeat/certs/http_ca.crt"]
  ssl.certificate: "/etc/filebeat/certs/client.crt"
  ssl.key: "/etc/filebeat/certs/client.key"
  pipelining: 4

I have made some adjustments to my logstash-cowrie.conf and filebeat.yml files, but I still cannot see the indices in my ELK instance. I am not sure what went wrong in my conf file.

I have been receiving this error in my logstash-plain.log:

[2024-06-12T21:57:35,706][ERROR][org.logstash.execution.ShutdownWatcherExt][main] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2024-06-12T21:57:40,755][WARN ][org.logstash.execution.ShutdownWatcherExt][main] {"inflight_count"=>0, "stalling_threads_info"=>{"other"=>[{"thread_id"=>34, "name"=>"[main]<beats", "current_call"=>"[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-beats-6.8.3-java/lib/logstash/inputs/beats.rb:258:in `run'"}, {"thread_id"=>35, "name"=>"[main]<file", "current_call"=>"[...]/vendor/bundle/jruby/3.1.0/gems/logstash-input-file-4.4.6/lib/filewatch/watch.rb:55:in `sleep'"}, {"thread_id"=>33, "name"=>"[main]>worker1", "current_call"=>"[...]/logstash-core/lib/logstash/java_pipeline.rb:304:in `block in start_workers'"}, {"thread_id"=>21, "name"=>"[main]-pipeline-manager", "current_call"=>"[...]/vendor/bundle/jruby/3.1.0/gems/thwait-0.2.0/lib/thwait.rb:112:in `pop'"}], ["LogStash::Filters::Mutate", {"remove_field"=>["[log][file][path]", "[log][offset]"], "remove_tag"=>["beats_input_codec_plain_applied"], "id"=>"0bcd91e6d87a3e83b29e10c46ae5addf5638c2da50ab468c79b01d6019736b03"}]=>[{"thread_id"=>32, "name"=>"[main]>worker0", "current_call"=>"[...]/vendor/bundle/jruby/3.1.0/gems/stud-0.0.23/lib/stud/interval.rb:95:in `sleep'"}]}}

Logstash-cowrie.conf file

input {
    # Filebeat path is disabled; reading the Cowrie log file directly.
    #beats {
    #    port => 5044
    #    type => "cowrie"
    #}

    # NOTE(review): the Filebeat config in this setup ships
    # /home/cowrie/cowrie/var/log/cowrie/cowrie.json — confirm this path
    # points at the same file on this host.
    file {
        path => ["/home/cowrie/cowrie-git/log/cowrie.json"]
        start_position => "beginning"
        # /dev/null sincedb => re-read the file from the top on every restart.
        sincedb_path => "/dev/null"
        # The codec already parses each line, so events arrive as
        # structured data with NO "message" field.
        codec => "json"
        type => "cowrie"
    }
}

filter {
    if [type] == "cowrie" {
        # Only run the json filter when a "message" field exists (the
        # Beats path). With the file input's json codec there is no
        # "message" field, and an unguarded json filter would tag every
        # event with _jsonparsefailure.
        if [message] {
            json {
                source => "message"
                target => "honeypot"
            }
        }

        # Use Cowrie's own ISO8601 timestamp as the event timestamp.
        date {
            match => [ "timestamp", "ISO8601" ]
        }

        if [src_ip] {

            mutate {
                add_field => { "src_host" => "%{src_ip}" }
            }

            # Reverse-resolve the attacker IP; caches keep DNS load low.
            dns {
                reverse => [ "src_host" ]
                nameserver => [ "8.8.8.8", "8.8.4.4" ]
                action => "replace"
                hit_cache_size => 4096
                hit_cache_ttl => 900
                failed_cache_size => 512
                failed_cache_ttl => 900
            }

            # Enrich with GeoIP data from the local GeoLite2 database.
            geoip {
                source => "src_ip"
                target => "geoip"
                database => "/opt/logstash/vendor/geoip/GeoLite2-City.mmdb"
            }

        }

        mutate {
            # Cut out useless tags/fields.
            remove_tag => [ "beats_input_codec_plain_applied" ]
            remove_field => [ "[log][file][path]", "[log][offset]" ]
        }
    }
}

output {
    if [type] == "cowrie" {
        elasticsearch {
            hosts => ["https://<ELK IP>:9200"]
            # ilm_enabled takes a quoted string: "true", "false" or "auto".
            ilm_enabled => "true"
            ilm_rollover_alias => "cowrie-logstash"
            ssl => true
            ssl_certificate_verification => true
            # CA used to verify the Elasticsearch HTTPS certificate.
            cacert => "/etc/logstash/certs/http_ca.crt"
            user => "logstash_internal"
            password => "demo-password"
        }
        #file {
        #    path => "/tmp/cowrie-logstash.log"
        #    codec => json
        #}
        # Echo events to stdout for debugging; remove once indexing works.
        stdout {
            codec => rubydebug
        }
    }
}

Filebeat.yml file

# ============================== Filebeat inputs ===============================

filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# The legacy "log" input type is deprecated; both inputs below use
# "filestream", which requires a unique id per input.

- type: filestream
  id: syslog
  enabled: true
  paths:
    - /var/log/syslog

- type: filestream
  id: cowrie
  enabled: true
  paths:
    - /home/cowrie/cowrie/var/log/cowrie/cowrie.json
  # NOTE(review): this lands in the event as [fields][event.type], NOT
  # the top-level [type] field that the Logstash conditional tests —
  # in the original pipeline, [type] is set by the beats input's
  # `type => "cowrie"` option; confirm which mechanism you rely on.
  fields:
    event.type: cowrie

setup.template.settings:
  index.number_of_shards: 1

logging.level: info

# Mutual-TLS connection to the Logstash beats input.
output.logstash:
  hosts: ["<ELK IP>:5044"]
  ssl.enabled: true
  ssl.certificate_authorities: ["/etc/filebeat/certs/http_ca.crt"]
  ssl.certificate: "/etc/filebeat/certs/client.crt"
  ssl.key: "/etc/filebeat/certs/client.key"
  pipelining: 4