Hello,
We want to use the protobuf codec to consume messages from Kafka. To do that, I converted the following proto schema to Ruby using the Google protobuf compiler (version 3.5.0). Logstash starts the pipeline without errors, but when I publish messages to the topic, I get the following error:
[2019-04-19T09:51:17,194][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: #<NoMethodError: undefined method `parse' for Bk::Sk::Pk::Reservations::Proto::Types::V0::ReservationEntry:Class>.
Original proto schema:
// Schema for reservation entries in the bk.sk.pk.reservations.proto.types.v0 package.
syntax = "proto3";
package bk.sk.pk.reservations.proto.types.v0;
// Only affects Java code generation (one .java file per top-level type).
option java_multiple_files = true;
// Top-level record: a reservation with its account, state, amount, lifetime,
// requestor, description, force marker and creation timestamp.
message ReservationEntry {
string id = 1;
string internalAccountId = 2;
ReservationState state = 3;
InstructedAmount instructedAmount = 4;
Lifetime lifetime = 5;
Requestor requestor = 6;
Description description = 7;
bool forceMarker = 8;
string creationTimestamp = 9; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX
}
// Identifies who requested the reservation (product code, system code, init).
// NOTE(review): the meaning of `init` is not evident from this schema - confirm with the producer.
message Requestor {
string productCode = 1;
string systemCode = 2;
string init = 3;
}
// State of a reservation. RESERVED has number 0, so it is the value a proto3
// reader sees when the field is absent from the wire.
enum ReservationState {
RESERVED = 0;
CANCELED = 1;
CONSUMED = 2;
EXPIRED = 3;
}
// Start and end of the reservation's validity, both carried as strings.
message Lifetime {
string startDateTime = 1; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX
string endDateTime = 2; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX
}
// Monetary amount together with its currency.
message InstructedAmount {
DecimalNumber amount = 1;
// 3 characters long currency code
string currency = 2;
}
// Decimal number split into an integer part and a scale.
// NOTE(review): presumably value = unscaledValue * 10^(-scale), as in java.math.BigDecimal - confirm.
message DecimalNumber {
int64 unscaledValue = 1;
int32 scale = 2;
}
// Two free-text description lines.
message Description {
string text1 = 1;
string text2 = 2;
}
Generated Ruby code:
require 'google/protobuf'
# Registers the message and enum descriptors of the
# bk.sk.pk.reservations.proto.types.v0 package in google-protobuf's generated
# descriptor pool, so they can later be looked up by fully qualified name.
# NOTE(review): this looks like protoc-generated output; regenerate it from the
# .proto file rather than editing it by hand.
Google::Protobuf::DescriptorPool.generated_pool.build do
# Top-level reservation record; the nested message and enum fields refer to
# the other types registered in this same build block by their full names.
add_message "bk.sk.pk.reservations.proto.types.v0.ReservationEntry" do
optional :id, :string, 1
optional :internalAccountId, :string, 2
optional :state, :enum, 3, "bk.sk.pk.reservations.proto.types.v0.ReservationState"
optional :instructedAmount, :message, 4, "bk.sk.pk.reservations.proto.types.v0.InstructedAmount"
optional :lifetime, :message, 5, "bk.sk.pk.reservations.proto.types.v0.Lifetime"
optional :requestor, :message, 6, "bk.sk.pk.reservations.proto.types.v0.Requestor"
optional :description, :message, 7, "bk.sk.pk.reservations.proto.types.v0.Description"
optional :forceMarker, :bool, 8
optional :creationTimestamp, :string, 9
end
# Requestor: three string fields.
add_message "bk.sk.pk.reservations.proto.types.v0.Requestor" do
optional :productCode, :string, 1
optional :systemCode, :string, 2
optional :init, :string, 3
end
# Lifetime: start and end date-times carried as strings.
add_message "bk.sk.pk.reservations.proto.types.v0.Lifetime" do
optional :startDateTime, :string, 1
optional :endDateTime, :string, 2
end
# InstructedAmount: a DecimalNumber plus a currency string.
add_message "bk.sk.pk.reservations.proto.types.v0.InstructedAmount" do
optional :amount, :message, 1, "bk.sk.pk.reservations.proto.types.v0.DecimalNumber"
optional :currency, :string, 2
end
# DecimalNumber: 64-bit unscaled value and 32-bit scale.
add_message "bk.sk.pk.reservations.proto.types.v0.DecimalNumber" do
optional :unscaledValue, :int64, 1
optional :scale, :int32, 2
end
# Description: two text fields.
add_message "bk.sk.pk.reservations.proto.types.v0.Description" do
optional :text1, :string, 1
optional :text2, :string, 2
end
# ReservationState enum; numbers match the .proto definition.
add_enum "bk.sk.pk.reservations.proto.types.v0.ReservationState" do
value :RESERVED, 0
value :CANCELED, 1
value :CONSUMED, 2
value :EXPIRED, 3
end
end
# Ruby constants for the bk.sk.pk.reservations.proto.types.v0 package.
# Each constant is obtained by looking up the fully qualified proto name in
# google-protobuf's generated descriptor pool: message descriptors yield a
# message class, the enum descriptor yields an enum module.
module Bk
  module Sk
    module Pk
      module Reservations
        module Proto
          module Types
            module V0
              # Module-body local; it is not visible outside this `module V0` body.
              generated_pool = Google::Protobuf::DescriptorPool.generated_pool

              # Message classes
              ReservationEntry =
                generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.ReservationEntry").msgclass
              Requestor =
                generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Requestor").msgclass
              Lifetime =
                generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Lifetime").msgclass
              InstructedAmount =
                generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.InstructedAmount").msgclass
              DecimalNumber =
                generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.DecimalNumber").msgclass
              Description =
                generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Description").msgclass

              # Enum module
              ReservationState =
                generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.ReservationState").enummodule
            end
          end
        end
      end
    end
  end
end
Logstash pipeline:
# Consumes protobuf-encoded ReservationEntry messages from the
# "processed-entries" Kafka topic and decodes them with the protobuf codec.
input {
  kafka {
    bootstrap_servers => 'kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092,kafka-3.kafka:9092,kafka-4.kafka:9092,kafka-5.kafka:9092'
    topics => ["processed-entries"]
    client_id => "logstash_pipe3"
    group_id => "logstash-pre"
    # Protobuf payloads are binary. Hand the raw bytes to the codec instead of
    # letting the default string deserializer re-encode (and corrupt) them.
    value_deserializer_class => "org.apache.kafka.common.serialization.ByteArrayDeserializer"
    codec => protobuf
    {
      class_name => "Bk::Sk::Pk::Reservations::Proto::Types::V0::ReservationEntry"
      include_path => "/usr/share/logstash/proto-res-schemas/ReservationEntry_pb.rb"
      # ReservationEntry_pb.rb was generated for google-protobuf (proto3).
      # The codec defaults to protobuf 2 and then calls `parse` on the class,
      # which google-protobuf classes do not define; that is the reported
      # "undefined method `parse'" error. Selecting version 3 makes the codec
      # use the google-protobuf decoding path.
      # NOTE(review): some codec versions expect dot-separated class names
      # for protobuf 3 - check the docs of the installed plugin version.
      protobuf_version => 3
    }
    decorate_events => "true"
    auto_offset_reset => "earliest"
  }
}
Any idea what we're doing wrong?