Why does an enrich policy get executed on the master node?

I'm trying to create a geo_match enrich policy based on a not-so-big source index (10GB). The match field is a geo_shape and there are about half a dozen single-term enrich fields. So nothing extraordinary.
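For context, the policy definition looks roughly like this (the source index name, match field name, and enrich field names below are illustrative placeholders, not the real ones):

PUT /_enrich/policy/zone-school-policy
{
  "geo_match": {
    "indices": "zones-source-index",
    "match_field": "zone_geometry",
    "enrich_fields": ["zone_id", "zone_name", "district", "region", "category", "school_count"]
  }
}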

When the index was a little smaller, the policy would execute without any problem. The index has grown a bit, and re-executing the enrich policy now fails.

What I noticed is that the enrich policy is actually executed on the master node, which has only 1GB of heap (compared to the data nodes, which have 31GB).

So at some point during the execution, the process just stops and the master node crashes. From the master node logs, I can get the following information:

fatal error in thread [elasticsearch[instance-0000000040][management][T#4]], exiting
java.lang.OutOfMemoryError: Java heap space
at java.nio.file.Files.newBufferedReader(Files.java:2917) ~[?:?]
at java.nio.file.Files.readAllLines(Files.java:3396) ~[?:?]
at java.nio.file.Files.readAllLines(Files.java:3436) ~[?:?]
at org.elasticsearch.monitor.os.OsProbe.readProcSelfCgroup(OsProbe.java:297) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.monitor.os.OsProbe.getControlGroups(OsProbe.java:252) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.monitor.os.OsProbe.getCgroup(OsProbe.java:525) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.monitor.os.OsProbe.osStats(OsProbe.java:659) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.monitor.os.OsService$OsStatsCache.refresh(OsService.java:70) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.monitor.os.OsService$OsStatsCache.refresh(OsService.java:63) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.common.util.SingleObjectCache.getOrRefresh(SingleObjectCache.java:54) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.monitor.os.OsService.stats(OsService.java:60) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.node.NodeService.stats(NodeService.java:120) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction.nodeOperation(TransportNodesStatsAction.java:72) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.action.admin.cluster.node.stats.TransportNodesStatsAction.nodeOperation(TransportNodesStatsAction.java:38) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.action.support.nodes.TransportNodesAction.nodeOperation(TransportNodesAction.java:158) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:271) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.action.support.nodes.TransportNodesAction$NodeTransportHandler.messageReceived(TransportNodesAction.java:267) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$1.doRun(SecurityServerTransportInterceptor.java:257) ~[?:?]
at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.common.util.concurrent.EsExecutors$DirectExecutorService.execute(EsExecutors.java:224) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler.lambda$messageReceived$0(SecurityServerTransportInterceptor.java:306) ~[?:?]
at org.elasticsearch.xpack.security.transport.SecurityServerTransportInterceptor$ProfileSecuredRequestHandler$$Lambda$5058/0x00000008016bd990.accept(Unknown Source) ~[?:?]
at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorizeAction$4(AuthorizationService.java:263) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService$$Lambda$5253/0x00000008018ac4a8.accept(Unknown Source) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:641) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:616) ~[?:?]
at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43) ~[elasticsearch-7.10.0.jar:7.10.0]
at org.elasticsearch.xpack.security.authz.RBACEngine.authorizeClusterAction(RBACEngine.java:153) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.authorizeAction(AuthorizationService.java:265) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.maybeAuthorizeRunAs(AuthorizationService.java:248) ~[?:?]
at org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorize$1(AuthorizationService.java:212) ~[?:?]
fatal error
	at org.elasticsearch.ExceptionsHelper.lambda$maybeDieOnAnotherThread$4(ExceptionsHelper.java:294)
	at java.base/java.util.Optional.ifPresent(Optional.java:176)
	at org.elasticsearch.ExceptionsHelper.maybeDieOnAnotherThread(ExceptionsHelper.java:284)
	at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.exceptionCaught(Netty4MessageChannelHandler.java:81)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
	at io.netty.handler.logging.LoggingHandler.exceptionCaught(LoggingHandler.java:205)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:281)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:273)
	at io.netty.handler.ssl.SslHandler.exceptionCaught(SslHandler.java:1136)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:302)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:752)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at io.netty.handler.logging.LoggingHandler.flush(LoggingHandler.java:295)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:742)
	at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:728)
	at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.doFlush(Netty4MessageChannelHandler.java:180)
	at org.elasticsearch.transport.netty4.Netty4MessageChannelHandler.flush(Netty4MessageChannelHandler.java:115)
	at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:750)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:790)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:758)
	at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1020)
	at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:299)
	at org.elasticsearch.transport.netty4.Netty4TcpChannel.sendMessage(Netty4TcpChannel.java:146)
	at org.elasticsearch.transport.OutboundHandler.internalSend(OutboundHandler.java:133)
	at org.elasticsearch.transport.OutboundHandler.sendMessage(OutboundHandler.java:125)
	at org.elasticsearch.transport.OutboundHandler.sendResponse(OutboundHandler.java:105)
	at org.elasticsearch.transport.TcpTransportChannel.sendResponse(TcpTransportChannel.java:63)
	at org.elasticsearch.transport.TaskTransportChannel.sendResponse(TaskTransportChannel.java:52)
	at org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:43)
	at org.elasticsearch.action.support.ChannelActionListener.onResponse(ChannelActionListener.java:27)
	at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
	at org.elasticsearch.action.ActionListener$2.onResponse(ActionListener.java:89)
	at org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:336)
	at org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction.masterOperation(TransportClusterStateAction.java:100)
	at org.elasticsearch.action.admin.cluster.state.TransportClusterStateAction.masterOperation(TransportClusterStateAction.java:48)
	at org.elasticsearch.action.support.master.TransportMasterNodeAction.masterOperation(TransportMasterNodeAction.java:99)
	at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.lambda$doStart$3(TransportMasterNodeAction.java:166)
	at org.elasticsearch.action.ActionRunnable$2.doRun(ActionRunnable.java:73)
	at org.elasticsearch.common.util.concurrent.AbstractRunnable.run(AbstractRunnable.java:37)
	at org.elasticsearch.common.util.concurrent.EsExecutors$DirectExecutorService.execute(EsExecutors.java:224)
	at org.elasticsearch.action.support.master.TransportMasterNodeAction$AsyncSingleAction.doStart(TransportMasterNodeAction.java:166)
	at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:115)
	at org.elasticsearch.action.support.master.TransportMasterNodeAction.doExecute(TransportMasterNodeAction.java:59)
	at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:179)
	at org.elasticsearch.action.support.ActionFilter$Simple.apply(ActionFilter.java:53)
	at org.elasticsearch.action.support.TransportAction$RequestFilterChain.proceed(TransportAction.java:177)
	at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$apply$0(SecurityActionFilter.java:87)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.xpack.security.action.filter.SecurityActionFilter.lambda$authorizeRequest$4(SecurityActionFilter.java:173)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorizeAction$4(AuthorizationService.java:263)
	at org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:641)
	at org.elasticsearch.xpack.security.authz.AuthorizationService$AuthorizationResultListener.onResponse(AuthorizationService.java:616)
	at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
	at org.elasticsearch.xpack.security.authz.RBACEngine.authorizeClusterAction(RBACEngine.java:153)
	at org.elasticsearch.xpack.security.authz.AuthorizationService.authorizeAction(AuthorizationService.java:265)
	at org.elasticsearch.xpack.security.authz.AuthorizationService.maybeAuthorizeRunAs(AuthorizationService.java:248)
	at org.elasticsearch.xpack.security.authz.AuthorizationService.lambda$authorize$1(AuthorizationService.java:212)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.action.support.ContextPreservingActionListener.onResponse(ContextPreservingActionListener.java:43)
	at org.elasticsearch.xpack.security.authz.RBACEngine.lambda$resolveAuthorizationInfo$1(RBACEngine.java:126)
	at org.elasticsearch.action.ActionListener$1.onResponse(ActionListener.java:63)
	at org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.roles(CompositeRolesStore.java:159)
	at org.elasticsearch.xpack.security.authz.store.CompositeRolesStore.getRoles(CompositeRolesStore.java:276)
	at org.elasticsearch.xpack.security.authz.RBACEngine.getRoles(RBACEngine.java:132)
	at ...

What I'd like to find out is WHY the master node is actually responsible for executing enrich policies, given that some source indices might be much bigger, and master nodes don't have much heap and are not supposed to do any heavy lifting in the first place.

Thanks in advance for your insights

Would you share the output of GET _cat/nodes?

Sure, here it is

ip          heap.percent ram.percent cpu load_1m load_5m load_15m node.role master name
10.0.25.168           45          83   5    0.83    1.36     1.63 mr        -      instance-0000000038
10.0.12.26            60         100  44   12.38    8.35     6.78 cdhirstw  -      instance-0000000042
10.0.21.234           66         100  68    9.04    7.40     7.27 cdhirstw  -      instance-0000000066
10.0.17.89            57         100  91    7.48    6.63     6.91 cdhirstw  -      instance-0000000055
10.0.33.86            86         100  39    6.60    7.19     6.82 cdhirstw  -      instance-0000000062
10.0.36.66            34          89   5    2.87    2.80     2.23 mr        *      instance-0000000039
10.0.42.222           84         100 100   16.01   10.46    10.48 cdhirstw  -      instance-0000000058
10.0.45.35            54         100  71    9.12    8.23     7.49 cdhirstw  -      instance-0000000049
10.0.17.219           61         100  87    2.95    3.06     2.61 cdhirstw  -      instance-0000000059
10.0.10.160           64         100  64    5.04    8.52     6.93 cdhirstw  -      instance-0000000041
10.0.12.51            28          80  10    4.41    3.70     3.39 mr        -      instance-0000000040
10.0.6.43             71         100  66    5.79    6.33     5.81 cdhirstw  -      instance-0000000043

Hmm, ok, it's not an ingest node, that would have been too simple :slight_smile:

What evidence do you have that the master node is executing an enrich policy? The stack trace from the OOM you shared doesn't indicate any enrich-related activity (not that stack traces from OOMs are very helpful anyway).

On the first line of the logs you can see that the OOM occurs on instance-0000000040, which was the elected master node at the time. I've run this a few times to make sure, and on each run I can see that the elected master node crashes consistently.

Another thing worth noting: I've successfully executed another enrich policy (of type match) from a source index that was 600GB and 300M records, and it ran smoothly.

The source index that makes the enrich process crash is only 10GB and has a mere 76K records, so it doesn't seem to be a question of size.

Can you share the heap dump from the crash? I can arrange a private upload space if needed.

Unfortunately, it's on ES Cloud, so I don't have any way to get that...

Regarding the logs, I found this just before the OOM crash (without the timestamps, but those are irrelevant):

Policy [zone-school-policy]: Running enrich policy
creating index, cause [api], templates [], shards [1]/[0]
Cluster health status changed from [YELLOW] to [GREEN] (reason: [shards started [[.enrich-zone-school-policy-1612442793467][0]]]).
... then a few GC overhead messages...
handling inbound transport message [InboundMessage{Header{315638562}{7.10.0}{5997610}{false}{false}{false}{false}{NO_ACTION_NAME_FOR_RESPONSES}}] took [5105ms] which is above the warn threshold of [5000ms]
fatal error in thread [QuotaAwareFSTimer-0], exiting
fatal error in thread [elasticsearch[instance-0000000040][management][T#4]], exiting
fatal error in thread [elasticsearch[scheduler][T#1]], exiting
fatal error in thread [elasticsearch[instance-0000000040][management][T#2]], exiting
fatal error in thread [elasticsearch[scheduler][T#1]], exiting
... then the OOM...

Ok, would you open a support ticket and ask for us to enable capturing a heap dump on OOM? Once that's done, you'll need to trigger another crash and then let us know so we can go and look for the heap dump. It sounds like an Elasticsearch bug to go OOM under these conditions, although I'm not sure the mechanism that causes it is exactly as you describe.

Just refreshing so the topic doesn't get closed; it's still being investigated by Elastic support.
I'll delete this comment after the next answer.

Hi @val

A couple of thoughts/questions while we wait:

What stack version are you running?

Like you, I would not expect the enrich policy to run on a master node. Can you remove and re-add the enrich policy, and see if you have the same problem?

And then run

GET _enrich/_stats
GET _cat/nodes?h=n,r,id

And compare against the nodes.

I created an ESS 7.11.1 cluster with 6 data/ingest/coordinating nodes + 3 dedicated masters, and the enrich policy only executes on the 6 data nodes.

I also tested an ESS 7.11.1 cluster with dedicated ingest nodes and combined data/master nodes, and the policy only executes on the ingest nodes.

I can tell this by running

GET _enrich/_stats

GET _cat/nodes?h=n,r,id

I suspect there was some reason/bug/change that resulted in the policy executing on the master; I do not expect that to be normal/designed behavior.

Curious what you find out, especially if you try to remove it and reapply/execute. The stats may persist for the old nodes; I'm not sure how to clear them.

As I mentioned above, I also don't think the enrich policy is being executed on the master, but it's definitely having some kind of effect. I suspect a bug, but I'm not hopeful about being able to work out the details without the heap dump, which we're still waiting on...

I don't suppose you can reproduce this in a test cluster on a local machine @val ?

Thank you @stephenb and @DavidTurner .

Another heap dump has been generated on March 2nd and the support team has followed up with the engineering team, but I haven't received any feedback yet. I'll update here as soon as I know.

Thanks for looking into this @stephenb
We're running version 7.10.0 on Elastic Cloud

I removed the enrich policy, recreated it, and executed it again, roughly with the sequence sketched below. Viewing the execution stats then gives the output that follows. The node ID corresponds to the current master node (i.e. instance-0000000040). I think this is because it's the one "coordinating" the task, but not necessarily the one on which the policy executes.
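Roughly, the sequence was as follows (the policy body is elided here; it's the same geo_match definition as before):

DELETE /_enrich/policy/zone-school-policy

PUT /_enrich/policy/zone-school-policy
{ ... same geo_match definition ... }

POST /_enrich/policy/zone-school-policy/_execute

GET /_enrich/_stats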

  "executing_policies" : [
    {
      "name" : "zone-school-policy",
      "task" : {
        "node" : "Bsk0iJs4Rc6nBFMChn-GGA",
        "id" : 13274653,
        "type" : "enrich",
        "action" : "policy_execution",
        "status" : {
          "phase" : "RUNNING"
        },
        "description" : "zone-school-policy",
        "start_time_in_millis" : 1615193241118,
        "running_time_in_nanos" : 39693487142,
        "cancellable" : false,
        "parent_task_id" : "Bsk0iJs4Rc6nBFMChn-GGA:13274652",
        "headers" : { }
      }
    }
  ],

Moments later, I get the following alert via email:

A node in your cluster xyz (my-cluster) ran out of memory at 08:49 on March 08, 2021. As a precaution, we automatically restarted the node instance-0000000040 .

Also, I think it's hard to reproduce the problem because other policies work fine (i.e. without blowing anything up). I've noticed that normal term-matching policies usually work OK, but this one is a geo-matching policy over a larger amount of geo data... That might be related.


Ohh, OK, I see now. Yes, the master is indeed coordinating the reindex operation that creates the enrich index, which means it's fetching the docs from the source index in batches of 10000, and that could definitely be a problem on a small master with large docs. This is my bad: I misunderstood "executing the enrich policy" to mean actually doing the lookups at indexing time, hence my confusion, because that definitely doesn't happen on the master.
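As a side note, while a policy execution is in flight, the tasks API should show the underlying reindex task and the node it's running on, e.g.:

GET /_tasks?detailed=true&actions=*reindex*

Seeing that task on the elected master would line up with what you're observing.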

Would you open a Github issue for this?

In the meantime, if you don't want to run with larger masters, you can adjust the enrich.fetch_size setting to change the batch size to something smaller. Unfortunately, that is a node-wide setting that applies to all policies.
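For example, something like this in elasticsearch.yml (or the equivalent user settings override on Cloud), where 1000 is just an illustrative value and the default is the 10000 mentioned above:

enrich.fetch_size: 1000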


Thank you so much @DavidTurner
Indeed, that's only a problem when executing the policy to build the enrich index, not at indexing time.

That fetch size would probably explain why this is an issue. Since the setting can't be updated dynamically, though, I'll have to find the right time to restart the cluster to change it.

I'm still interested to know what you find in the heap dump and whether it's consistent with a high value of enrich.fetch_size.

Also, @DavidTurner, would it make sense to have a circuit breaker, like in the search/indexing use cases, to prevent the master node from going OOM? I might suggest that in the GitHub issue.

IMO the right solution is not to do the heavy lifting of this job on the master node at all, but that does introduce some complexity since the master is needed for some of the other steps, and making the execution process distributed introduces some new and exciting failure modes.

Tracking memory usage of reindexing in a circuit breaker is an interesting idea, possibly worth investigating further, but the end result for you would be that the enrich policy would simply fail with a CircuitBreakingException. That's better than killing the master for sure but still doesn't really help.

I think that preventing the master node from going OOM is already a good thing :slight_smile:

Moreover, the error message conveyed by the CircuitBreakingException could hint that the enrich.fetch_size setting is too large, which would also be something positive.

@DavidTurner @val

Thanks to both of you, I learned something to be aware of today, as I tend to use enrich processors frequently.

I have tons of them, and they have never really been a problem for the past few years... until the data volume grew by a factor of ~10. Add complex geo_shapes to the mix and you've got the perfect recipe for an OOM :wink: