Logstash and Beats with mTLS

Hi,

I have a working TLS connection between Beats and Logstash. What I now want is mTLS so that only Beats with a valid certificate can connect to Logstash. For policy reasons I can't use the certutil Tool within Elasticsearch but need to create custom certificates.

Here is what I have:

  • Filebeat: filebeat-7.14.1-1.x86_64
  • Logstash: logstash-oss-7.13.4-1.x86_64

(Both on CentOS 7)

Here's the relevant part of the Filebeat configuration:

output.logstash:
  hosts: [ "logstash01:5044","logstash02:5044"]
  ssl.enabled: true
  ssl.certificate_authorities: ["/opt/logstash-ca/ca.crt"]
  ssl.verification_mode: full
  ssl.key: /opt/logstash-ca/logstash01.key
  ssl.key_passphrase: ***
  ssl.certificate: /opt/logstash-ca/logstash01.crt

And the corresponding part of Logstash:

input {
  beats {
    port => 5044
    ssl => true
    ssl_certificate => "/etc/logstash/certs/logstash01-server.crt"
    ssl_key => "/etc/logstash/certs/logstash01-pkcs8.key"
    ssl_verify_mode => none
    ssl_certificate_authorities => ["/etc/logstash/certs/ca.crt"]
    ssl_peer_metadata => false
    ssl_key_passphrase => "***"
  }
}

As soon as I change ssl_verify_mode to force_peer and ssl_peer_metadata to true I see the following messages in logstash-plain.log:

