Using Java streams in ScoreFunction plugins

Hi,

I am trying to develop a ScoreFunction plugin for Elasticsearch 6.2.3.

Scoring involves a matrix calculation between each document and the query input. To speed up the calculation, my idea is to use the Java Stream API.

But it doesn't work; it fails with the following exception:

Caused by: org.elasticsearch.search.query.QueryPhaseExecutionException: Query Failed [Failed to execute main query]
        at org.elasticsearch.search.query.QueryPhase.execute(QueryPhase.java:290) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.query.QueryPhase.execute(QueryPhase.java:107) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService.loadOrExecuteQueryPhase(SearchService.java:307) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:340) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:316) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:312) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1002) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:672) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.2.3.jar:6.2.3]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) [?:?]
        at java.lang.Thread.run(Thread.java:844) [?:?]
Caused by: java.security.AccessControlException: access denied ("org.elasticsearch.ThreadPermission" "modifyArbitraryThreadGroup")
        at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472) ~[?:?]
        at java.security.AccessController.checkPermission(AccessController.java:895) ~[?:?]
        at java.lang.SecurityManager.checkPermission(SecurityManager.java:558) ~[?:?]
        at org.elasticsearch.SecureSM.checkThreadGroupAccess(SecureSM.java:196) ~[securesm-1.2.jar:6.2.3]
        at org.elasticsearch.SecureSM.checkAccess(SecureSM.java:143) ~[securesm-1.2.jar:6.2.3]
        at java.lang.ThreadGroup.checkAccess(ThreadGroup.java:313) ~[?:?]
        at java.lang.Thread.init(Thread.java:433) ~[?:?]
        at java.lang.Thread.init(Thread.java:388) ~[?:?]
        at java.lang.Thread.<init>(Thread.java:639) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:111) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:223) ~[?:?]
        at java.util.concurrent.ForkJoinPool$InnocuousForkJoinWorkerThreadFactory$1.run(ForkJoinPool.java:3283) ~[?:?]
        at java.util.concurrent.ForkJoinPool$InnocuousForkJoinWorkerThreadFactory$1.run(ForkJoinPool.java:3281) ~[?:?]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
        at java.util.concurrent.ForkJoinPool$InnocuousForkJoinWorkerThreadFactory.newThread(ForkJoinPool.java:3280) ~[?:?]
        at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1317) ~[?:?]
        at java.util.concurrent.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1341) ~[?:?]
        at java.util.concurrent.ForkJoinPool.signalWork(ForkJoinPool.java:1467) ~[?:?]
        at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1937) ~[?:?]
        at java.util.concurrent.ForkJoinTask.fork(ForkJoinTask.java:693) ~[?:?]
        at java.util.stream.AbstractTask.compute(AbstractTask.java:313) ~[?:?]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:747) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:395) ~[?:?]
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:725) ~[?:?]
        at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:?]
        at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:491) ~[?:?]
        at com.gm.es.plugins.matrix.PairWiseScore.streamScore(PairWiseScore.java:19) ~[?:?]

The code in PairWiseScore.java is

Stream<Number[]> st = Arrays.stream(a).parallel();
return st.map(p -> score(p, b, threshold)).reduce((x, y) -> x + y).get();
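
For anyone who wants to reproduce this outside Elasticsearch, here is a minimal self-contained sketch of the method. The class name, the body of score, the double return type, and the sample data are placeholders assumed for illustration, not the real implementation. It also includes a sequential variant for comparison: a stream without .parallel() is evaluated on the calling thread, so it should not reach the ForkJoinPool worker creation seen in the stack trace, at the cost of losing the parallel speed-up.

```java
import java.util.Arrays;

final class PairWiseScoreSketch {

    // Placeholder scoring rule (an assumption, not the real one):
    // counts the positions where |p[i] - b[i]| is within the threshold.
    static double score(Number[] p, Number[] b, double threshold) {
        int n = Math.min(p.length, b.length);
        double hits = 0;
        for (int i = 0; i < n; i++) {
            if (Math.abs(p[i].doubleValue() - b[i].doubleValue()) <= threshold) {
                hits++;
            }
        }
        return hits;
    }

    // Same shape as the snippet above: a parallel stream, which hands work to
    // ForkJoinPool.commonPool() and may therefore start worker threads.
    // Throws NoSuchElementException if a is empty, like the original get().
    static double streamScore(Number[][] a, Number[] b, double threshold) {
        return Arrays.stream(a).parallel()
                .map(p -> score(p, b, threshold))
                .reduce((x, y) -> x + y)
                .get();
    }

    // Sequential variant: evaluated entirely on the calling thread.
    // Returns 0.0 for an empty a.
    static double sequentialScore(Number[][] a, Number[] b, double threshold) {
        return Arrays.stream(a)
                .mapToDouble(p -> score(p, b, threshold))
                .sum();
    }

    public static void main(String[] args) {
        Number[][] a = { {1, 2, 3}, {4, 5, 6} };
        Number[] b = {1, 2, 7};
        System.out.println(streamScore(a, b, 0.5));
        System.out.println(sequentialScore(a, b, 0.5));
    }
}
```

Run standalone (no security manager), both methods return the same value; only the parallel one depends on being allowed to create threads.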

It seems that the Stream API creates threads, which is forbidden in an Elasticsearch plugin. Looking closely at the stack trace, I noticed that the Stream API internally calls AccessController.doPrivileged at 'ForkJoinPool.java:3280', so I added a plugin-security.policy file in the plugins folder, in the same directory as plugin-descriptor.properties.

//plugin-security.policy
grant {
  permission org.elasticsearch.ThreadPermission "modifyArbitraryThreadGroup";
  permission org.elasticsearch.ThreadPermission "modifyArbitraryThread";
};

But the problem remains: I get the same exception stack trace.
Any help would be appreciated...
