Hello.
I'm working on a new codec plugin that parses protobuf data in a unique way (meaning I can't use the existing protobuf plugin).
Here's the plugin code:
package com.ofekinger.logstash.plugins.mycodec;
import co.elastic.logstash.api.*;
// Protobuf imports
import com.google.protobuf.ExtensionRegistry;
import com.google.protobuf.Message;
import org.apache.commons.io.input.CharSequenceInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.zip.InflaterInputStream;
import static java.nio.charset.StandardCharsets.ISO_8859_1;
/**
 * Logstash codec that decodes a stream of length-prefixed protobuf frames into event maps.
 *
 * <p>Each frame has the form {@code <N>} followed by exactly {@code N} payload bytes, where
 * {@code N} is a decimal byte count. Encoding is not supported.
 *
 * <p>NOTE(review): the Logstash guide for Java codec plugins states that Java codecs are only
 * supported by Java input and output plugins, not by Ruby ones. Confirm that the plugin hosting
 * this codec is a Java plugin.
 */
@LogstashPlugin(name = "my_codec")
public class MyCodec implements Codec {

    /** Codec name; same value as the name given in the {@code @LogstashPlugin} annotation. */
    public static final String MY_CODEC = "my_codec";

    private final String id;

    /** Creates a codec with a randomly generated id. */
    public MyCodec() {
        this.id = UUID.randomUUID().toString();
    }

    /**
     * Creates a codec with an explicit id.
     *
     * @param id      identifier returned by {@link #getId()}
     * @param config  ignored; this codec has no settings
     * @param context ignored
     */
    public MyCodec(String id, Configuration config, Context context) {
        this.id = id;
    }

    /**
     * Creates a codec with a randomly generated id.
     *
     * @param config  ignored; this codec has no settings
     * @param context ignored
     */
    public MyCodec(final Configuration config, final Context context) {
        this();
    }

