How to generate a Logstash event within a custom Java filter plugin after a timeout?
I want to correlate messages sent to Logstash inside a filter and pass the result on to the next filter in the pipeline.
I have successfully set up a custom Logstash filter plugin following the Elastic documentation.
The code of my filter plugin looks like this:
package org.logstashplugins;

import co.elastic.logstash.api.Configuration;
import co.elastic.logstash.api.Context;
import co.elastic.logstash.api.Event;
import co.elastic.logstash.api.Filter;
import co.elastic.logstash.api.FilterMatchListener;
import co.elastic.logstash.api.LogstashPlugin;
import co.elastic.logstash.api.PluginConfigSpec;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// class name must match plugin name
@LogstashPlugin(name = "java_filter_example")
public class JavaFilterExample implements Filter {

    public static final PluginConfigSpec<String> SOURCE_CONFIG =
            PluginConfigSpec.stringSetting("source", "message");

    private String id;
    private String sourceField;
    private List<String> buffer;

    public JavaFilterExample(String id, Configuration config, Context context)
    {
        // constructors should validate configuration options
        this.id = id;
        this.sourceField = config.get(SOURCE_CONFIG);
        this.buffer = new ArrayList<String>();
    }

    @Override
    public Collection<Event> filter(Collection<Event> events, FilterMatchListener matchListener)
    {
        for (Event e : events)
        {
            Object f = e.getField(sourceField);
            if (f instanceof String)
            {
                buffer.add((String) f);
                matchListener.filterMatched(e);
            }
        }
        return events;
    }

    @Override
    public Collection<PluginConfigSpec<?>> configSchema()
    {
        // should return a list of all configuration options for this plugin
        return Collections.singletonList(SOURCE_CONFIG);
    }

    @Override
    public String getId()
    {
        return this.id;
    }
}
Now I want to add a thread that runs in the background, iterates over the buffer containing the messages, and sends a correlated event after a timeout occurs or when a new event arrives containing a "finish" message.
My question is: how can I trigger a new event and send it to the next filters in the Logstash pipeline? What is the Java API or class for this?
I know there is an Aggregate filter plugin that partially meets my requirements, but I have to implement different parsers for the messages, so it is better to implement a custom plugin of my own. However, it is not clear from the documentation how to send a new event without returning a collection of modified events from the filter method. Can you help me achieve this?
I need something like this (in pseudo code):
public void anotherFunctionInsideAnotherJavaClass(...)
{
    Logstash.sendEvent(new Event(...));
}
I need to generate and send an event from a context other than the filter method. Is this possible?
Hopefully it is clear what my problem is.
Thanks.
Solution 1:[1]
You cannot instantiate co.elastic.logstash.api.Event directly because it is an interface. Create the event from the concrete class instead:
new org.logstash.Event(...);
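For the timeout part of the question, one possible approach is to keep the correlation state in a small thread-safe helper instead of a free-running background thread, and to let the plugin ask it for completed batches. The sketch below is plain Java with no Logstash dependency. `CorrelationBuffer`, `add`, and `pollExpired` are hypothetical names, not part of any Logstash API, and the current time is passed in explicitly so the behavior is easy to test. It assumes that `filter(...)` would call `add` for each message, that some periodic hook would call `pollExpired` (the Java `Filter` interface appears to provide `flush` together with `requiresPeriodicFlush` for this purpose, but check that against your Logstash version), and that each returned batch would then be wrapped in a new `org.logstash.Event` and added to the collection the plugin returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

/** Hypothetical helper: buffers messages and releases them as one correlated batch. */
final class CorrelationBuffer {
    private final long timeoutMillis;
    private final List<String> messages = new ArrayList<>();
    private long firstArrivalMillis = -1L; // -1 means nothing is buffered

    CorrelationBuffer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Buffers a message; returns the whole batch if the message contains "finish". */
    synchronized Optional<List<String>> add(String message, long nowMillis) {
        if (messages.isEmpty()) {
            firstArrivalMillis = nowMillis; // the timeout counts from the first message
        }
        messages.add(message);
        return message.contains("finish") ? Optional.of(drain()) : Optional.empty();
    }

    /** Returns the pending batch once it has waited at least timeoutMillis, else empty. */
    synchronized Optional<List<String>> pollExpired(long nowMillis) {
        boolean expired = !messages.isEmpty()
                && nowMillis - firstArrivalMillis >= timeoutMillis;
        return expired ? Optional.of(drain()) : Optional.empty();
    }

    /** Copies the buffered messages into a new list and resets the buffer. */
    private List<String> drain() {
        List<String> batch = new ArrayList<>(messages);
        messages.clear();
        firstArrivalMillis = -1L;
        return batch;
    }
}
```

Both methods are synchronized because pipeline workers and a periodic flush may run on different threads. Handing completed batches back through the methods the pipeline already calls means the new event travels to the next filters the same way as any event returned from the filter method.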
Sources
This article follows the attribution requirements of Stack Overflow and is licensed under CC BY-SA 3.0.
Source: Stack Overflow
Solution | Source |
---|---|
Solution 1 | TheAnotherWise |