Using Java streams in ScoreFunction plugins


#1

Hi,

I am trying to develop a ScoreFunction plugin for Elasticsearch 6.2.3.

The plugin performs a matrix calculation between each document and the query input. To speed up the calculation, I would like to use the Java Stream API.

However, it does not work; the query fails with the following exception:

Caused by: org.elasticsearch.search.query.QueryPhaseExecutionException: Query Failed [Failed to execute main query]
        at org.elasticsearch.search.query.QueryPhase.execute(QueryPhase.java:290) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.query.QueryPhase.execute(QueryPhase.java:107) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService.loadOrExecuteQueryPhase(SearchService.java:307) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService.executeQueryPhase(SearchService.java:340) ~[elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:316) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService$2.onResponse(SearchService.java:312) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.search.SearchService$3.doRun(SearchService.java:1002) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.ThreadContext$ContextPreservingAbstractRunnable.doRun(ThreadContext.java:672) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.TimedRunnable.doRun(TimedRunnable.java:41) [elasticsearch-6.2.3.jar:6.2.3]
        at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) [elasticsearch-6.2.3.jar:6.2.3]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1167) [?:?]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:641) [?:?]
        at java.lang.Thread.run(Thread.java:844) [?:?]
Caused by: java.security.AccessControlException: access denied ("org.elasticsearch.ThreadPermission" "modifyArbitraryThreadGroup")
        at java.security.AccessControlContext.checkPermission(AccessControlContext.java:472) ~[?:?]
        at java.security.AccessController.checkPermission(AccessController.java:895) ~[?:?]
        at java.lang.SecurityManager.checkPermission(SecurityManager.java:558) ~[?:?]
        at org.elasticsearch.SecureSM.checkThreadGroupAccess(SecureSM.java:196) ~[securesm-1.2.jar:6.2.3]
        at org.elasticsearch.SecureSM.checkAccess(SecureSM.java:143) ~[securesm-1.2.jar:6.2.3]
        at java.lang.ThreadGroup.checkAccess(ThreadGroup.java:313) ~[?:?]
        at java.lang.Thread.init(Thread.java:433) ~[?:?]
        at java.lang.Thread.init(Thread.java:388) ~[?:?]
        at java.lang.Thread.<init>(Thread.java:639) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:111) ~[?:?]
        at java.util.concurrent.ForkJoinWorkerThread$InnocuousForkJoinWorkerThread.<init>(ForkJoinWorkerThread.java:223) ~[?:?]
        at java.util.concurrent.ForkJoinPool$InnocuousForkJoinWorkerThreadFactory$1.run(ForkJoinPool.java:3283) ~[?:?]
        at java.util.concurrent.ForkJoinPool$InnocuousForkJoinWorkerThreadFactory$1.run(ForkJoinPool.java:3281) ~[?:?]
        at java.security.AccessController.doPrivileged(Native Method) ~[?:?]
        at java.util.concurrent.ForkJoinPool$InnocuousForkJoinWorkerThreadFactory.newThread(ForkJoinPool.java:3280) ~[?:?]
        at java.util.concurrent.ForkJoinPool.createWorker(ForkJoinPool.java:1317) ~[?:?]
        at java.util.concurrent.ForkJoinPool.tryAddWorker(ForkJoinPool.java:1341) ~[?:?]
        at java.util.concurrent.ForkJoinPool.signalWork(ForkJoinPool.java:1467) ~[?:?]
        at java.util.concurrent.ForkJoinPool.externalPush(ForkJoinPool.java:1937) ~[?:?]
        at java.util.concurrent.ForkJoinTask.fork(ForkJoinTask.java:693) ~[?:?]
        at java.util.stream.AbstractTask.compute(AbstractTask.java:313) ~[?:?]
        at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:747) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:283) ~[?:?]
        at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:395) ~[?:?]
        at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:725) ~[?:?]
        at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(ReduceOps.java:919) ~[?:?]
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233) ~[?:?]
        at java.util.stream.ReferencePipeline.reduce(ReferencePipeline.java:491) ~[?:?]
        at com.gm.es.plugins.matrix.PairWiseScore.streamScore(PairWiseScore.java:19) ~[?:?]

The relevant code in PairWiseScore.java is:

Stream<Number[]> st = Arrays.stream(a).parallel();
return st.map(p -> score(p, b, threshold)).reduce((x, y) -> x + y).get();

It seems that the Stream API creates threads, which is forbidden in an Elasticsearch plugin. Looking closely at the stack trace, I noticed that the Stream API internally calls AccessController.doPrivileged at 'ForkJoinPool.java:3280', so I added a plugin-security.policy file to the plugins folder, in the same directory as plugin-descriptor.properties.

//plugin-security.policy
grant {
  permission org.elasticsearch.ThreadPermission "modifyArbitraryThreadGroup";
  permission org.elasticsearch.ThreadPermission "modifyArbitraryThread";
};

But the problem remains: I get the same exception stack trace.
Any help would be appreciated.
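
For comparison, the stack trace shows the failure starting in ReduceOps$ReduceOp.evaluateParallel, which forks work onto the common ForkJoinPool. A sequential stream runs entirely in the calling thread, so it never reaches that thread-creation path, at the cost of losing the parallel speed-up. The following is only a minimal sketch under stated assumptions: the class name SequentialPairWiseScore and the body of score (a thresholded dot product) are hypothetical placeholders, since the original score method is not shown in this post.

```java
import java.util.Arrays;

// Hypothetical class name; not the plugin's actual PairWiseScore.
final class SequentialPairWiseScore {

    // Placeholder for the post's score(p, b, threshold): ASSUMED here to be
    // a dot product that is kept only when it reaches the threshold.
    static double score(Number[] row, Number[] query, double threshold) {
        double dot = 0.0;
        int n = Math.min(row.length, query.length);
        for (int i = 0; i < n; i++) {
            dot += row[i].doubleValue() * query[i].doubleValue();
        }
        return dot >= threshold ? dot : 0.0;
    }

    // Same map-then-sum shape as the original snippet, but without
    // .parallel(), so no ForkJoinPool worker threads are requested.
    // sum() also returns 0.0 for an empty input instead of failing
    // like Optional.get() would.
    static double streamScore(Number[][] a, Number[] b, double threshold) {
        return Arrays.stream(a)
                .mapToDouble(p -> score(p, b, threshold))
                .sum();
    }
}
```

Whether the sequential version is fast enough depends on the matrix sizes involved; it only shows that the reduction itself does not need extra threads.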

