Parsing JSON logs into separate fields

Hi all!
I have a question about parsing JSON logs into separate fields in Kibana.
I use fluentd to collect logs from my k8s cluster.
See the config:

  # Fluentd input for container logs.
  # - The <source> tails /var/log/containers/*.log starting from the head of
  #   each file, keeps its read position in /var/log/es-containers.log.pos and
  #   tags every record raw.kubernetes.*.
  # - The multi_format parser lists two patterns, in this order: a JSON line
  #   whose timestamp is in the `time` key, then a plain-text line of the form
  #   "<time> <stdout|stderr> <flag> <log>" captured by the regex.
  #   NOTE(review): presumably the JSON pattern targets the container runtime's
  #   own JSON wrapper, so the application's output still arrives as a single
  #   string in the `log` field - confirm against a sample record.
  # - The detect_exceptions match reads the `log` and `stream` fields, strips
  #   the `raw` tag prefix (records continue as kubernetes.*), and caps a
  #   combined entry at 500000 bytes / 1000 lines with a flush interval of 5.
  containers.input.conf: |-
    <source>
      @id fluentd-containers.log
      @type tail
      path /var/log/containers/*.log
      pos_file /var/log/es-containers.log.pos
      tag raw.kubernetes.*
      read_from_head true
      <parse>
        @type multi_format
        <pattern>
          format json
          time_key time
          time_format %Y-%m-%dT%H:%M:%S.%NZ
        </pattern>
        <pattern>
          format /^(?<time>.+) (?<stream>stdout|stderr) [^ ]* (?<log>.*)$/
          time_format %Y-%m-%dT%H:%M:%S.%N%:z
        </pattern>
      </parse>
    </source>
    # Detect exceptions in the log output and forward them as one log entry.
    <match raw.kubernetes.**>
      @id raw.kubernetes
      @type detect_exceptions
      remove_tag_prefix raw
      message log
      stream stream
      multiline_flush_interval 5
      max_bytes 500000
      max_lines 1000
    </match>
  # Fluentd inputs for node-level logs.
  # - The systemd source reads the journal from the head, tags records
  #   `systemd`, and persists its position in /var/log/systemd.log.pos.
  #   The <entry> field_map renames MESSAGE to `log`, copies _PID into both
  #   `process` and `pid`, maps _CMDLINE to `process` and _COMM to `cmd`;
  #   with field_map_strict false the remaining journal fields are kept,
  #   lower-cased and with leading underscores stripped.
  #   NOTE(review): _PID and _CMDLINE both target `process` - confirm this
  #   combination is intended.
  # - The kube-proxy source tails /var/log/kube-proxy.log from the head with a
  #   multiline parser: a new entry starts at a line matching /^\w\d{4}/, and
  #   format1 extracts `severity`, `time`, `pid`, `source` and `log`. Records
  #   are tagged `kubeproxy`; the position is kept in
  #   /var/log/es-kube-proxy.log.pos.
  system.input.conf: |-
    <source>
      @id systemd.log
      @type systemd
      tag systemd
      read_from_head true
      <storage>
        @type local
        persistent true
        path /var/log/systemd.log.pos
      </storage>
      <entry>
        field_map {"MESSAGE": "log", "_PID": ["process", "pid"], "_CMDLINE": "process", "_COMM": "cmd"}
        field_map_strict false
        fields_strip_underscores true
        fields_lowercase true
      </entry>
    </source>
    # Example:
    # I1118 21:26:53.975789       6 proxier.go:1096] Port "nodePort for kube-system/default-http-backend:http" (:31429/tcp) was open before and is still needed
    <source>
      @id kube-proxy.log
      @type tail
      format multiline
      multiline_flush_interval 5s
      format_firstline /^\w\d{4}/
      format1 /^(?<severity>\w)(?<time>\d{4} [^\s]*)\s+(?<pid>\d+)\s+(?<source>[^ \]]+)\] (?<log>.*)/
      time_format %m%d %H:%M:%S.%N
      path /var/log/kube-proxy.log
      pos_file /var/log/es-kube-proxy.log.pos
      tag kubeproxy
      read_from_head true
    </source>
output.conf: |-
    <filter kubeproxy>
      @type record_transformer
      enable_ruby
      <record>
        hostname ${ENV["HOSTNAME"]}
      </record>
    </filter>
    
    <filter **>
      @type prometheus
      <metric>
        type counter
        name fluentd_input_status_num_records_total
        desc Total number of log entries generated by either application containers or system components
      </metric>
    </filter>
    <filter kubernetes.**>
      @type kubernetes_metadata
    </filter>
    <match **>
      @id elasticsearch
      @type elasticsearch
      @log_level info
      include_tag_key true
      host ${HOST}
      port 9200
      scheme https
      ssl_verify false
      ssl_version TLSv1_2
      user ${USER}
      password ${PASSWORD}
      logstash_format true
      logstash_prefix ${INDEX_NAME}
      <buffer>
        @type file
        path /var/log/fluentd-buffers/kubernetes.system.buffer
        flush_mode interval
        retry_type exponential_backoff
        flush_thread_count 2
        flush_interval 5s
        retry_forever
        retry_max_interval 30
        chunk_limit_size 2M
        queue_limit_length 8
        overflow_action block
      </buffer>
    </match>

This is how I currently receive all logs, including JSON logs (see the highlighted fields within the JSON log string - these need to be parsed out into individual fields in the index):

How it should be:



But I also have non-JSON logs in this index, and they must not be broken.
I know that I can use an Ingest Pipeline with filebeat, but in my case I need to create something similar for fluentd.
Does anyone have ideas?
I will be glad for any help from you!