[2022-01-26T12:26:56,585][INFO ][org.reflections.Reflections] Reflections took 201 ms to scan 1 urls, producing 119 keys and 417 values
[2022-01-26T12:26:59,973][FATAL][logstash.runner ] The given configuration is invalid. Reason: Unable to configure plugins: (SchemaParseError) Error validating default for eventType: at . expected type string, got null
[2022-01-26T12:26:59,991][FATAL][org.logstash.Logstash ] Logstash stopped processing because of an error: (SystemExit) exit
org.jruby.exceptions.SystemExit: (SystemExit) exit
at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:747) ~[jruby-complete-9.2.20.1.jar:?]
at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:710) ~[jruby-complete-9.2.20.1.jar:?]
at usr.share.logstash.lib.bootstrap.environment.<main>(/usr/share/logstash/lib/bootstrap/environment.rb:94) ~[?:?]
Isn't that an error in the schema defined by "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord3.avsc"? The message says the default value for eventType doesn't match its declared type (expected string, got null).
Your configuration around 'eventType' looks fine. I'm not familiar with Kafka and Avro, sorry.
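For reference, that SchemaParseError typically means an Avro field declares type "string" but sets "default": null, which Avro rejects. A hypothetical corrected fields section (record and field names other than eventType are illustrative, not taken from the actual MyEventRecord3.avsc): either give the string field a string default, or make the type a nullable union, in which case a null default is legal.

```json
{
  "type": "record",
  "name": "MyEventRecord",
  "fields": [
    {"name": "eventType", "type": "string", "default": "unknown"},
    {"name": "someOptionalField", "type": ["null", "string"], "default": null}
  ]
}
```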
I want to run this in the background permanently. What steps do I need to take? Right now, after executing the command above, the terminal process never shuts down until I press Ctrl+C.
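One common way to keep Logstash running after the terminal closes is a systemd service. A minimal sketch, assuming Logstash is installed at /usr/share/logstash and the pipeline file is /etc/logstash/conf.d/kafka.conf (both paths are assumptions; adjust to your install):

```ini
# /etc/systemd/system/logstash-pipeline.service (hypothetical unit name)
[Unit]
Description=Logstash Kafka pipeline
After=network.target

[Service]
ExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/kafka.conf
Restart=on-failure

[Install]
WantedBy=multi-user.target
```

Enable it with `sudo systemctl daemon-reload` and `sudo systemctl enable --now logstash-pipeline`. For a quick one-off instead, `nohup bin/logstash -f kafka.conf &` also detaches the process from the terminal, but won't restart it on failure.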
@Tomo_M Thanks for your help, I am done with the above configuration.
I'd like one more piece of help: I want to add some logic to rename my keys.
Here is my Logstash configuration:
# Sample Logstash configuration for a
# Kafka -> Logstash -> Kafka pipeline.
input {
  kafka {
    bootstrap_servers => "localhost:9092"
    topics => ["tracking"]
    codec => avro {
      schema_uri => "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord3.avsc"
    }
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
  }
}

filter {
  if [eventType] == "product_detail" {
    prune {
      blacklist_names => ["^user___.*"]
    }
    # Question: here I want to remove "product___" from the front of every key,
    # e.g. product___bar = "10+" should become bar = "10+". The schema contains
    # 62 fields, so a mutate rename for each field is impractical.
  }
  if [eventType] == "user_information" {
    prune {
      blacklist_names => ["^product___.*"]
    }
    # Question: same here, but removing "user___" from the front of every key,
    # e.g. user___id = "1" should become id = "1".
  }
}

output {
  stdout {
    codec => rubydebug
  }
  if [eventType] == "product_detail" {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => "product_detail"
    }
  } else if [eventType] == "user_information" {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => "user_information"
    }
  } else {
    kafka {
      bootstrap_servers => "localhost:9092"
      topic_id => "unrecognized"
    }
  }
}
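One way to rename all 62 fields without listing each one is a ruby filter that strips a known prefix from every matching key. A sketch, untested against your actual schema (`event.to_hash` returns a copy, so it is safe to iterate while modifying the event):

```
filter {
  ruby {
    code => '
      ["product___", "user___"].each do |prefix|
        event.to_hash.each do |k, v|
          if k.start_with?(prefix)
            event.set(k.sub(prefix, ""), v)  # copy the value under the shortened key
            event.remove(k)                  # drop the original prefixed key
          end
        end
      end
    '
  }
}
```

Placed after the prune filters, only the keys that survived pruning get renamed, so one ruby block can handle both event types.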
@Tomo_M Thank you so much, it's working perfectly now.
Actually, I'm not sure how include? is used in "(k.include?(prefix))", or how the if syntax works.
For practice's sake, is there an online playground available for this, where I can run the code and see the output?
Can you please provide some links?
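As for `include?`: it is a plain Ruby String method that tests whether a substring occurs anywhere in the string, while `start_with?` anchors the match to the beginning, which is usually what you want for a prefix check. A standalone snippet you can paste into any online Ruby runner or an irb session:

```ruby
prefix = "user___"
key = "user___id"

puts key.include?(prefix)     # true  - the prefix occurs somewhere in the string
puts key.start_with?(prefix)  # true  - the prefix occurs at the very beginning
puts key.sub(prefix, "")      # "id"  - first occurrence of the prefix removed

# include? also matches in the middle, which could rename the wrong keys:
puts "meta_user___id".include?(prefix)     # true
puts "meta_user___id".start_with?(prefix)  # false
```

Because `include?` matches anywhere, `start_with?` is the safer predicate when stripping key prefixes.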