When using Logstash to consume binary data, some bytes are replaced with EF BF BD

The original data is Avro, but for debugging I use only codec => plain { charset => "BINARY" } to reduce interference.

Original Avro data (hex dump):

00000000: 00 80 40 00 cc e1 85 98 0e 00 3a 32 30 32 34 2d  ..@.......:2024-
00000010: 30 34 2d 31 38 54 30 36 3a 33 32 3a 30 34 2e 30  04-18T06:32:04.0
00000020: 30 30 2b 30 38 3a 30 30                          00+08:00

Content obtained by Logstash (hex dump). Every byte >= 0x80 from the original has been replaced with the three-byte sequence EF BF BD, and a trailing 0A has been appended:

00000000: 00 ef bf bd 40 00 ef bf bd ef bf bd ef bf bd ef  ....@...........
00000010: bf bd 0e 00 3a 32 30 32 34 2d 30 34 2d 31 38 54  ....:2024-04-18T
00000020: 30 36 3a 33 32 3a 30 34 2e 30 30 30 2b 30 38 3a  06:32:04.000+08:
00000030: 30 30 0a                                         00.

Logstash is 8.13.0 from Docker. The container-related configuration follows.

docker-compose.yml

  logstash:
    image: logstash:8.13.0
    volumes:
      - /root/kafka-2-es/logstash.conf:/usr/share/logstash/pipeline/logstash.conf
    environment:
      - "XPACK_MONITORING_ENABLED=false"
      - "KAFKA_GROUP_ID"

logstash.conf

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["test-01"]
    codec => plain {
      charset => "BINARY"
    }
    group_id => "${KAFKA_GROUP_ID}"
    auto_offset_reset => "earliest"
  }
  redis {
    host => "redis"
    port => 6379
    data_type => "list"
    key => "test-01"
    codec => plain {
      charset => "BINARY"
    }
  }
}

output {
  stdout {
    codec => rubydebug
  }
  file {
    path => "/tmp/test-01.log"
    codec => plain {
      charset => "BINARY"
    }
  }
  elasticsearch {
    hosts => ["https://elasticsearch:9200"]
    index => "test-01"
    user => "elastic"
    password => "123456"
    ssl => true
    ssl_certificate_verification => false
    manage_template => true
  }
}

Actions already tried:

  1. Consumed the same binary content from Kafka with a small Go program: the bytes are correct (see the sketch after this list).
  2. Consumed the same content from Kafka and Redis with Logstash: the special (non-ASCII) bytes are replaced with EF BF BD.
  3. Read directly from Kafka with kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-01 --from-beginning: the content is correct.
  4. Read with redis-cli --raw lpop test-01: the content is correct.
  5. Tried different Docker images: docker.elastic.co/logstash/logstash:8.13.0, logstash:8.13.0, bitnami/logstash:8.13.0, and 7.17.21 all show the same problem.
  6. Writing the data straight to a file with the BINARY codec still produces the wrong bytes.
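
For reference, a minimal sketch of the Go cross-check from step 1 might look like the following. This is only an illustration: the thread does not say which Go client was used, so it assumes the third-party github.com/segmentio/kafka-go package, and the consumer group id is made up.

package main

import (
    "context"
    "fmt"
    "log"

    "github.com/segmentio/kafka-go"
)

func main() {
    // Read one message from the same topic and print its raw value as hex,
    // so it can be compared against the original Avro dump.
    r := kafka.NewReader(kafka.ReaderConfig{
        Brokers:     []string{"localhost:9092"},
        Topic:       "test-01",
        GroupID:     "hex-check", // hypothetical group id, only for this check
        StartOffset: kafka.FirstOffset,
    })
    defer r.Close()

    msg, err := r.ReadMessage(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    // msg.Value is a []byte, so no charset conversion happens on this path.
    fmt.Printf("% x\n", msg.Value)
}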

I went through the logstash.yml-related settings and could not find anything relevant.

That tells the codec on the input to translate the data from BINARY to UTF-8. The codec always outputs UTF-8, because that is all the filters in the pipeline expect.

The EF BF BD is the UTF-8 encoding of the replacement character U+FFFD. See also this thread.
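
To make that concrete, here is a minimal Go sketch of the mechanism (not Logstash's actual Ruby code path): every byte >= 0x80 has no mapping from a raw "BINARY" string into UTF-8, so a conversion with replacement turns each such byte into U+FFFD, whose UTF-8 encoding is EF BF BD. Applied to the first hex dump, it reproduces the start of the second one byte for byte.

package main

import "fmt"

func main() {
    fmt.Printf("U+FFFD in UTF-8:   % x\n", "\uFFFD") // ef bf bd

    // The first bytes of the original Avro dump from this topic.
    orig := []byte{0x00, 0x80, 0x40, 0x00, 0xcc, 0xe1, 0x85, 0x98, 0x0e, 0x00, 0x3a}

    var out []byte
    for _, b := range orig {
        if b < 0x80 {
            out = append(out, b) // plain ASCII passes through unchanged
        } else {
            out = append(out, "\uFFFD"...) // high byte -> replacement character
        }
    }
    // Prints: 00 ef bf bd 40 00 ef bf bd ef bf bd ef bf bd ef bf bd 0e 00 3a,
    // matching the corrupted content that Logstash produced.
    fmt.Printf("after replacement: % x\n", out)
}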

Perhaps try the ASCII-8BIT encoding?

Thank you very much for your answer. Maybe I took some detours, or introduced new problems while trying to simplify the original one.

My original problem is actually this: the Avro data I consume from Kafka fails to parse (throws an error) when it is read out.

Starting from the event.original stored in ES, and after cross-verifying and eliminating possibilities many times, I narrowed the suspect down to some bytes in the Avro binary being replaced right at the start, when the data is first obtained.

The data in ES also supports this: the affected bytes are always replaced with the same fixed value.

The event.original in ES matches the second hex dump above. I can't find a way to avoid this, which means my processed data always fails or comes out wrong.

I think my problem is solved. It requires a special option. To spare anyone else who runs into the same problem, I will post the solution here.

In GitHub - revpoint/logstash-codec-avro_schema_registry (a Logstash codec plugin for decoding and encoding Avro records) I found a special option: value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer". The kafka input's default value_deserializer_class is the StringDeserializer, which treats the message value as UTF-8 text; the ByteArrayDeserializer hands the raw bytes through to the codec untouched.

Yes, it still works with the plain avro codec instead of avro_schema_registry. If you run into binary data processing problems in the future, check whether some special bytes in the original data have been replaced. If so, maybe this answer can save you a week (it would certainly have saved me this one).

The following is a complete logstash.conf demo; for the Avro data, refer to the binary content in the first code snippet of this topic.

input {
  kafka {
    bootstrap_servers => "kafka:9092"
    topics => ["test-01"]
    codec => avro {
      schema_uri => "/app/test.avsc"
      tag_on_failure => true
      encoding => binary
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    group_id => "${KAFKA_GROUP_ID}"
    auto_offset_reset => "earliest"
  }
}
output {
  stdout {
    codec => rubydebug
  }
}

Hope this answer helps you. Have a good night's sleep tonight.