Kafka keeps syslog timestamp in the message

Hi, I am having an issue with the syslog timestamp when Filebeat sends system module logs (/var/log/messages and /var/log/secure) through the Kafka output.

If I send logs straight to Elasticsearch, Filebeat's default dashboards (e.g. [Filebeat System] Syslog dashboard ECS) work just fine.

But when I send data through Kafka, Elasticsearch receives the log data with the syslog timestamp at the beginning of each message, which seems to prevent the fields from being parsed.

For example, when Filebeat sends logs straight to Elasticsearch, the message looks like this:

pam_unix(sshd:session): session opened for user elkadmin by (uid=0)

But this is the message when Filebeat sends logs through Kafka to Logstash:

Feb 19 13:20:32 nsoelk sshd[20694]: pam_unix(sshd:session): session opened for user elkadmin by (uid=0)

Here are my filebeat.yml settings for the Kafka output.

    output.kafka:
      enabled: true
      hosts: ["kafka1:19094","kafka2:29094","kafka3:39094"]
      topic: 'test-topic'
      username: username
      password: password
      sasl.mechanism: SCRAM-SHA-256
      ssl.verification_mode: certificate
      ssl.certificate_authorities: ["/etc/filebeat/kafka_interm.cer"]

Here is my system.yml for Filebeat.

    - module: system
      # Syslog
      syslog:
        enabled: true
        var.paths: ["/var/log/messages*"]

      # Authorization logs
      auth:
        enabled: true
        var.paths: ["/var/log/secure*"]

And lastly, here are the Logstash input and output for Kafka.

    # Input
    input {
      kafka {
        sasl_mechanism => "SCRAM-SHA-256"
        security_protocol => "SASL_SSL"
        ssl_endpoint_identification_algorithm => ""
        ssl_truststore_location => "/usr/share/logstash/kafka.client.truststore.jks"
        ssl_truststore_password => "password"
        bootstrap_servers => "kafka1:19094,kafka2:29094,kafka3:39094"
        topics => ["test-topic"]
        decorate_events => true
        codec => json
        sasl_jaas_config => "org.apache.kafka.common.security.scram.ScramLoginModule required username='username' password='password';"
      }
    }

    # Filter
    filter {
    }

    # Output
    output {
      elasticsearch {
        hosts => ["https://es01:9200"]
        cacert => "/usr/share/elasticsearch/config/certificates/ca/ca.crt"
        index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
        user => "elastic"
        password => "password"
      }
    }

What settings should I change so that the syslog timestamp is not inserted in front of each message when it is delivered through the Kafka output?

Thank you.

I found the solution by following this instruction page (Example: Set up Filebeat modules to work with Kafka and Logstash | Logstash Reference [7.11] | Elastic).

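The key points were running the pipeline setup for Filebeat, so that the system module's ingest pipelines are loaded into Elasticsearch, and setting up the Logstash output to select the correct ingest pipeline based on the event metadata. Kafka was never adding the timestamp: the raw syslog line was reaching Elasticsearch without passing through the module's ingest pipeline, which is what normally parses it into fields.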
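
For anyone landing here, a minimal sketch of such a Logstash output follows. It is adapted from the pattern in the linked Elastic example and reuses the hosts, certificate path and credentials from the config above, so it is not necessarily the exact config that fixed this setup. It relies on Filebeat putting the pipeline name into [@metadata][pipeline] for module events, as the linked example does.

    # Output (sketch; values reused from the original config above)
    output {
      if [@metadata][pipeline] {
        # Event came from a Filebeat module: run it through the
        # ingest pipeline that Filebeat named in the metadata.
        elasticsearch {
          hosts => ["https://es01:9200"]
          cacert => "/usr/share/elasticsearch/config/certificates/ca/ca.crt"
          manage_template => false
          index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
          pipeline => "%{[@metadata][pipeline]}"
          user => "elastic"
          password => "password"
        }
      } else {
        # No pipeline in the metadata: index the event as before.
        elasticsearch {
          hosts => ["https://es01:9200"]
          cacert => "/usr/share/elasticsearch/config/certificates/ca/ca.crt"
          manage_template => false
          index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
          user => "elastic"
          password => "password"
        }
      }
    }

The pipelines themselves are loaded from the Filebeat host with the setup command. Since filebeat.yml has the Kafka output enabled, the Elasticsearch output probably has to be switched on just for that one run, for example with -E overrides (the host here is an assumption taken from the config above; credentials and CA settings would need to be passed the same way):

    filebeat setup --pipelines --modules system -E output.kafka.enabled=false -E 'output.elasticsearch.hosts=["https://es01:9200"]'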