JVM Plugin - sharing a file between nodes

Hi all,

I'm currently working on a JVM plugin for Elasticsearch that lets users upload a file to Elasticsearch, which is then used to update an analyzer. Since the request is handled by a single node in the cluster, the uploaded data only exists on that node. I want the file to be processed by all nodes, so I need a way of getting the uploaded file to every node.

I was wondering what the best approach is to get my file to all nodes in the cluster. My options:

1: Execute the upload call to all nodes in the cluster separately
2: Use the internal Client inside my restHandler (which extends BaseRestHandler) to get the cluster state, parse the location of the other nodes and replay the request to all the remaining nodes
3: ???

Any thoughts? :smiley:

Kind regards,

edit 1: Moved from Elasticsearch to Development

Why not store it in an index in ES?

You should look into writing a transport action. Such an action can receive a request and distribute it to the nodes of an index, or even to all cluster nodes (which is called a broadcast action).

If the analyzer configuration request is a file, you can, besides indexing the data, either save the file to the path.logs directory (which is writable) or store it in the cluster state (which I do not recommend). Files are usually not useful on the server side; keeping them in sync and up to date is error-prone. I recommend keeping the file on the client side and executing the action immediately in memory as necessary. If files get lost, just repeat the client action to configure the analyzer.

Dear Mark, Jörg

Thank you for your replies.

Actually, after writing this I started storing the contents of the file in an index. That way, newly joining instances and restarting nodes can read the contents from the index, and I don't have to upload the file again to each new instance.
@jprante, I could use your suggestion to notify all nodes to read the new file from the index, so that all nodes have the same version of my file. Thanks for that tip. Do you have any references to code or documentation on this part?

I want my plugin to read the index during startup and pull everything into memory, but it seems that I cannot access indices during plugin initialization. Is there a way to detect whether a node has finished starting up? (I already tried cluster health requests, but it seems that plugin initialization happens before the node actually starts up.)

Ahhh, I've managed to figure out the last part of my previous post.

I've added a service which spawns a thread that waits for the cluster to be online, then retrieves the contents of the index and puts them in memory.

The only thing left to add is a transport action that lets the other nodes in the cluster know that my file has been updated and that every node should read the latest version.

Is there any documentation on this part (besides checking other plugins on GitHub)?

One other way would be to keep the service alive and check for a new version of the file inside the index on an interval. Not sure what the better way would be, any thoughts? :slight_smile:
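For comparison, the interval-based alternative could be sketched in plain Java roughly as follows. This is only a sketch under assumptions: `FileSnapshot` and `FilePoller` are hypothetical names, the `Supplier` stands in for "read the latest document from the index", and the version field is assumed to increase with every upload. No Elasticsearch classes are used.

```java
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Immutable copy of the uploaded file plus an increasing version (hypothetical type). */
final class FileSnapshot {
    final long version;
    final String contents;

    FileSnapshot(long version, String contents) {
        this.version = version;
        this.contents = Objects.requireNonNull(contents);
    }
}

/** Keeps the newest snapshot in memory and refreshes it on a fixed interval. */
final class FilePoller implements AutoCloseable {
    // Assumption: stands in for reading the stored file from the index.
    private final Supplier<FileSnapshot> source;
    private final AtomicReference<FileSnapshot> current = new AtomicReference<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "file-poller");
                t.setDaemon(true); // do not keep the JVM alive for polling
                return t;
            });

    FilePoller(Supplier<FileSnapshot> source) {
        this.source = source;
    }

    /** Fetches once; swaps the snapshot in only if it is newer. Returns true on a swap. */
    boolean refreshOnce() {
        FileSnapshot latest = source.get();
        if (latest == null) {
            return false;
        }
        while (true) {
            FileSnapshot seen = current.get();
            if (seen != null && seen.version >= latest.version) {
                return false; // already up to date, ignore stale reads
            }
            if (current.compareAndSet(seen, latest)) {
                return true;
            }
        }
    }

    /** Starts periodic refreshes; the first one runs immediately. */
    void start(long interval, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                refreshOnce();
            } catch (RuntimeException e) {
                // Swallow so one failed read does not cancel all later runs.
            }
        }, 0, interval, unit);
    }

    FileSnapshot current() {
        return current.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

The trade-off is that every node keeps reading on a timer even when nothing changed, and an update is only picked up after the next interval.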

Don't send commands "to let nodes know when to read a file". Just send the file to the nodes with a broadcast action. You can connect the broadcast action to HTTP, for simple PUT or POST of a binary/text file.

See TransportBroadcastAction to address all the nodes that hold the shards which are part of an index

or see TransportNodesAction to address all nodes, without index context
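To illustrate the shape of "send the file itself to every node", here is a plain-Java model. It deliberately uses no Elasticsearch classes: `NodeStub` and `FanOut` are hypothetical names, and a thread pool stands in for the transport layer. The point is only that each node applies the payload it receives directly and the caller collects one response per node.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Stand-in for one cluster node; keeps the last delivered file in memory. */
final class NodeStub {
    final String nodeId;
    private volatile byte[] contents;

    NodeStub(String nodeId) {
        this.nodeId = nodeId;
    }

    /** Per-node operation: apply the payload directly, no second read needed. */
    int apply(byte[] payload) {
        contents = payload.clone();
        return contents.length;
    }

    byte[] contents() {
        return contents;
    }
}

final class FanOut {
    /** Delivers the same payload to every node in parallel; returns bytes applied per node id. */
    static Map<String, Integer> sendToAll(List<NodeStub> nodes, byte[] payload) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, nodes.size()));
        try {
            List<Future<Integer>> pending = new ArrayList<>();
            for (NodeStub node : nodes) {
                pending.add(pool.submit(() -> node.apply(payload)));
            }
            // Collect one response per node, in the order the nodes were given.
            Map<String, Integer> responses = new LinkedHashMap<>();
            for (int i = 0; i < nodes.size(); i++) {
                responses.put(nodes.get(i).nodeId, pending.get(i).get());
            }
            return responses;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for nodes", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("a node failed to apply the payload", e);
        } finally {
            pool.shutdown();
        }
    }
}
```

In a real plugin the per-node step would live in the transport action's node-level handler; the model above only mirrors the request, per-node operation, and aggregated response.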

In a plugin, you can wait for the first initial cluster state by using InitialStateListener. Did you do that?

Groovy example

class DemoService extends AbstractLifecycleComponent<DemoService> {

   ...

    void doStart() {
        Executors.newSingleThreadExecutor().execute(new Runnable() {
            @Override
            void run() {
               // plugins may run before Discovery is initialized so we refer it from the injector now (ugly)
                Discovery discovery = injector.getInstance(Discovery)
                if (discovery != null) {
                    InitialStateListener listener = new InitialStateListener()
                    discovery.addListener(listener)
                    listener.waitForInitialState(settings.getAsTime(Constants.INITIAL_STATE_TIMEOUT_PARAMETER, TimeValue.timeValueSeconds(30)))
                    discovery.removeListener(listener)
                } else {
                    logger.info('no discovery')
                }
                ...

Thanks! I will look into the TransportNodesAction since I don't need any index context.

I created a service which spawns a thread with the following check:

public class TestingPluginService extends AbstractLifecycleComponent<TestingPluginService> {

    ....

    @Override
    protected void doStart() {
        // Daemon thread that polls until the local cluster service reports STARTED.
        thread = EsExecutors.daemonThreadFactory(settings, "testing").newThread(new Runnable() {
            @Override
            public void run() {
                while (!closed) {
                    DiscoveryNode node = clusterService.localNode();
                    boolean started = clusterService.lifecycleState().equals(Lifecycle.State.STARTED);

                    if (node != null && started) {
                        doStuff(); // placeholder for the actual work
                        stop();
                    }

                    try {
                        // Wait five seconds before checking again.
                        Thread.sleep(TimeValue.timeValueSeconds(5).millis());
                    } catch (InterruptedException ignored) {
                    }
                }
            }
        });
    }
}

This works pretty well, but I'm not sure whether InitialStateListener is preferred?

Thanks again,

InitialStateListener.waitForInitialState() waits for the initial cluster state, which is sent from the master. That is, after this method returns, a cluster has formed, a master has been elected, and all indices have been found (but not necessarily recovered).

What you do with lifecycleState() is not related to a sane master and a sane cluster startup. It just means a local component on this node has run its start() method (here, the cluster service). But the cluster service is started a long time before the node joins the cluster, the master election handshake completes, and the first cluster state is received.

The busy loop with Thread.sleep(5L) is very ugly and not required. As a side note, Thread.sleep() resolution depends on system timers and OS schedulers, so 5ms is way too short.
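The general pattern for replacing a sleep loop is to block on a latch that a callback opens. A minimal plain-Java sketch, assuming a hypothetical helper named `InitialStateGate` (this is not the Elasticsearch class, just the same idea without any Elasticsearch dependency):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** One-shot gate: a callback opens it, waiters block without polling (hypothetical helper). */
final class InitialStateGate {
    private final CountDownLatch latch = new CountDownLatch(1);

    /** Call this from whatever callback reports that the first cluster state arrived. */
    void open() {
        latch.countDown();
    }

    /**
     * Blocks until open() has been called or the timeout elapses.
     * Returns true if the gate was opened, false on timeout or interruption.
     */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }
}
```

The waiting thread wakes up as soon as the callback fires, so there is no polling interval to tune, and the timeout still bounds how long startup can hang.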

Thanks for your quick reply @jprante,

Thanks for the extra explanation, I will modify my code to reflect your suggestions :slight_smile:
The sleep is set to 5 seconds at the moment, but I agree that sleeping should be avoided if possible.

Is it correct that InitialStateListener is a private static class within DiscoveryService? Would it be enough to create my own listener which implements InitialStateDiscoveryListener?