Logstash configuration

Hello everyone,
I am stuck on my Logstash configuration.
Here is my configuration; please let me know what is wrong with it.

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["tracking"]
        codec => avro {
            schema_uri => "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord3.avsc"
        }
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
}


filter {
    if [eventType] == 'product_detail' {
        prune {
            blacklist_names => ["^user___.*"]
        }
    }

    if [eventType] == 'user_information' {
        prune {
            blacklist_names => ["^product___.*"]
        }
    }
}
output {
    stdout {
        codec => rubydebug
    }

    if [eventType] == 'product_detail' {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'product_detail'
        }
    }
    else if [eventType] == 'user_information' {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'user_information'
        }
    }
    else {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'unrecognized'
        }

    }
}

Below is the error I am getting:

[2022-01-26T12:26:56,585][INFO ][org.reflections.Reflections] Reflections took 201 ms to scan 1 urls, producing 119 keys and 417 values
[2022-01-26T12:26:59,973][FATAL][logstash.runner          ] The given configuration is invalid. Reason: Unable to configure plugins: (SchemaParseError) Error validating default for eventType: at . expected type string, got null
[2022-01-26T12:26:59,991][FATAL][org.logstash.Logstash    ] Logstash stopped processing because of an error: (SystemExit) exit
org.jruby.exceptions.SystemExit: (SystemExit) exit
at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:747) ~[jruby-complete-9.2.20.1.jar:?]
at org.jruby.RubyKernel.exit(org/jruby/RubyKernel.java:710) ~[jruby-complete-9.2.20.1.jar:?]
at usr.share.logstash.lib.bootstrap.environment.<main>(/usr/share/logstash/lib/bootstrap/environment.rb:94) ~[?:?]

Please provide some guidance

Isn't it an error in the schema defined by "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord3.avsc"?

Your script around 'eventType' looks fine. I'm not familiar with Kafka and Avro, sorry.

@Tom thanks, after I removed the default null from the Avro schema it is working perfectly now.
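
For anyone hitting the same SchemaParseError: in an .avsc file, a field declared as a plain "type": "string" cannot carry "default": null, because the default value must match the declared type. A minimal sketch of the invalid and valid variants, using the eventType field named in the error message:

// invalid: null default on a non-nullable string field
{"name": "eventType", "type": "string", "default": null}

// valid: make the type a nullable union (the default must match the first branch), or drop the default entirely
{"name": "eventType", "type": ["null", "string"], "default": null}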

I have one question:

sudo bin/logstash --path.settings /etc/logstash/ -f /etc/logstash/logstash-genome.conf --config.reload.automatic

I want to run this in the background permanently. What steps do I need to take? Right now, after executing the above command, the process occupies the terminal until I press Ctrl+C.

You can run Logstash as a service.
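
A minimal sketch, assuming Logstash was installed from the deb/rpm package (which ships a systemd unit; the unit name and config paths may differ on your setup):

# place the pipeline where the service expects it, e.g. /etc/logstash/conf.d/
sudo systemctl enable logstash   # start at boot
sudo systemctl start logstash    # start in the background now
sudo journalctl -u logstash -f   # follow the logs instead of a foreground terminal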

@Tomo_M Thanks for your help, I am done with the above configuration.
Now I need one more piece of help: I want to add some logic to change my key names.
Here below is my Logstash configuration:

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["tracking"]
        codec => avro {
            schema_uri => "/home/abc/python/working/divolte-collector-with-apache-kafka/divolte-collector-0.9.0/conf/MyEventRecord3.avsc"
        }
        value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    }
}


filter {
    if [eventType] == 'product_detail' {
        prune {
            blacklist_names => ["^user___.*"]
            # ***** Question: how can I change the key names in all fields here,
            # without writing a mutate for each of the 62 fields in the schema?
            # I want to remove "product___" from the front of all the keys,
            # e.g. product___bar = "10+"  ->  bar = "10+" *****
        }
    }

    if [eventType] == 'user_information' {
        prune {
            blacklist_names => ["^product___.*"]
            # ***** Question: same as above, how can I change the key names in all
            # fields without a mutate for each of the 62 fields?
            # Here I want to remove "user___" from the front of all the keys,
            # e.g. user___id = "1"  ->  id = "1" *****
        }
    }
}
output {
    stdout {
        codec => rubydebug
    }

    if [eventType] == 'product_detail' {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'product_detail'
        }
    }
    else if [eventType] == 'user_information' {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'user_information'
        }
    }
    else {
        kafka {
            bootstrap_servers => "localhost:9092"
            topic_id => 'unrecognized'
        }

    }
}

My question is marked inside the configuration above.

If you can specify all the field names, the mutate rename filter should be a simple solution.
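
A minimal sketch of that option, using the field names from the examples above; with 62 fields you would need one rename entry per field:

filter {
    mutate {
        rename => {
            "product___bar" => "bar"
            "user___id"     => "id"
            # ... one entry for each remaining field
        }
    }
}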

If not, it seems you have to use a ruby filter with your own script.

You can get the root hash with event.to_hash.

Hi @Tomo_M
I don't know that much about the ruby filter, but I made an attempt. Can you please verify it?

ruby {
    code => "
        event.to_hash.each {|k, v|
            event[k.split('product___').last] = event[v]
            event.remove(k)
        }
    "
}

So can I paste this code just after the prune block, and the filter will run once prune completes?

I am getting tags => [ [0] "_rubyexception" ]

The 4th line should be
event.set(k.split('product___').last, v)

:no_mouth:
@Tomo_M with the above line it is working, but now all 61 fields are gone because of event.remove(k) inside the loop. In Python I would handle it like this:

z = {"purchase___product_id": "22", "purchase___tax": "1", "normal": 1}
for i in list(z):
    try:
        z[i.split('___')[1]] = z[i]
    except IndexError:
        continue
    del z[i]
print(z)
#{'normal': 1, 'product_id': '22', 'tax': '1'}

I want this exception-handling behavior in the ruby filter.

Really?

filter {
  ruby {
    code => "prefix = 'product___'
    event.to_hash.each{|k,v|
      # skip @timestamp/@version and any key that does not contain the prefix
      if (!k.start_with?('@')) && (k.include?(prefix))
        event.set(k.split(prefix).last, v)
        event.remove(k)
      end
    }"
  }
}

worked for me.
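
For placement, a sketch of how that ruby block could sit right after prune inside the existing eventType conditional (untested; set the prefix per branch):

filter {
    if [eventType] == 'product_detail' {
        prune {
            blacklist_names => ["^user___.*"]
        }
        ruby {
            code => "prefix = 'product___'
            event.to_hash.each{|k,v|
              if (!k.start_with?('@')) && (k.include?(prefix))
                event.set(k.split(prefix).last, v)
                event.remove(k)
              end
            }"
        }
    }
    # ... and the same ruby block with prefix = 'user___' in the user_information branch
}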

@Tomo_M Thank you so much, it is working perfectly now.
Actually, I am not aware of how to use include? as in "(k.include?(prefix))", or of the if syntax.
For practice's sake, is there any online playground available for this, where I can run the code and see the output?
Can you please provide some links?
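
For reference, a quick illustration of the two String methods used above; any Ruby interpreter (for example irb) will run these lines:

"product___bar".include?("product___")     #=> true   (substring check)
"product___bar".start_with?("product___")  #=> true   (prefix check)
"@timestamp".start_with?("@")              #=> true   (why @-fields are skipped)
"product___bar".split("product___").last   #=> "bar"  (strip the prefix)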
