How to access a nested event using painless script in conf file?

I am having netflow data and some fields need to be aggregated for a certain time.
The config file which i have created is following.

output{
        elasticsearch {
            index => "vunet-1-1-netflowaggr"
            action => update
            document_id => "%{unique_id}"
            document_type => "doc"
            scripted_upsert => true
            script_lang => "painless"
            script_type => "inline"
        script => "
            if (ctx.op == 'create') {
                ctx._source = params.event;
                ctx._source['out_bytes'] = params.event.get('[message][netflow][out_bytes]');
            } else {
                if (params.event.get('[message][netflow][direction]') == 1) {
                    ctx._source['netflow']['out_bytes'] += params.event.get('[netflow][out_bytes]');
                    ctx._source['netflow']['out_pkts'] += params.event.get('[netflow][out_pkts]');
                } else {
                    ctx._source['netflow']['in_bytes'] += params.event.get('[netflow][in_bytes]');
                    ctx._source['netflow']['in_pkts'] += params.event.get('[netflow][in_pkts]');
                }
            }
                "
        }
[2020-03-18T17:15:23,677][DEBUG][o.e.a.b.TransportShardBulkAction] [vunet-1-1-netflowaggr-2020.03.18][1] failed to execute bulk item (update) BulkShardRequest [[vunet-1-1-netflowaggr-2020.03.18][1]] containing [19] requests
java.lang.IllegalArgumentException: failed to execute script
	at org.elasticsearch.action.update.UpdateHelper.executeScript(UpdateHelper.java:308) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.update.UpdateHelper.prepareUpdateScriptRequest(UpdateHelper.java:268) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:97) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.update.UpdateHelper.prepare(UpdateHelper.java:77) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.bulk.TransportShardBulkAction.executeUpdateRequestOnce(TransportShardBulkAction.java:343) ~[elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.bulk.TransportShardBulkAction.executeUpdateRequest(TransportShardBulkAction.java:406) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.bulk.TransportShardBulkAction.executeBulkItemRequest(TransportShardBulkAction.java:240) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.bulk.TransportShardBulkAction.performOnPrimary(TransportShardBulkAction.java:123) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:110) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.bulk.TransportShardBulkAction.shardOperationOnPrimary(TransportShardBulkAction.java:72) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:1033) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:1011) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:104) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.onResponse(TransportReplicationAction.java:358) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.onResponse(TransportReplicationAction.java:298) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$1.onResponse(TransportReplicationAction.java:974) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$1.onResponse(TransportReplicationAction.java:971) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.shard.IndexShardOperationPermits.acquire(IndexShardOperationPermits.java:238) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.index.shard.IndexShard.acquirePrimaryOperationPermit(IndexShard.java:2211) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction.acquirePrimaryShardReference(TransportReplicationAction.java:983) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction.access$500(TransportReplicationAction.java:97) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.doRun(TransportReplicationAction.java:319) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:294) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryOperationTransportHandler.messageReceived(TransportReplicationAction.java:281) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.transport.RequestHandlerRegistry.processMessageReceived(RequestHandlerRegistry.java:66) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.transport.TransportService$7.doRun(TransportService.java:652) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:637) [elasticsearch-6.1.2.jar:6.1.2]
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.1.2.jar:6.1.2]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_201]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_201]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_201]
Caused by: org.elasticsearch.script.ScriptException: runtime error
	at org.elasticsearch.painless.PainlessScript.convertToScriptException(PainlessScript.java:101) ~[?:?]
	at org.elasticsearch.painless.PainlessScript$Script.execute(
            if (ctx.op == 'create') {
                ctx._source = params.event;
            } else {
	        if (params.event.get('[netflow][direction]') == 1) {
	            ctx._source['netflow.out_bytes'] += params.event.get('[netflow].[out_bytes]') ...:498) ~[?:?]
	at org.elasticsearch.painless.ScriptImpl.run(ScriptImpl.java:105) ~[?:?]
	at org.elasticsearch.action.update.UpdateHelper.executeScript(UpdateHelper.java:305) ~[elasticsearch-6.1.2.jar:6.1.2]
	... 31 more
Caused by: java.lang.NullPointerException
	at org.elasticsearch.painless.DefMath.add(DefMath.java:425) ~[?:?]
	at org.elasticsearch.painless.PainlessScript$Script.execute(
            if (ctx.op == 'create') {
                ctx._source = params.event;
            } else {
	        if (params.event.get('[netflow][direction]') == 1) {
	            ctx._source['netflow.out_bytes'] += params.event.get('[netflow].[out_bytes]') ...:437) ~[?:?]
	at org.elasticsearch.painless.ScriptImpl.run(ScriptImpl.java:105) ~[?:?]
	at org.elasticsearch.action.update.UpdateHelper.executeScript(UpdateHelper.java:305) ~[elasticsearch-6.1.2.jar:6.1.2]
	... 31 more

This topic was automatically closed 28 days after the last reply. New replies are no longer allowed.