The original data is Avro. For debugging, the only codec used is codec => plain { charset => "BINARY" }, to reduce interference.
Original Avro data (hex dump):
00000000: 00 80 40 00 cc e1 85 98 0e 00 3a 32 30 32 34 2d  ..@.......:2024-
00000010: 30 34 2d 31 38 54 30 36 3a 33 32 3a 30 34 2e 30  04-18T06:32:04.0
00000020: 30 30 2b 30 38 3a 30 30                          00+08:00
Content obtained by Logstash (hex dump):
00000000: 00 ef bf bd 40 00 ef bf bd ef bf bd ef bf bd ef  ....@...........
00000010: bf bd 0e 00 3a 32 30 32 34 2d 30 34 2d 31 38 54  ....:2024-04-18T
00000020: 30 36 3a 33 32 3a 30 34 2e 30 30 30 2b 30 38 3a  06:32:04.000+08:
00000030: 30 30 0a                                         00.
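Comparing the two dumps, each byte at or above 0x80 (80, cc, e1, 85, 98) appears to have become one ef bf bd sequence, which is the UTF-8 encoding of U+FFFD (the replacement character), and a trailing 0a has been appended. The following is a minimal Python sketch that reproduces this from the dumps above. It is not Logstash code and makes no claim about where in the pipeline the replacement happens; the function name replace_high_bytes is hypothetical.

```python
# Bytes copied from the two hex dumps above.
ASCII_TAIL = b":2024-04-18T06:32:04.000+08:00"

original = bytes.fromhex("00 80 40 00 cc e1 85 98 0e 00") + ASCII_TAIL
observed = (
    bytes.fromhex("00 ef bf bd 40 00 ef bf bd ef bf bd ef bf bd ef bf bd 0e 00")
    + ASCII_TAIL
    + b"\n"
)


def replace_high_bytes(data: bytes) -> bytes:
    """Replace every non-ASCII byte with U+FFFD, then encode as UTF-8.

    Hypothetical helper: it only mimics the observed effect.
    """
    return data.decode("ascii", errors="replace").encode("utf-8")


# Per-byte replacement plus a trailing newline matches the Logstash dump.
assert replace_high_bytes(original) + b"\n" == observed

# A lenient UTF-8 decode does NOT match: e1 85 98 is a valid UTF-8
# sequence and would survive unchanged.
as_utf8 = original.decode("utf-8", errors="replace").encode("utf-8")
assert as_utf8 + b"\n" != observed
```

If both assertions hold, the damage looks like a byte-by-byte conversion of non-ASCII input to UTF-8 rather than a lenient UTF-8 decode. In either case the original bytes cannot be recovered from the output.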
Logstash is 8.13.0, running in Docker. The container-related configuration follows.
docker-compose.yml
  logstash:
    image: logstash:8.13.0
    volumes:
      - /root/kafka-2-es/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    environment:
      - "XPACK_MONITORING_ENABLED=false"
      - "KAFKA_GROUP_ID"
logstash.conf
input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["test-01"]
    codec => plain {
      charset => "BINARY"
    }
    group_id => "${KAFKA_GROUP_ID}"
    auto_offset_reset => "earliest"
  }
  redis {
    host => "redis"
    port => 6379
    data_type => "list"
    key => "test-01"
    codec => plain {
      charset => "BINARY"
    }
  }
}
output {
  stdout {
    codec => rubydebug
  }
  file {
    path => "/tmp/test-01.log"
    codec => plain {
      charset => "BINARY"
    }
  }
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    index => "test-01"
    user => "elastic"
    password => "123456"
    ssl => true
    ssl_certificate_verification => false
    manage_template => true
  }
}
Actions already tried:
- Consumed the binary content from Kafka with a Go program: correct.
- Consumed the same content from Kafka and Redis with Logstash: special characters (non-ASCII bytes) are replaced with ef bf bd.
- Read the content directly from Kafka with kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-01 --from-beginning: correct.
- Read the content with redis-cli --raw lpop test-01: correct.
- Tried different Docker images: docker.elastic.co/logstash/logstash:8.13.0, logstash:8.13.0, bitnami/logstash:8.13.0 and 7.17.21 all have the same problem.
- Writing the data directly to a file with the BINARY charset is still wrong.
 
I looked through the settings in logstash.yml and found nothing that seems relevant.