2. dispynode (Server)

The dispynode.py program should be running on each node (server). It executes jobs for dispy clients, i.e., jobs submitted with JobCluster or SharedJobCluster. Usually no options are needed to run this program; the -d option may be useful to see a log of jobs being executed.

If the psutil module is available, the server sends the node’s availability status (available CPU as a percentage, memory in bytes and disk space in bytes) to clients at the pulse_interval specified with JobCluster, or with the corresponding option to dispyscheduler (Shared Execution) when SharedJobCluster is used. Clients can use this information to monitor and analyze application performance, to filter nodes based on available resources with a customized NodeAllocate, etc. The Monitor and Manage Cluster module also maintains this information and sends it to web browsers so users can monitor the availability status of nodes.
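
As a rough illustration of the three values in the availability status, the sketch below collects them with psutil when it is installed. The helper name availability and its fallback behavior (reporting only disk space via the standard library when psutil is missing) are assumptions made for this example; this is not dispynode's own code.

```python
import shutil

try:
    import psutil  # third-party and optional, as described in the text
except ImportError:
    psutil = None


def availability(path="."):
    """Return available CPU (percent), memory (bytes) and disk space (bytes).

    Hypothetical helper for illustration only.
    """
    # Disk space is available from the standard library in any case.
    status = {"cpu": None, "memory": None, "disk": shutil.disk_usage(path).free}
    if psutil is not None:
        # cpu_percent() reports usage, so availability is the remainder.
        status["cpu"] = 100.0 - psutil.cpu_percent(interval=0.1)
        status["memory"] = psutil.virtual_memory().available
    return status


print(availability())
```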

Below are various options to invoke dispynode program:

  • --save_config <file> saves configuration (i.e., options as given except for save_config) in given file and exits. This can be used to save the configuration on one node, copy that file over to all the other nodes and start dispynode with --config option to start dispynode with that configuration. If file is not given, the configuration is written to stdout.

    For example, dispynode.py -d --cpus -1 -s test --zombie_interval=10 --service_start 17:00 --service_stop 6:00 --service_end 8:00 --save_config /etc/dispynode.cfg stores given options in /etc/dispynode.cfg. This file can then be used in all the nodes to start dispynode with dispynode.py --config /etc/dispynode.cfg. Note that node specific configuration (e.g., IP address, name etc.) should not be given, as another node can’t be started with those options.

  • --config <file> reads configuration from given file (e.g., saved with save_config option).

  • -d enables debug messages that show trace of execution.

  • -c n or --cpus=n sets the number of processing units to n. Without this option, dispynode will use all the processing units available on that node. If n is positive, it must be at least 1 and at most number of processing units on that node; dispynode will then use at most n processors. If n is negative, then that many processing units are not used by dispynode.

  • -i addr or --ip_addr=addr directs dispynode to use given addr for communication. addr can be either host name or IP address in IPv4 or IPv6 format. If this option is not given, IP address associated with default host name is used.

  • --ext_ip_addr=addr directs dispynode to announce addr in network communication so that the node can be used if it is behind NAT firewall/gateway that is configured to use addr. See NAT/Firewall Forwarding below.

  • -p n or --node_port=n directs dispynode to use given port n instead of default port 51348.

  • --name=name associates given name to the node. If this option is not given, result of socket’s gethostname() is used as name.

  • -s secret or --secret=secret directs dispynode to use ‘secret’ for hashing handshake communication with dispy scheduler; i.e., this node will only work with clients that use same secret (see secret option to JobCluster and node_secret option to dispyscheduler (Shared Execution)).

  • --dest_path_prefix=path directs dispynode to use path as prefix for storing files sent by dispy scheduler. If a cluster uses dest_path option (when creating cluster with JobCluster or SharedJobCluster), then dest_path is appended to path prefix. With this, files from different clusters can be automatically stored in different directories, to avoid conflicts. Unless cleanup=False option is used when creating a cluster, dispynode will remove all the files and directories that were created once the cluster is terminated.

  • --scheduler_node=addr: If the node is in the same network as the dispy scheduler or when no jobs are scheduled at the time dispynode is started, this option is not necessary. However, if jobs are already scheduled and scheduler and node are on different networks, the given addr is used for handshake with the scheduler.

  • --scheduler_port=n directs dispynode to use port n to communicate with scheduler. Default value is 51347. When using this option, make sure dispy scheduler is also directed to use same port.

  • --keyfile=path is path to file containing private key for SSL communication, same as ‘keyfile’ parameter to ssl.wrap_socket of Python ssl module. This key may be stored in ‘certfile’ itself, in which case this must be None (default). Same file must be used as keyfile parameter for JobCluster, or node_keyfile option with dispyscheduler (Shared Execution) (when SharedJobCluster is used).

  • --certfile=path is path to file containing SSL certificate, same as ‘certfile’ parameter to ssl.wrap_socket of Python ssl module. Same file must be used as certfile parameter for JobCluster, or node_certfile option with dispyscheduler (Shared Execution) (when SharedJobCluster is used).

  • --max_file_size n specifies maximum size of any file transferred from/to clients. If size of a file transferred exceeds n, the file will be truncated. n can be specified as a number >= 0, with an optional suffix letter k (indicating n kilobytes), m (n megabytes), g (n gigabytes) or t (n terabytes). If n is 0 (default), there is no maximum limit.

  • --zombie_interval=n directs dispynode to assume a scheduler is a zombie if there is no communication from it for n minutes. dispynode doesn’t terminate jobs submitted by a zombie scheduler; instead, when all the jobs scheduled are completed, the node frees itself from that scheduler so other schedulers may use the node.

  • --ping_interval=n is interval in seconds to send ping messages to discover schedulers. With the default value 0 dispynode doesn’t send such messages, except when dispynode is started. In addition to broadcasting UDP messages, dispynode will send TCP messages to the last scheduler (or for the first time the scheduler given with scheduler_node option) at given interval.

  • --clean indicates dispynode should remove any files saved from previous runs. dispynode saves any files sent by dispy clients and information about jobs’ execution results that couldn’t be sent to clients (because of network failures, clients crashed etc.). The cleaning is done once when the node is starting. If dispynode is left running for a long time, it may be advisable to periodically remove such files (perhaps files that were accessed before a certain time). Note that dispy sets the timestamps of files saved from dispy client computations to the timestamps on the clients, so modification times of such files may not be a good measure to know if the files are still in use.

  • --client_shutdown indicates that dispynode can be shut down by a client calling dispynode_shutdown() in its cleanup function.

  • --msg_timeout n specifies timeout value in seconds for socket I/O operations with the client / scheduler. The default value is 5 seconds. If the network is slow, this timeout can be increased. Bigger timeout values than necessary will cause longer delays in recognizing communication failures.

  • --service_start HH:MM, --service_stop HH:MM, --service_end HH:MM options allow service (executing jobs) only between those times (of day). HH:MM should be in 24 hour format. service_stop is optional and if given, the node stops accepting jobs at that time. Any jobs executing at service_end will be terminated (killed) so the clients should only submit reentrant computations if this feature is used. For example, if service_start is set to 17:00, service_stop set to 07:00 and service_end is set to 08:00, the node will execute jobs from 5PM, stop accepting new jobs at 7AM (next day), and kill any running jobs at 8AM. Then it will not accept any jobs until 5PM.

  • --serve n specifies maximum number of clients that can use the server. The default value of -1 implies no limit and any positive number causes dispynode to quit after running computations from that many clients.

    dispynode decrements number of clients left to run when all computations from a client are closed. With this, it is possible to run more computations than given n if the scheduler issues computations before currently running computations are closed. For example, if n is 1, scheduler from JobCluster can send second computation before closing first one; the node will accept second computation, and both computations will have access to same files. With SharedJobCluster, the client for node is dispyscheduler, so computations from different programs may be accepted by the node (until all computations are closed). See exclusive option for SharedJobCluster to prevent a node from being shared in more than one computation.

    serve option can be used with Containers to run each client’s computations in a new container.

  • --daemon option causes dispynode to not read from standard input, so dispynode can be run as background process, or started from (system startup) scripts. If this option is not given, dispynode prints menu of commands, and commands can be entered to get status and control dispynode.
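
The behavior described for --service_start, --service_stop and --service_end can be sketched as a small function that classifies a time of day. This is only an illustration of the rules as stated above, assuming times wrap around midnight as in the 17:00 / 07:00 / 08:00 example; the function name service_state and the state labels are hypothetical and not part of dispynode.

```python
from datetime import time

DAY = 24 * 60  # minutes in a day


def _minutes(t):
    """Minutes since midnight for a datetime.time value."""
    return t.hour * 60 + t.minute


def service_state(now, start, end, stop=None):
    """Classify 'now' as 'accepting', 'draining' or 'closed'.

    'accepting': new jobs are accepted and executed.
    'draining':  past service_stop; running jobs continue, no new jobs.
    'closed':    past service_end; running jobs are killed, none accepted.
    Hypothetical helper; times are measured relative to service_start so
    that windows spanning midnight work.
    """
    elapsed = (_minutes(now) - _minutes(start)) % DAY
    window = (_minutes(end) - _minutes(start)) % DAY
    if elapsed >= window:
        return "closed"
    if stop is not None:
        accept = (_minutes(stop) - _minutes(start)) % DAY
        if elapsed >= accept:
            return "draining"
    return "accepting"


# The example from the text: start 17:00, stop 07:00, end 08:00.
for hhmm in (time(18, 0), time(7, 30), time(8, 0), time(12, 0)):
    print(hhmm, service_state(hhmm, time(17, 0), time(8, 0), stop=time(7, 0)))
```

Measuring everything as an offset from service_start avoids special cases for windows that cross midnight. The sketch does not cover the degenerate case where service_start and service_end are equal.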

2.1. Commands

If dispynode is started as non-daemon (i.e., not as background process), following commands can be given at the prompt:

  • “quit” or “exit” terminate dispynode, killing any running jobs.
  • “stop” accepts no new jobs but continues to execute currently running jobs.
  • “start” resumes accepting new jobs.
  • “release” checks if current scheduler (client) is active and, if not, closes its computations (i.e., releases node from current client so another client can use the node). Current scheduler is considered active if either node is currently running any jobs for it or it can communicate with the node. “release” command is useful if current client has crashed or is not reachable due to network issues and zombie period is too long for dispynode to automatically release itself from the client.
  • “cpus” followed by number of cpus to use to change the number of jobs running at any time. If given number is negative then that many cpus are not used; e.g., if a node has 8 CPUs and given number is -3, then dispynode runs up to 5 jobs at any time.
  • Any other input causes dispynode to show status, including number of computations finished, currently running computations, jobs and CPU time used.
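
The rule for positive and negative CPU counts, used by both the “cpus” command and the -c / --cpus option, amounts to simple arithmetic. The sketch below is a hypothetical helper, not dispynode's code; treating 0 as "use all processing units" and rejecting a negative value that would leave no processing unit are assumptions made for the example.

```python
def effective_cpus(n, available):
    """Return how many processing units would be used for a given n.

    Hypothetical helper for illustration only.
    """
    if n > 0:
        # A positive n must not exceed what the node has.
        if n > available:
            raise ValueError("n exceeds available processing units")
        return n
    if n < 0:
        # A negative n leaves that many processing units unused.
        if -n >= available:
            # Assumption: at least one unit must remain for running jobs.
            raise ValueError("n leaves no processing unit to use")
        return available + n
    # Assumption: 0 behaves like omitting the option (use all units).
    return available


print(effective_cpus(-3, 8))  # the example from the text: 8 CPUs, n = -3
```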

2.2. Signal Handling

dispynode uses signals SIGINT and SIGABRT to process keyboard interrupt and termination. In POSIX (Linux, OS X etc.) these signals have no effect in user computations (the handler is available in dispynode parent process, but not in child processes). If user computations depend on these signals, then handlers must be set up for them. Sending SIGINT to dispynode (main) program will cause dispynode to quit the program after all currently running computations are finished.
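
A computation that depends on these signals could install its own handlers with Python's standard signal module, for example at the start of the job function. The sketch below assumes the code runs in the main thread of its process, which signal.signal requires; the names install_handlers and on_signal are hypothetical.

```python
import signal

received = []  # signal numbers seen by the handler


def on_signal(signum, frame):
    """Example handler: record the signal instead of terminating."""
    received.append(signum)


def install_handlers(handler):
    """Install handler for SIGINT and SIGABRT; return the previous handlers."""
    previous = {}
    for signum in (signal.SIGINT, signal.SIGABRT):
        # signal.signal returns the handler that was installed before.
        previous[signum] = signal.signal(signum, handler)
    return previous


previous = install_handlers(on_signal)
print(signal.getsignal(signal.SIGINT) is on_signal)
```

The returned dictionary of previous handlers can be used to restore the original behavior when the computation no longer needs its own handlers.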

2.3. NAT/Firewall Forwarding

As explained in dispy (Client) documentation, ext_ip_addr can be used in case dispynode is behind a NAT firewall/gateway and the NAT forwards UDP and TCP ports 51348 to the IP address where dispynode is running. Thus, assuming NAT firewall/gateway is at (public) IP address a.b.c.d, dispynode is to run at (private) IP address and NAT forwards UDP and TCP ports 51348 to, dispynode can be invoked as:

dispynode.py -i --ext_ip_addr a.b.c.d

If multiple dispynodes are needed behind a.b.c.d, then each must be started with different ‘port’ argument and those ports must be forwarded to nodes appropriately. For example, to continue the example, if is another node that can run dispynode, then it can be started on it as:

dispynode.py -i -p 51350 --ext_ip_addr a.b.c.d

and configure NAT to forward UDP and TCP ports 51350 to Then dispy client can use the nodes with:

cluster = JobCluster(compute, nodes=[('a.b.c.d', 51348), ('a.b.c.d', 51350)])

2.4. Containers

dispy isolates computation environment so that jobs from one computation don’t affect jobs from another computation, even if a node is shared and jobs from different computations are running simultaneously. Usually, any files transferred and saved by jobs are also removed when computation is closed (the exception is when dest_path is given or if cleanup is False, when files may be left behind). However, the jobs have access to server’s file system so they can be a security risk. It is possible to avoid (some) issues by creating a special user with access only to a specific path (e.g., with a chroot environment).

If complete isolation of computation is needed, containers such as Docker or LXC can be used. Each container runs a copy of a small Linux distribution with its own file system; the container has no access to host file system (unless it is configured to). Instructions below are for building and using Docker containers; LXC images can be built by installing dispy into a container and copying the image to all nodes.

dispy now includes Dockerfile under data directory where dispy module is installed, which can be obtained with the program:

import os, dispy
print(os.path.join(os.path.dirname(dispy.__file__), 'data'))

Note that Docker runs on Linux host only; with other operating systems a guest VM can be used to run Linux under which Docker can be run. See Docker Machine and Docker Docs for more details.

To build an image with latest Ubuntu Linux and dispy, install docker if not already installed, create a temporary directory, say, /tmp/dispy-docker, change to that directory and copy Dockerfile from above to that directory. (The Dockerfile can be customized to suit any additional tools or setup needed.) Then execute docker build -t dispy . (note the dot at the end). Full list of instructions for building image for Python 2.7 (for Python 3 use appropriate path to where Dockerfile is installed) are:

mkdir /tmp/dispy-docker
cd /tmp/dispy-docker
cp /path/to/Dockerfile .
docker build -t dispy .

where /path/to/ is path obtained from Python snippet (two lines) above.

Once the image is built, a new container can be run with:

docker run --net=host -it dispy

to start dispynode.py (which is the default command for the image built above) with default options. --net=host runs the container in host network mode, i.e., the container uses the host’s network configuration. See the --save_config and --config options to dispynode to use the same options across many runs. If these or any other options are needed, the Dockerfile can be customized before building the image in the instructions above.

If each client run should be started in a new container (so that clients do not interfere with each other and start in the same environment using the image built above), then serve option can be used as:

while :; do
    docker run --net=host -it dispy dispynode.py --serve 1
done

This causes dispynode to accept computations only from one client until it closes computations. When all running computations from that client are closed, dispynode quits, which terminates container and because of while loop, a new container is started from the image.