3. dispyscheduler (Shared Execution)

As mentioned in dispy (Client) and dispy: Distributed and Parallel Computing with/for Python, if at most one cluster uses a node at any time, the cluster can be created with JobCluster; the scheduler included in JobCluster manages jobs and nodes. If, however, nodes need to be used (shared) by more than one cluster at the same time (e.g., multiple client programs need to run simultaneously), then SharedJobCluster must be used (instead of JobCluster) to create the cluster, and dispyscheduler.py must be running on a node. Usually no options are needed when invoking this program.

Note

The default port used by the client (the program that uses SharedJobCluster) is 51347, which is also used by dispyscheduler. So running the client and dispyscheduler on the same computer will cause one or the other to hang (without raising an error message). To use both the client and dispyscheduler on the same computer, first start dispyscheduler and then pass either port=0 (which uses a random unused port) or a different port, such as port=2345, to SharedJobCluster. Running dispyscheduler on a computer that also runs dispynode doesn’t need any special setup.
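Before starting a client on a machine that may also run dispyscheduler, a program could probe whether the default port is already taken and fall back to port=0. The sketch below is a heuristic under stated assumptions, not part of dispy: tcp_port_in_use and choose_client_port are hypothetical helpers, the probe only tries to bind a TCP socket (it does not check UDP), and bind semantics differ somewhat across operating systems.

```python
import socket

DEFAULT_PORT = 51347  # default port of both the client and dispyscheduler (per the note above)


def tcp_port_in_use(port: int, host: str = "") -> bool:
    """Return True if a fresh TCP socket cannot bind to (host, port).

    host="" means all local interfaces. This is only a heuristic probe:
    it does not look at the UDP side of the port.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        try:
            s.bind((host, port))
        except OSError:
            return True
    return False


def choose_client_port(preferred: int = DEFAULT_PORT) -> int:
    """Return preferred if it looks free, otherwise 0 (let the OS pick an unused port)."""
    return 0 if tcp_port_in_use(preferred) else preferred
```

The value returned by choose_client_port() could then be given as the port parameter to SharedJobCluster; since dispyscheduler should be started first, the probe sees its port as taken and yields 0.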

Below are the options for invoking dispyscheduler:

  • --save_config <file> saves the configuration (i.e., the options as given, except for save_config) in the given file and exits. This can be used to save the configuration once and then use that file with the --config option to start dispyscheduler repeatedly with that configuration. If the file is not given, the configuration is written to stdout.

  • --config <file> reads the configuration from the given file (e.g., one saved with the --save_config option).

  • -d enables debug messages that show a trace of execution.

  • -n node1 -n node2 or --nodes node1 --nodes node2 etc. sends handshake messages to the given nodes (host name or IP address) when dispyscheduler starts. This is not needed if the scheduler and the nodes are on the same network.

  • -i addr or --ip_addr=addr directs dispyscheduler to use the given addr for communication. addr can be either a host name or an IP address in IPv4 or IPv6 format. If this option is not given, the IP address associated with the default host name is used.

  • --ext_ip_addr=addr directs dispyscheduler to announce addr in network communication so that the scheduler can be used when it is behind a NAT firewall/gateway that is configured to use addr. See NAT/Firewall Forwarding below.

  • -p n or --port=n directs dispyscheduler to use the given port n instead of the default port 51347 for UDP and TCP communication for job results.

  • --scheduler_port=n directs dispyscheduler to use the given port n instead of the default port 51349 for the job scheduler.

  • --node_port=n directs dispyscheduler to use the given port n instead of the default port 51348 as the port on which dispynode must be running.

  • --node_secret secret directs dispyscheduler to use ‘secret’ for hashing handshake communication with nodes; i.e., the scheduler will only use nodes that use the same secret (see the secret option to dispynode (Server)).

  • --cluster_secret secret directs dispyscheduler to use ‘secret’ for hashing handshake communication with clusters; i.e., the scheduler will only work with SharedJobCluster clients that use the same secret (see the secret option to JobCluster).

  • --node_keyfile path is the file containing the private key for SSL communication with nodes, same as the ‘keyfile’ parameter to ssl.wrap_socket of the Python ssl module. This key may be stored in ‘node_certfile’ itself, in which case this option should be left unset (default None). The same file must be used as --keyfile for dispynode (Server).

  • --node_certfile path is the file containing the SSL certificate for communication with nodes, same as the ‘certfile’ parameter to ssl.wrap_socket of the Python ssl module. The same file must be used as --certfile for dispynode (Server).

  • --cluster_keyfile path is the file containing the private key for SSL communication with clusters, same as the ‘keyfile’ parameter to ssl.wrap_socket of the Python ssl module. This key may be stored in cluster_certfile itself, in which case this option should be left unset (default None). The same file must be used as the keyfile parameter for SharedJobCluster.

  • --cluster_certfile path is the file containing the SSL certificate for communication with clusters, same as the ‘certfile’ parameter to ssl.wrap_socket of the Python ssl module. The same file must be used as the certfile parameter for SharedJobCluster.

  • --httpd option starts an HTTP server so that all clusters using the scheduler can be monitored and managed with a web browser. Each cluster is shown with its name followed by the client’s IP address, as ‘@ <IP address>’.

  • --pulse_interval n directs the nodes it controls to send pulse messages every n seconds; if a pulse message is not received within 5*n seconds, the node is presumed dead. In that case, if a cluster set ‘reentrant=True’, jobs scheduled on that node will be migrated to other node(s) if possible; if reentrant=False, the jobs are automatically cancelled. n must be between 1 and 600.

  • --ping_interval n specifies the number of seconds between repeated ping messages. Normally dispyscheduler locates nodes running dispynode by broadcasting UDP ping messages on the local network and sending point-to-point UDP messages to nodes on remote networks. However, UDP messages may get lost, so ping messages are repeated every n seconds to find any nodes that missed previous ones.

  • --zombie_interval=n directs dispyscheduler to assume that a client or a node is a zombie if there is no communication from it for n minutes. If a client is determined to be a zombie, its computation is deleted. If a node is determined to be a zombie, the jobs running on it are rescheduled or cancelled, as described for the pulse_interval option.

  • --max_file_size n specifies the maximum size of any file transferred from clients. If the size of a transferred file exceeds n, the file will be truncated. n can be specified as a number >= 0, with an optional suffix letter k (indicating n kilobytes), m (n megabytes), g (n gigabytes) or t (n terabytes). If n is 0 (default), there is no maximum limit.

  • --cooperative specifies that the client(s) can update the CPUs of node(s). The CPUs can also be updated by clusters that are marked exclusive. (If the cooperative option is not given and the computation is not marked exclusive, the scheduler won’t change the CPUs, as this may prevent other computations from scheduling jobs.) If node CPUs are changed by an exclusive cluster and the cooperative option is not used with dispyscheduler, the CPUs are reset to the values they had before that cluster started. If CPUs are changed due to the cooperative option, though, it is up to the client clusters to cooperate and set/reset the CPUs.

  • --cleanup_nodes sets a computation’s cleanup to True if it was False, so the nodes always clean up (i.e., remove any files transferred or generated by that computation) after it is done. If this option is not given, nodes will leave behind files used by computations that set cleanup to False, which may take up disk space (and should be cleaned up manually or through external programs).

  • --clean option causes dispyscheduler to remove any files saved from previous runs. dispyscheduler saves any files sent by dispy clients, as well as information about job execution results that couldn’t be sent to clients (because of network failures, client crashes, etc.). The cleaning is done once, when the scheduler starts. If dispyscheduler is left running for a long time, it may be advisable to periodically remove such files (e.g., files that were last accessed before a certain time).

  • --msg_timeout n specifies the timeout value in seconds for socket I/O operations with the client / scheduler. The default value is 5 seconds. If the network is slow, this timeout can be increased. Timeout values larger than necessary cause longer delays in recognizing communication failures.

  • --daemon option causes dispyscheduler not to read from standard input, so dispyscheduler can be run as a background process or started from (system startup) scripts. If this option is not given, dispyscheduler prints a menu of commands, and commands can be entered to get the status of and control dispyscheduler.
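The numeric rules in the option list above (size suffixes for --max_file_size, the 1 to 600 range and the 5*n dead-node rule for --pulse_interval, and minutes for --zombie_interval) can be illustrated with a few small helpers. This is a sketch, not dispyscheduler's code: all function names are hypothetical, only integer sizes are accepted, and treating k/m/g/t as binary multiples (k = 1024 bytes) is an assumption, since the text above only says "kilobytes", "megabytes", etc.

```python
import re

# Assumption: suffixes are binary multiples (k = 1024 bytes, m = 1024**2, ...).
_MULTIPLIER = {"": 1, "k": 1024, "m": 1024 ** 2, "g": 1024 ** 3, "t": 1024 ** 4}


def parse_max_file_size(value: str) -> int:
    """Convert a --max_file_size value such as '0', '500k' or '2m' to bytes (0 = no limit)."""
    match = re.fullmatch(r"(\d+)([kmgtKMGT]?)", value.strip())
    if match is None:
        raise ValueError(f"invalid max_file_size: {value!r}")
    number, suffix = match.groups()
    return int(number) * _MULTIPLIER[suffix.lower()]


def check_pulse_interval(n: float) -> float:
    """Reject --pulse_interval values outside the documented range 1..600 seconds."""
    if not 1 <= n <= 600:
        raise ValueError(f"pulse_interval must be between 1 and 600, got {n}")
    return n


def node_presumed_dead(last_pulse: float, now: float, pulse_interval: float) -> bool:
    """True if no pulse arrived within 5 * pulse_interval seconds (all times in seconds)."""
    return now - last_pulse > 5 * check_pulse_interval(pulse_interval)


def is_zombie(last_contact: float, now: float, zombie_interval: float) -> bool:
    """True if a client or node has been silent for more than zombie_interval minutes."""
    return now - last_contact > zombie_interval * 60
```

For example, with --pulse_interval 10 a node whose last pulse arrived at t=100 s is still considered alive at t=150 s and presumed dead just after that.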
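The --clean option only cleans once, at startup, and the text suggests periodically removing files last accessed before a certain time when dispyscheduler runs for long periods. A minimal external cleanup sketch follows. The directory is a hypothetical parameter (this section does not say where dispyscheduler keeps its saved files), and remove_stale_files is an illustrative name. Note that on filesystems mounted with noatime, access times are not updated, so st_mtime may be a better criterion there.

```python
import os
import time
from typing import List, Optional


def remove_stale_files(root: str, max_age_days: float, now: Optional[float] = None) -> List[str]:
    """Delete files under root whose last access time is older than max_age_days.

    root is hypothetical: point it at the directory where your dispyscheduler
    installation keeps its saved files. Returns the list of removed paths.
    """
    cutoff = (time.time() if now is None else now) - max_age_days * 86400
    removed = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            path = os.path.join(dirpath, name)
            try:
                if os.stat(path).st_atime < cutoff:
                    os.remove(path)
                    removed.append(path)
            except FileNotFoundError:
                pass  # file disappeared between listing and removal
    return removed
```

Such a function could be run from a periodic job (e.g., cron) with a conservative max_age_days, so that files still in use by running computations are not removed.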

There are currently 3 job scheduling algorithms in dispyscheduler. All of them first pick the node with the least load (i.e., the number of jobs running on that node divided by the number of CPUs on that node). The default scheduler then picks the job submitted earliest among all clusters that can use that node (i.e., the node matches the cluster's nodes list, if given). Alternate schedulers can be chosen when starting dispyscheduler with the following options:

  • --fair_cluster_scheduler chooses the earliest submitted job from the cluster that was scheduled least recently (i.e., the one that has been waiting longest for its turn to run jobs). If jobs are pending in several clusters, this algorithm chooses jobs from those clusters in a round-robin fashion.
  • --early_cluster_scheduler chooses the earliest submitted job from the cluster that was created first (i.e., the client that called SharedJobCluster earliest). With this algorithm, clusters are served in first-created, first-served order.
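The three policies can be compared in a small simulation. This is a simplified model, not dispyscheduler's implementation: Node, Job, Cluster, pick_job and schedule_one are hypothetical names; a cluster's nodes list is reduced to a set of node names; and skipping nodes without an idle CPU, as well as falling back to the next least-loaded node when no cluster can use the first one, are assumptions.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Set, Tuple


@dataclass
class Node:
    name: str
    cpus: int
    running: int = 0  # jobs currently running on this node

    @property
    def load(self) -> float:
        # Load as defined above: running jobs divided by CPUs.
        return self.running / self.cpus


@dataclass
class Job:
    cluster: str
    submitted: float  # submission time


@dataclass
class Cluster:
    name: str
    created: float                          # when the client created SharedJobCluster
    nodes: Optional[Set[str]] = None        # None: cluster may use any node
    last_scheduled: float = float("-inf")   # -inf: no job scheduled yet
    pending: List[Job] = field(default_factory=list)


def pick_job(node: Node, clusters: List[Cluster], policy: str) -> Optional[Tuple[Cluster, Job]]:
    """Choose a cluster (per policy) and its earliest pending job for node."""
    eligible = [c for c in clusters
                if c.pending and (c.nodes is None or node.name in c.nodes)]
    if not eligible:
        return None
    if policy == "default":
        # Earliest submitted job across all eligible clusters.
        cluster = min(eligible, key=lambda c: min(j.submitted for j in c.pending))
    elif policy == "fair":
        # Cluster scheduled least recently; ties broken by creation time.
        cluster = min(eligible, key=lambda c: (c.last_scheduled, c.created))
    elif policy == "early":
        # Cluster created first.
        cluster = min(eligible, key=lambda c: c.created)
    else:
        raise ValueError(f"unknown policy: {policy!r}")
    return cluster, min(cluster.pending, key=lambda j: j.submitted)


def schedule_one(nodes: List[Node], clusters: List[Cluster], policy: str,
                 now: float) -> Optional[Tuple[str, Job]]:
    """Schedule a single job; return (node name, job), or None if nothing can run."""
    # Assumption: try nodes from least to most loaded, skipping nodes with no idle CPU.
    for node in sorted((n for n in nodes if n.running < n.cpus), key=lambda n: n.load):
        picked = pick_job(node, clusters, policy)
        if picked is None:
            continue
        cluster, job = picked
        cluster.pending.remove(job)
        cluster.last_scheduled = now
        node.running += 1
        return node.name, job
    return None
```

With cluster A (created first, jobs submitted at t=5 and t=6) and cluster B (created later, one job submitted at t=3), the default policy picks B's job first, the early-cluster policy picks A's job, and the fair policy alternates between A and B because each scheduled cluster moves to the back of the queue.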

3.1. NAT/Firewall Forwarding

As explained in the dispy (Client) and dispynode (Server) documentation, ext_ip_addr can be used to access services behind a NAT firewall/gateway. This option can be used with dispyscheduler, too. This is especially useful if there are many nodes in a network behind a NAT firewall/gateway (otherwise, as explained in the dispynode documentation, each dispynode would have to be started with a different port and all those ports forwarded appropriately). Assuming that dispyscheduler is to run on a node with (private) IP address 192.168.20.55 that is behind a NAT firewall/gateway at (public) IP address a.b.c.d, dispyscheduler can be invoked as:

dispyscheduler.py -i 192.168.20.55 --ext_ip_addr a.b.c.d

and NAT should be set up to forward UDP and TCP port 51347 and TCP port 51349 to 192.168.20.55. Then dispy clients can use nodes in this network with:

cluster = SharedJobCluster(compute, nodes=['*'], scheduler_node='a.b.c.d')
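Before creating the cluster, a client outside the private network could check that the forwarded TCP ports actually reach dispyscheduler. The sketch below assumes dispyscheduler accepts TCP connections on both ports; tcp_reachable and check_forwarding are hypothetical helpers that only open and close a plain TCP connection, so they cannot verify the UDP forwarding of port 51347.

```python
import socket
from typing import Dict, Iterable

# TCP ports from the NAT example above; UDP forwarding of 51347 is not checked here.
TCP_PORTS = (51347, 51349)


def tcp_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) can be opened within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable or timed out
        return False


def check_forwarding(host: str, ports: Iterable[int] = TCP_PORTS) -> Dict[int, bool]:
    """Map each TCP port to whether it is reachable on host."""
    return {port: tcp_reachable(host, port) for port in ports}
```

Running check_forwarding with the gateway's real public address (a.b.c.d in the example) from outside the network should report True for both ports if forwarding is set up and dispyscheduler is running.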