I want to correlate messages sent to Logstash inside a filter and pass the correlated result on to the next filter in the pipeline.
I have already set up a custom Logstash filter plugin successfully, following the Elastic documentation.
The code of my filter plugin looks like this:
package org.logstashplugins;

import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// class name must match plugin name
@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {

    public static final PluginConfigSpec<String> SOURCE_CONFIG =
            PluginConfigSpec.stringSetting("source", "message");

    private String id;
    private String sourceField;
    // messages collected so far, waiting to be correlated
    private List<String> buffer;

    public JavaFilterExample(String id, Configuration config, Context context) {
        // constructors should validate configuration options
        this.id = id;
        this.sourceField = config.get(SOURCE_CONFIG);
        this.buffer = new ArrayList<>();
    }

    @Override
    public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener) {
        for (Event e : events) {
            Object f = e.getField(sourceField);
            if (f instanceof String) {
                buffer.add((String) f);
                matchListener.filterMatched(e);
            }
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema() {
        // should return a list of all configuration options for this plugin
        return Collections.singletonList(SOURCE_CONFIG);
    }

    @Override
    public String getId() {
        return this.id;
    }
}
Now I want to add a background thread that walks over the buffer of messages and emits a correlated event either when a timeout expires or when a new event containing a "finish" message arrives.
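To make the intent concrete, here is a minimal sketch in plain Java of the buffering behaviour I have in mind. It deliberately does not touch the Logstash API: `CorrelationBuffer`, `add` and `flushIfExpired` are hypothetical names of my own, and joining the messages with a newline is just a placeholder for the real correlation logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of the Logstash API: collects messages and
// decides when a correlated result is ready.
class CorrelationBuffer {
    private final List<String> messages = new ArrayList<>();
    private final long timeoutMillis;
    private long firstMessageAt = -1;

    CorrelationBuffer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Called for every incoming message. Returns the correlated text as soon
    // as a message containing "finish" arrives, otherwise an empty Optional.
    synchronized Optional<String> add(String message, long nowMillis) {
        if (messages.isEmpty()) {
            firstMessageAt = nowMillis;
        }
        messages.add(message);
        return message.contains("finish") ? Optional.of(drain()) : Optional.empty();
    }

    // Called periodically (for example by the background thread). Returns the
    // correlated text if the oldest buffered message has reached the timeout.
    synchronized Optional<String> flushIfExpired(long nowMillis) {
        if (!messages.isEmpty() && nowMillis - firstMessageAt >= timeoutMillis) {
            return Optional.of(drain());
        }
        return Optional.empty();
    }

    // Placeholder correlation: join all buffered messages and reset the buffer.
    private String drain() {
        String joined = String.join("\n", messages);
        messages.clear();
        firstMessageAt = -1;
        return joined;
    }
}
```

The methods are synchronized because the filter thread and the background thread would both touch the buffer. The part I am missing is what to do with the value returned by `flushIfExpired` when it is called outside of the `filter` method.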
My question is: how can I create a new event and send it to the next filters in the Logstash pipeline? Which Java API or class do I use for this?
I know there is an Aggregate filter plugin that partially meets my requirements, but I have to implement different parsers for the messages, so it is better to write my own custom plugin. However, the documentation does not make clear how to send a new event other than by returning a collection of modified events from the filter method. Can you help me achieve this?
I need something like this (in pseudo code):
public void anotherFunctionInsideAnotherJavaClass(...)
{
    Logstash.sendEvent(new Event(...));
}
I need to generate and send an event from a context other than the filter method. Is this possible?
This is similar to the Logstash Aggregate filter plugin (Aggregate filter plugin | Logstash Reference [7.15] | Elastic), but I can only find the Ruby code on GitHub, not a Java implementation of this filter.
I need something similar: the interfaces for storing and caching events the way the Aggregate filter plugin does, but with my custom parser on top. Does anyone have an idea how to push an event from another class to the next filter in Logstash, or to the output?
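The only workaround I can think of so far is sketched below in plain Java, again without any Logstash types. The background thread puts finished results into a thread-safe queue, and the filter method appends whatever is queued to the collection it returns. `PendingEvents`, `offer` and `mergeInto` are hypothetical names of my own, and the type parameter `E` stands in for the Logstash `Event`; this is an assumption about how it could be wired up, not something I found in the documentation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical hand-off between a background worker and the filter thread.
class PendingEvents<E> {
    private final ConcurrentLinkedQueue<E> pending = new ConcurrentLinkedQueue<>();

    // Safe to call from any thread, e.g. from the background correlation thread.
    void offer(E event) {
        pending.add(event);
    }

    // Intended to be called from the filter method: returns the incoming
    // events followed by everything that has been queued so far.
    List<E> mergeInto(Collection<E> incoming) {
        List<E> out = new ArrayList<>(incoming);
        E next;
        while ((next = pending.poll()) != null) {
            out.add(next);
        }
        return out;
    }
}
```

The drawback is that a queued result only leaves the plugin the next time `filter` is called, so a timeout would not produce an event while no new messages are arriving. That is exactly why I am looking for a way to push an event into the pipeline directly.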
Hopefully it is clear what my problem is.
Thanks in advance for any advice.