Protobuf codec error "undefined method `parse'"

(Sebastian Vaisov) #1

Hello,

we want to use protobuf codec to get messages from Kafka, for that I converted following proto scheme to ruby using 3.5.0 google converter. Logstash starts the pipeline without errors, but when I publish messages to the topic, I get following error:

[2019-04-19T09:51:17,194][WARN ][logstash.codecs.protobuf ] Couldn't decode protobuf: # 
<NoMethodError: undefined method `parse' for 
Bk::Sk::Pk::Reservations::Proto::Types::V0::ReservationEntry:Class>.

Initial proto scheme:

syntax = "proto3";
package bk.sk.pk.reservations.proto.types.v0;

option java_multiple_files = true;

message ReservationEntry {
    string id = 1;
    string internalAccountId = 2;
    ReservationState state = 3;
    InstructedAmount instructedAmount = 4;
    Lifetime lifetime = 5;
    Requestor requestor = 6;
    Description description = 7;
    bool forceMarker = 8;
    string creationTimestamp = 9; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX
}

message Requestor {
    string productCode = 1;

    string systemCode = 2;

    string init = 3;
}

enum ReservationState {
    RESERVED = 0;
    CANCELED = 1;
    CONSUMED = 2;
    EXPIRED = 3;
}

message Lifetime {
    string startDateTime = 1; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX

    string endDateTime = 2; // ISO timestamp format: yyyy-MM-ddTHH:mm:ss.SSSSSSX
}

message InstructedAmount {
    DecimalNumber amount = 1;

    // 3 characters long currency code
    string currency = 2;
}

message DecimalNumber {
    int64 unscaledValue = 1;
    int32 scale = 2;
}

message Description {
    string text1 = 1;

    string text2 = 2;
}

Converted ruby scheme:

require 'google/protobuf'

Google::Protobuf::DescriptorPool.generated_pool.build do
  add_message "bk.sk.pk.reservations.proto.types.v0.ReservationEntry" do
    optional :id, :string, 1
    optional :internalAccountId, :string, 2
    optional :state, :enum, 3, "bk.sk.pk.reservations.proto.types.v0.ReservationState"
    optional :instructedAmount, :message, 4, "bk.sk.pk.reservations.proto.types.v0.InstructedAmount"
    optional :lifetime, :message, 5, "bk.sk.pk.reservations.proto.types.v0.Lifetime"
    optional :requestor, :message, 6, "bk.sk.pk.reservations.proto.types.v0.Requestor"
    optional :description, :message, 7, "bk.sk.pk.reservations.proto.types.v0.Description"
    optional :forceMarker, :bool, 8
    optional :creationTimestamp, :string, 9
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.Requestor" do
    optional :productCode, :string, 1
    optional :systemCode, :string, 2
    optional :init, :string, 3
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.Lifetime" do
    optional :startDateTime, :string, 1
    optional :endDateTime, :string, 2
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.InstructedAmount" do
    optional :amount, :message, 1, "bk.sk.pk.reservations.proto.types.v0.DecimalNumber"
    optional :currency, :string, 2
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.DecimalNumber" do
    optional :unscaledValue, :int64, 1
    optional :scale, :int32, 2
  end
  add_message "bk.sk.pk.reservations.proto.types.v0.Description" do
    optional :text1, :string, 1
    optional :text2, :string, 2
  end
  add_enum "bk.sk.pk.reservations.proto.types.v0.ReservationState" do
    value :RESERVED, 0
    value :CANCELED, 1
    value :CONSUMED, 2
    value :EXPIRED, 3
  end
end

module Bk
  module Sk
    module Pk
      module Reservations
        module Proto
          module Types
            module V0
              ReservationEntry = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.ReservationEntry").msgclass
              Requestor = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Requestor").msgclass
              Lifetime = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Lifetime").msgclass
              InstructedAmount = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.InstructedAmount").msgclass
              DecimalNumber = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.DecimalNumber").msgclass
              Description = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.Description").msgclass
              ReservationState = Google::Protobuf::DescriptorPool.generated_pool.lookup("bk.sk.pk.reservations.proto.types.v0.ReservationState").enummodule
            end
          end
        end
      end
    end
  end
end

Logstash pipeline:

input {
  kafka {
    bootstrap_servers => 'kafka-0.kafka:9092,kafka-1.kafka:9092,kafka-2.kafka:9092,kafka-3.kafka:9092,kafka-4.kafka:9092,kafka-5.kafka:9092'
    topics => ["processed-entries"]
    client_id => "logstash_pipe3"
    group_id => "logstash-pre"
    codec => protobuf
    {
      class_name => "Bk::Sk::Pk::Reservations::Proto::Types::V0::ReservationEntry"
      include_path => "/usr/share/logstash/proto-res-schemas/ReservationEntry_pb.rb"
    }
    decorate_events => "true"
    auto_offset_reset => "earliest"
  }
}

Any idea what we're doing wrong?

(system) closed #2

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.