How to register streams for custom aggregation

I am developing a custom metric aggregator using ES 1.3.4 similar to the
Stats aggregator. On a 2 node cluster, when I execute a query with this
custom aggregation, I get the following warning in ES logs:

[2014-11-20 00:50:14,319][WARN ][transport.netty ] [Blue Streak]
Message not fully read (response) for [313] handler org.elasticsearch.search
.action.SearchServiceTransportAction$6@27f233b2, error [false], resetting

On debugging, I found that a NPE is thrown by the following line:
InternalAggregation aggregation = AggregationStreams.stream(type).readResult
(in);
in InternalAggregations.java.

Seems like I've not registered streams for my custom aggregation and I am
not able to figure a way to do it. FYI, I do not have any module defined
for my custom aggregation. I am simply adding a custom AggregatorParser in
onModule(AggregationModule) method. This method is defined in a class that
extends AbstractPlugin. Can you help me register streams for my custom
aggregation?

For further clarity, I'll mention the code structure of my custom
aggregation below:

public class CustomPlugin extends AbstractPlugin {
...
public static void onModule(AggregationModule aggregationModule) {
aggregationModule.addAggregatorParser(CustomAggregationParser.class
);
}
...
}

class InternalCustomAggregation extends InternalNumericMetricsAggregation.
MultiValue implements CustomAggregation {
...
public static final AggregationStreams.Stream STREAM = new
AggregationStreams.Stream() {
@Override
public InternalCustomAggregation readResult(StreamInput in) throws
IOException {
InternalCustomAggregation result = new InternalCustomAggregation
();
result.readFrom(in);
return result;
}
};

public static void registerStreams() {
    AggregationStreams.registerStream(STREAM, TYPE.stream());
}
...

}

interface CustomAggregation extends Aggregation {
...
}

class CustomAggregator extends NumericMetricsAggregator.MultiValue {
...
}

class CustomAggregationBuilder extends ValuesSourceMetricsAggregationBuilder
{
...
}

class CustomAggregationParser implements Aggregator.Parser {
...
}

I could not find a way to get the method InternalPayloads.registerStreams()
called by ES for registering streams for my custom aggregation. Is there
something that I am missing here? Let me know if you need more information.

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/2ecc735f-21a7-4fd4-879c-94491ea111d8%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

I finally figured it out. The solution was rather very simple. I updated
the onModule() method as below:

public static void onModule(AggregationModule aggregationModule) {
aggregationModule.addAggregatorParser(CustomAggregationParser.class);
InternalCustomAggregation.registerStreams();
}

I came to this solution by reading the Elasticsearch Cardinality Plugin
code at

On Friday, November 21, 2014 2:22:47 PM UTC+5:30, Mouzer wrote:

I am developing a custom metric aggregator using ES 1.3.4 similar to the
Stats aggregator. On a 2 node cluster, when I execute a query with this
custom aggregation, I get the following warning in ES logs:

[2014-11-20 00:50:14,319][WARN ][transport.netty ] [Blue Streak]
Message not fully read (response) for [313] handler org.elasticsearch.
search.action.SearchServiceTransportAction$6@27f233b2, error [false],
resetting

On debugging, I found that a NPE is thrown by the following line:
InternalAggregation aggregation = AggregationStreams.stream(type).
readResult(in);
in InternalAggregations.java.

Seems like I've not registered streams for my custom aggregation and I am
not able to figure a way to do it. FYI, I do not have any module defined
for my custom aggregation. I am simply adding a custom AggregatorParser in
onModule(AggregationModule) method. This method is defined in a class that
extends AbstractPlugin. Can you help me register streams for my custom
aggregation?

For further clarity, I'll mention the code structure of my custom
aggregation below:

public class CustomPlugin extends AbstractPlugin {
...
public static void onModule(AggregationModule aggregationModule) {
aggregationModule.addAggregatorParser(CustomAggregationParser.
class);
}
...
}

class InternalCustomAggregation extends InternalNumericMetricsAggregation.
MultiValue implements CustomAggregation {
...
public static final AggregationStreams.Stream STREAM = new
AggregationStreams.Stream() {
@Override
public InternalCustomAggregation readResult(StreamInput in) throws
IOException {
InternalCustomAggregation result = new
InternalCustomAggregation();
result.readFrom(in);
return result;
}
};

public static void registerStreams() {
    AggregationStreams.registerStream(STREAM, TYPE.stream());
}
...

}

interface CustomAggregation extends Aggregation {
...
}

class CustomAggregator extends NumericMetricsAggregator.MultiValue {
...
}

class CustomAggregationBuilder extends
ValuesSourceMetricsAggregationBuilder {
...
}

class CustomAggregationParser implements Aggregator.Parser {
...
}

I could not find a way to get the method
InternalPayloads.registerStreams() called by ES for registering streams for
my custom aggregation. Is there something that I am missing here? Let me
know if you need more information.

--
You received this message because you are subscribed to the Google Groups "elasticsearch" group.
To unsubscribe from this group and stop receiving emails from it, send an email to elasticsearch+unsubscribe@googlegroups.com.
To view this discussion on the web visit https://groups.google.com/d/msgid/elasticsearch/ae0d44d7-c5fe-4cc5-bb49-d58563ff7748%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.