    /**
     * Decodes every complete frame available in the buffer and hands each resulting map to
     * the consumer.
     *
     * <p>If the buffer ends in the middle of a frame (header or payload), the buffer position
     * is rewound to the start of that frame and the method returns, so the bytes stay
     * available for a later call instead of triggering a {@code BufferUnderflowException}.
     *
     * @param byteBuffer buffer in read mode containing zero or more frames
     * @param consumer   receiver of one map per decoded frame
     * @throws RuntimeException         if a frame does not start with {@code '<'} or the
     *                                  protobuf extraction fails with an {@link IOException}
     * @throws IllegalArgumentException if the length header is not a non-negative decimal
     *                                  integer (includes {@link NumberFormatException})
     */
    @Override
    public void decode(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        while (byteBuffer.hasRemaining()) {
            // Remember where the frame begins so that a partial frame can be left unread.
            final int frameStart = byteBuffer.position();

            byte nextByte = byteBuffer.get();
            if (nextByte != '<') {
                throw new RuntimeException("Invalid character " + nextByte);
            }

            // Collect the decimal digits between '<' and '>'.
            final StringBuilder byteLength = new StringBuilder();
            boolean headerComplete = false;
            while (byteBuffer.hasRemaining()) {
                nextByte = byteBuffer.get();
                if (nextByte == '>') {
                    headerComplete = true;
                    break;
                }
                byteLength.append((char) nextByte);
            }
            if (!headerComplete) {
                byteBuffer.position(frameStart);
                return;
            }

            final int length = parseFrameLength(byteLength.toString());
            if (byteBuffer.remaining() < length) {
                // Payload not fully received yet.
                byteBuffer.position(frameStart);
                return;
            }

            byte[] protobufBytes = new byte[length];
            byteBuffer.get(protobufBytes);
            try {
                // Protobuf extraction
                consumer.accept(toMap(ufMessage));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Parses the decimal frame length taken from a {@code <N>} header.
     *
     * @throws IllegalArgumentException if the text is not an integer or is negative
     */
    private static int parseFrameLength(String text) {
        final int length = Integer.parseInt(text);
        if (length < 0) {
            throw new IllegalArgumentException("Negative frame length " + length);
        }
        return length;
    }

    /** Builds a registry containing the {@code fields} and {@code customFields} extensions. */
    protected ExtensionRegistry getExtensionRegistry() {
        ExtensionRegistry extensionRegistry = ExtensionRegistry.newInstance();
        extensionRegistry.add(FieldsMsg.fields);
        extensionRegistry.add(CustomFieldsMsg.customFields);
        return extensionRegistry;
    }

    /**
     * Decodes all remaining frames. Because no further data will arrive, bytes that do not
     * form a complete frame are reported as an error rather than silently dropped.
     *
     * @param byteBuffer buffer in read mode containing the final bytes of the stream
     * @param consumer   receiver of one map per decoded frame
     * @throws IllegalStateException if the buffer ends with an incomplete frame
     */
    @Override
    public void flush(ByteBuffer byteBuffer, Consumer<Map<String, Object>> consumer) {
        decode(byteBuffer, consumer);
        if (byteBuffer.hasRemaining()) {
            throw new IllegalStateException(
                "Truncated frame: " + byteBuffer.remaining() + " trailing byte(s) could not be decoded");
        }
    }

    /**
     * Converts a protobuf message into a map keyed by field name. Nested messages, including
     * the elements of repeated message fields, are converted recursively; all other values
     * are passed through unchanged.
     *
     * @param message message whose set fields are exported
     * @return map from field name to converted value
     */
    public Map<String, Object> toMap(Message message) {
        return message.getAllFields().entrySet().stream()
            .collect(Collectors.toMap(
                e -> e.getKey().getName(),
                e -> toEventValue(e.getValue())));
    }

    /** Recursively replaces {@link Message} instances (also inside lists) with maps. */
    private Object toEventValue(Object value) {
        if (value instanceof Message) {
            return toMap((Message) value);
        }
        if (value instanceof List) {
            // Repeated fields are exposed as lists; their elements may themselves be messages.
            return ((List<?>) value).stream()
                .map(this::toEventValue)
                .collect(Collectors.toList());
        }
        return value;
    }

    /**
     * Always fails: this codec is decode-only.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void encode(Event event, OutputStream outputStream) {
        throw new UnsupportedOperationException("Encoding is not supported");
    }

    /** Returns a fresh codec instance with its own randomly generated id. */
    @Override
    public Codec cloneCodec() {
        return new MyCodec();
    }

    /** This codec declares no configuration settings. */
    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        return Collections.emptyList();
    }

    /** Returns the id assigned at construction time. */
    @Override
    public String getId() {
        return this.id;
    }
}
For some reason, whenever I try to use the codec in Logstash, I run into an error:
[2023-06-19T16:10:05,174][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:another_test, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: class org.jruby.java.proxies.ConcreteJavaProxy cannot be cast to class org.jruby.RubyClass (org.jruby.java.proxies.ConcreteJavaProxy and org.jruby.RubyClass are in unnamed module of loader 'app')", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.JavaBasePipelineExt.initialize(JavaBasePipelineExt.java:85)", "org.logstash.execution.JavaBasePipelineExt$INVOKER$i$1$0$initialize.call(JavaBasePipelineExt$INVOKER$i$1$0$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:846)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1229)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1202)", "org.jruby.ir.targets.indy.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:29)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/java_pipeline.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:329)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:87)", "org.jruby.RubyClass.newInstance(RubyClass.java:911)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", 
"Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:50)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:49)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/agent.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:141)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:64)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:143)", "org.jruby.RubyProc.call(RubyProc.java:309)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:107)", "java.base/java.lang.Thread.run(Thread.java:833)"]}
Here's the logstash config I'm using:
# Pipeline with an http input on port 9090 that uses the my_codec codec.
input {
http {
port => 9090
# Codec referenced by bare name, without a settings block.
codec => my_codec
}
}
# No filters are configured.
filter {
}
# Events are written to stdout using the json_lines codec.
output {
stdout {
codec => "json_lines"
}
}
However, when I change the config to use curly brackets:
# Same pipeline as before, except for how the codec is referenced.
input {
http {
port => 9090
# Codec referenced with an explicit, empty settings block.
codec => my_codec {
}
}
}
# No filters are configured.
filter {
}
# Events are written to stdout using the json_lines codec.
output {
stdout {
codec => "json_lines"
}
}
I'm getting this error:
[2023-06-19T16:26:44,537][ERROR][logstash.agent ] Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:another_test, :exception=>"Java::JavaLang::IllegalStateException", :message=>"Unable to configure plugins: (TypeError) Java type is not serializable, cannot be marshaled class org.logstash.config.ir.compiler.JavaCodecDelegator", :backtrace=>["org.logstash.config.ir.CompiledPipeline.<init>(CompiledPipeline.java:120)", "org.logstash.execution.JavaBasePipelineExt.initialize(JavaBasePipelineExt.java:85)", "org.logstash.execution.JavaBasePipelineExt$INVOKER$i$1$0$initialize.call(JavaBasePipelineExt$INVOKER$i$1$0$initialize.gen)", "org.jruby.internal.runtime.methods.JavaMethod$JavaMethodN.call(JavaMethod.java:846)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuper(IRRuntimeHelpers.java:1229)", "org.jruby.ir.runtime.IRRuntimeHelpers.instanceSuperSplatArgs(IRRuntimeHelpers.java:1202)", "org.jruby.ir.targets.indy.InstanceSuperInvokeSite.invoke(InstanceSuperInvokeSite.java:29)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.java_pipeline.RUBY$method$initialize$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/java_pipeline.rb:48)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.runtime.callsite.CachingCallSite.cacheAndCall(CachingCallSite.java:329)", "org.jruby.runtime.callsite.CachingCallSite.call(CachingCallSite.java:87)", "org.jruby.RubyClass.newInstance(RubyClass.java:911)", "org.jruby.RubyClass$INVOKER$i$newInstance.call(RubyClass$INVOKER$i$newInstance.gen)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", 
"Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:50)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.pipeline_action.create.RUBY$method$execute$0$__VARARGS__(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/pipeline_action/create.rb:49)", "org.jruby.internal.runtime.methods.CompiledIRMethod.call(CompiledIRMethod.java:139)", "org.jruby.internal.runtime.methods.MixedModeIRMethod.call(MixedModeIRMethod.java:112)", "org.jruby.ir.targets.indy.InvokeSite.invoke(InvokeSite.java:208)", "Users.ofekinger.Downloads.logstash_minus_8_dot_5_dot_3.logstash_minus_core.lib.logstash.agent.RUBY$block$converge_state$2(/Users/ofekinger/Downloads/logstash-8.5.3/logstash-core/lib/logstash/agent.rb:386)", "org.jruby.runtime.CompiledIRBlockBody.callDirect(CompiledIRBlockBody.java:141)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:64)", "org.jruby.runtime.IRBlockBody.call(IRBlockBody.java:58)", "org.jruby.runtime.Block.call(Block.java:143)", "org.jruby.RubyProc.call(RubyProc.java:309)", "org.jruby.internal.runtime.RubyRunnable.run(RubyRunnable.java:107)", "java.base/java.lang.Thread.run(Thread.java:833)"]}
I have not been able to figure out what the actual error in the code is.
Any help would be much appreciated.
Thank you in advance!