[2021-11-22T16:01:05,766][INFO ][org.logstash.beats.BeatsHandler][shipper][0be890fd94f309412fe80d6e7b9f8247b5871400a45ed08569ac9246c8a9fef2] [local: ***:5044, remote: ***:44222] Handling exception: java.lang.NullPointerException (caused by: java.lang.NullPointerException)
[2021-11-22T16:01:05,768][WARN ][io.netty.channel.DefaultChannelPipeline][shipper][0be890fd94f309412fe80d6e7b9f8247b5871400a45ed08569ac9246c8a9fef2] An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.NullPointerException: null
        at java.util.concurrent.ConcurrentHashMap.get(java/util/concurrent/ConcurrentHashMap.java:936) ~[?:?]
        at org.logstash.FieldReference.from(org/logstash/FieldReference.java:117) ~[logstash-core.jar:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
        at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(jdk/internal/reflect/NativeMethodAccessorImpl.java:62) ~[?:?]
        at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(jdk/internal/reflect/DelegatingMethodAccessorImpl.java:43) ~[?:?]
        at java.lang.reflect.Method.invoke(java/lang/reflect/Method.java:566) ~[?:?]
        at org.jruby.javasupport.JavaMethod.invokeDirectWithExceptionHandling(org/jruby/javasupport/JavaMethod.java:456) ~[jruby-complete-9.2.16.0.jar:?]
        at org.jruby.javasupport.JavaMethod.invokeStaticDirect(org/jruby/javasupport/JavaMethod.java:368) ~[jruby-complete-9.2.16.0.jar:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java.lib.logstash.inputs.beats.message_listener.set_nested(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-beats-6.1.5-java/lib/logstash/inputs/beats/message_listener.rb:163) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java.lib.logstash.inputs.beats.message_listener.RUBY$method$set_nested$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java/lib/logstash/inputs/beats//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-beats-6.1.5-java/lib/logstash/inputs/beats/message_listener.rb) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java.lib.logstash.inputs.beats.message_listener.extract_tls_peer(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-beats-6.1.5-java/lib/logstash/inputs/beats/message_listener.rb:145) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java.lib.logstash.inputs.beats.message_listener.RUBY$method$extract_tls_peer$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java/lib/logstash/inputs/beats//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-beats-6.1.5-java/lib/logstash/inputs/beats/message_listener.rb) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java.lib.logstash.inputs.beats.message_listener.onNewMessage(/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-beats-6.1.5-java/lib/logstash/inputs/beats/message_listener.rb:39) ~[?:?]
        at usr.share.logstash.vendor.bundle.jruby.$2_dot_5_dot_0.gems.logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java.lib.logstash.inputs.beats.message_listener.RUBY$method$onNewMessage$0$__VARARGS__(usr/share/logstash/vendor/bundle/jruby/$2_dot_5_dot_0/gems/logstash_minus_input_minus_beats_minus_6_dot_1_dot_5_minus_java/lib/logstash/inputs/beats//usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/logstash-input-beats-6.1.5-java/lib/logstash/inputs/beats/message_listener.rb) ~[?:?]
        at org.logstash.beats.BeatsHandler.channelRead0(org/logstash/beats/BeatsHandler.java:52) ~[logstash-input-beats-6.1.5.jar:?]
        at org.logstash.beats.BeatsHandler.channelRead0(org/logstash/beats/BeatsHandler.java:12) ~[logstash-input-beats-6.1.5.jar:?]
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(io/netty/channel/SimpleChannelInboundHandler.java:99) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(io/netty/channel/AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(io/netty/channel/AbstractChannelHandlerContext.java:365) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(io/netty/channel/AbstractChannelHandlerContext.java:357) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(io/netty/handler/codec/ByteToMessageDecoder.java:324) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(io/netty/handler/codec/ByteToMessageDecoder.java:296) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(io/netty/channel/AbstractChannelHandlerContext.java:379) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext.access$600(io/netty/channel/AbstractChannelHandlerContext.java:61) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.channel.AbstractChannelHandlerContext$7.run(io/netty/channel/AbstractChannelHandlerContext.java:370) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.concurrent.DefaultEventExecutor.run(io/netty/util/concurrent/DefaultEventExecutor.java:66) ~[netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(io/netty/util/concurrent/SingleThreadEventExecutor.java:989) [netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(io/netty/util/internal/ThreadExecutorMap.java:74) [netty-all-4.1.65.Final.jar:4.1.65.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(io/netty/util/concurrent/FastThreadLocalRunnable.java:30) [netty-all-4.1.65.Final.jar:4.1.65.Final]
        at java.lang.Thread.run(java/lang/Thread.java:829) [?:?]

I'm building the certificates via Ansible. Here's the code I use for the Beats CSR:

- name: Create CSR
  openssl_csr:
    path: "{{ ca_ca_dir }}/{{ inventory_hostname }}.csr"
    privatekey_path: "{{ ca_ca_dir }}/{{ inventory_hostname }}.key"
    privatekey_passphrase: "{{ ca_keypassphrase }}"
    country_name: "{{ ca_country }}"
    organization_name: "{{ ca_organization }}"
    common_name: "{{ inventory_hostname }}"
    subject_alt_name: "DNS:{{ ansible_hostname }},DNS:{{ ansible_fqdn }},DNS:{{ inventory_hostname }}"
    extended_key_usage:
      - clientAuth

and for the Logstash CSR

    - name: Create server CSR
      openssl_csr:
        path: "{{ ca_ca_dir }}/{{ inventory_hostname }}-server.csr"
        privatekey_path: "{{ ca_ca_dir }}/{{ inventory_hostname }}.key"
        privatekey_passphrase: "{{ ca_keypassphrase }}"
        country_name: "{{ ca_country }}"
        organization_name: "{{ ca_organization }}"
        common_name: "{{ inventory_hostname }}"
        subject_alt_name: "DNS:{{ ansible_hostname }},DNS:{{ ansible_fqdn }},DNS:{{ inventory_hostname }}"
        extended_key_usage:
          - serverAuth

Even if you don't know Ansible, I hope it's more or less self explanatory. Since I found some hints within Logstashs documentation that the cerfiticate needs to be fit for severAuth and clientAuth I tried to use both in both tasks but to no effect.

Could you help me tackle this issue that keeps bugging me for quite some time now?

Side note: I'm open sourcing the role for certificate management: GitHub - widhalmt/ansible-role-ca It's mostly intended for setups where one has to use customs CAs and it's very work in progress. When it's done, I'll move it to the official GitHub space of the company I'm working for (the link will stay intact)