Cannot initialize custom codec plugin

Hello.
I'm working on a new codec plugin that parses protobuf data in a unique way (meaning I can't use the existing protobuf plugin).

Here's the plugin code:

package com.ofekinger.logstash.plugins.mycodec;

import co.elastic.logstash.api.*;
// Protobuf imports
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import org.apache.commons.io.input.CharSequenceInputStream;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.zip.InflaterInputStream;

import static java.nio.charset.StandardCharsets.ISO_8859_1;


@LogstashPlugin(name = "my_codec")
public class MyCodec implements Codec {
    private final String id;
    public static String MY_CODEC = "my_codec";

    public MyCodec() {
        this.id = UUID.randomUUID().toString();
    }

    public MyCodec(String id, Configuration config, Context context) {
        this.id = id;
    }

    public MyCodec(final Configuration config, final Context context) {
        this();
    }


    public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        while (byteBuffer.hasRemaining()) {
            byte nextByte = byteBuffer.get();
            if (nextByte != '<') {
                throw new RuntimeException("Invalid character " + nextByte);
            }

            nextByte = byteBuffer.get();
            StringBuilder byteLength = new StringBuilder();

            while (nextByte != '>') {
                byteLength.append((char)nextByte);
                nextByte = byteBuffer.get();
            }

            int length = Integer.parseInt(byteLength.toString());

            byte[] protobufBytes = new byte[length];
            byteBuffer.get(protobufBytes);
            try {
                // Protobuf extraction

                consumer.accept(toMap(ufMessage));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    protected ExtensionRegistry getExtensionRegistry() {
        ExtensionRegistry extensionRegistry = ExtensionRegistry.newInstance();
        extensionRegistry.add(FieldsMsg.fields);
        extensionRegistry.add(CustomFieldsMsg.customFields);
        return extensionRegistry;
    }


    public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        decode(byteBuffer, consumer);
    }

    public Map<String, Object> toMap(Message message) {
        return message.getAllFields().entrySet().stream()
                .collect(Collectors.toMap(
                        e -> e.getKey().getName(),
                        e -> {
                            if (e.getValue() instanceof Message) {
                                return toMap((Message) e.getValue());
                            } else {
                                return e.getValue();
                            }
                        }
                ));
    }

    @Override
    public void encode(Event event, OutputStream outputStream) {
        throw new UnsupportedOperationException("Encoding is not supported");
    }

    @Override
    public Codec cloneCodec() {
        return new MyCodec();
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return Collections.emptyList();
    }

    @Override
    public String getId() {
        return this.id;
    }
}

For some reason, whenever I try to use the codec in logstash I run into an error:

[2023-06-19T16:10:05,174][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:another_test, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: class org.jruby.java.proxies.ConcreteJavaProxy cannot be cast to class org.jruby.RubyClass (org.jruby.java.proxies.ConcreteJavaProxy and org.jruby.RubyClass are in unnamed module of loader 'app')", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.JavaBasePipelineExt.initialize(JavaBasePipelineExt.java:85)", "org.logstash.execution.JavaBasePipelineExt$INVOKER$i$1$0$initialize.call(JavaBasePipelineExt$INVOKER$i$1$0$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:846)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1229)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1202)", "org.jruby.ir.targets.indy.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:29)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/java_pipeline.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:329)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:87)", "org.jruby.RubyClass.newInstance(RubyClass.java:911)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:50)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:49)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/agent.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:141)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:64)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:143)", "org.jruby.RubyProc.call(RubyProc.java:309)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:107)", "java.base/java.lang.Thread.run(Thread.java:833)"]}

Here's the logstash config I'm using:

input {
  http { 
    port => 9090
    codec => my_codec
  }
}

filter {
}



output {
  stdout {
    codec => "json_lines"
  }
}

However, when I change the config to use curly brackets:

input {
  http { 
    port => 9090
    codec => my_codec {
      
    }
  }
}

filter {
}



output {
  stdout {
    codec => "json_lines"
  }
}

I'm getting this error:

[2023-06-19T16:26:44,537][ERROR][logstash.agent           ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:another_test, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (TypeError) Java type is not serializable, cannot be marshaled class org.logstash.config.ir.compiler.JavaCodecDelegator", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.JavaBasePipelineExt.initialize(JavaBasePipelineExt.java:85)", "org.logstash.execution.JavaBasePipelineExt$INVOKER$i$1$0$initialize.call(JavaBasePipelineExt$INVOKER$i$1$0$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:846)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1229)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1202)", "org.jruby.ir.targets.indy.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:29)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/java_pipeline.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:329)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:87)", "org.jruby.RubyClass.newInstance(RubyClass.java:911)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:50)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:49)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/agent.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:141)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:64)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:143)", "org.jruby.RubyProc.call(RubyProc.java:309)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:107)", "java.base/java.lang.Thread.run(Thread.java:833)"]}

I was not able to figure out what is the actual error in the code.
Any help would be much appreciated.

Thank you in advance!

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.