It is not possible to use two input codecs; an input can have only one. You will need to use the plain codec in the beats input and a json filter in the pipeline to parse your json messages.
You will need a way to determine which log is a plain text message and which is a json message, which is what was already asked.
This should be done in Filebeat, so you need to share your filebeat configuration.
The kafka inputs that you shared do not help; they are basically the same, pointing to the same topic patterns, and it is not possible to tell whether they point to the same kafka cluster or not.
You will probably need to add a tag or a new field in each filebeat input so you can use this information in Logstash to decide when to apply the json filter.
Something like this:
if "json" in [tags] {
    json {
        source => "message"
    }
}
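On the Filebeat side, the tag could be added per input, something like the sketch below. The input type, ids, and paths are assumptions for illustration; only the `tags` option matters here, since it is what the Logstash conditional checks.

```yaml
filebeat.inputs:
  - type: filestream
    id: plain-logs            # hypothetical id
    paths:
      - /var/log/app/plain/*.log   # hypothetical path for plain text logs

  - type: filestream
    id: json-logs             # hypothetical id
    paths:
      - /var/log/app/json/*.log    # hypothetical path for json logs
    tags: ["json"]            # this tag is what Logstash uses to apply the json filter
```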