1. dispy (Client)

The dispy module provides the API for the client program to create clusters (to distribute computations to servers running dispynode), schedule jobs to execute those computations, and process the jobs’ execution results.

Creating a cluster consists of packaging computation fragments (code and data) and specifying options that control how the computations are executed, such as which nodes can execute computations. There are two ways to create clusters with dispy: JobCluster and SharedJobCluster. If only one instance of the client program may be running at any time, JobCluster is simple to use; it already contains a scheduler that will schedule jobs to nodes running dispynode (Server). If, however, multiple programs using dispy may be running simultaneously, JobCluster cannot be used - the scheduler in each instance of dispy would assume the nodes are controlled exclusively by it, causing conflicts. Instead, SharedJobCluster must be used. In this case, the dispyscheduler (Shared Execution) program must also be running on some computer and SharedJobCluster must set the scheduler_node parameter to the node running dispyscheduler (default is the host that calls SharedJobCluster).

Once a cluster is created, jobs can be scheduled with cluster.submit() to execute a computation with different parameters, similar to how a function is executed in a local program, except that here dispy’s scheduler will execute it on an available processor on a remote server. The scheduler and dispynode server will run at most one job on a processor at any time, as the computations are assumed to be CPU intensive. (See asyncoro’s discoro for an alternate execution model for executing multiple computations on a processor.)

While dispy and its components have various options that cover rather comprehensive use cases, which may make them seem complex, most of the options have default values that likely work for common cases. For example, starting the dispynode (Server) program on each of the nodes on a local network and using JobCluster with computation, and possibly depends parameters, as done with the example in dispy: Distributed and Parallel Computing with/for Python, may be sufficient.
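
For instance, a minimal sketch of this common case (the compute function below is hypothetical) is:

import dispy

def compute(n):
    # executed on a node; must not depend on the client's global state
    return n * n

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute)
    jobs = []
    for n in range(10):
        job = cluster.submit(n)
        job.id = n               # 'id' is the only attribute the client may set
        jobs.append(job)
    for job in jobs:
        print('%s squared is %s' % (job.id, job()))   # job() waits for the result
    cluster.print_status()
    cluster.close()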

1.1. JobCluster

class dispy.JobCluster(computation, nodes=['*'], depends=[], callback=None, cluster_status=None, ip_addr=None, ext_ip_addr=None, port=51347, node_port=51348, recover_file=None, dest_path=None, loglevel=dispy.logger.WARNING, setup=None, cleanup=True, pulse_interval=None, ping_interval=None, reentrant=False, secret='', keyfile=None, certfile=None)

Creates and returns a cluster for the given computation, which can be used to schedule executions, as explained above and in Cluster. The parameters are:

  • computation should be either a Python function or a string. If it is a string, it must be the path to an executable program. This computation is sent to nodes in the given cluster. When a job is submitted (to invoke computation with arguments), dispynode executes the computation with those arguments in isolation - the computation should not depend on global state, such as modules imported in the main program or global variables etc., except as described below for setup parameter (see also Examples).

  • nodes must be a list, each element of which must be either a string or a NodeAllocate object. If an element is a string, it must be either an IP address or a host name; this string is converted to a NodeAllocate object.

    This list serves two purposes: dispy initially sends a request to all the nodes listed to find out information about them (e.g., number of processing units available for dispy), then sends given computation to only those nodes that match the listed nodes (dispy may know about nodes not listed in this computation, as it also broadcasts identification request). Wildcard ‘*’ can be used to match (part of) any IP address; e.g., ‘192.168.3.*’ matches any node whose IP address starts with ‘192.168.3’. If there are any nodes beyond local network, then all such nodes should be mentioned in nodes. If there are many such nodes (on outside local network), it may be cumbersome to list them all (it is not possible to send identification request to outside networks with wildcard in them); in that case, dispynetrelay (Using Remote Servers) may be started on one of the nodes on that network and the node running dispynetrelay should be added to nodes list (and a wildcard for that network, so that other nodes on that network match that wildcard); see Examples.

  • depends is a list of dependencies needed for computation. If computation is a Python function, each element of this list can be either a Python function, a Python class, an instance of a class (object), a Python module, or a path to a file. If computation is a program, each element can be either a Python module (useful if computation is a Python program), or a path to a file. Only Python modules that are not already present on nodes need be listed; standard modules that are present on all nodes do not need to be listed here. Any code components in depends are executed on the server so that the client can create an environment (state) for the jobs; for example, if client and computations exchange class instances (objects), then the definition for that class can be included in depends so the object can be serialized at the client (for transfer over the network as a stream of bytes) and deserialized at the server using that class definition.

    These dependencies are available to all the jobs executed by nodes. Unless cleanup parameter is set to False, they are removed after the computation is done. See cluster.submit in Cluster on how to distribute dependencies for each job, instead of for the computation.

  • callback is a user provided function. When a job’s results become available, dispy will call provided callback function with that job as the argument. If a job sends provisional results with dispy_provisional_result multiple times, then dispy will call provided callback each such time. The (provisional) results of computation can be retrieved with ‘result’ field of job, etc. While computations are run on nodes in isolated environments, callbacks are run in the context of user programs from which (Shared)JobCluster is called - for example, callbacks can access global variables in programs that created cluster(s).

  • cluster_status is a user provided function. dispy calls this function whenever there is a change in a job’s status or a node’s status. The function is called with the arguments:

    status which is either DispyJob’s status, such as DispyJob.Created, DispyJob.Running, or DispyNode’s status, such as DispyNode.Initialized,

    node, an instance of DispyNode about which the status is indicated,

    job, an instance of DispyJob if the status is about a job, otherwise None.

    This feature is used by Monitor and Manage Cluster so the cluster status can be monitored in a web browser.

  • ip_addr is address to use for (client) communication. If it is not set, all configured network interfaces are used. If it is a string, it must be either a host name or IP address (in either IPv4 or IPv6 format). If it is a list, each must be a string of host name or IP address, in which case interface addresses for each of those is used.

  • ext_ip_addr is a string or list of strings of host names or IP addresses to use as return address for node to client communication. This may be needed in case the client is behind a NAT firewall/gateway and (some) nodes are outside. Typically, in such a case, ext_ip_addr must be the address of NAT firewall/gateway and the NAT firewall/gateway must forward ports to ip_addr appropriately. See below for more on NAT/firewall information.

  • port is the port to use for (client) communication. Usually this need not be set. If not given, dispy will request the socket library to choose any available port.

  • node_port is port to use for communicating with nodes (servers). If this is different from default, dispynode.py programs must be run with the same port.

  • dest_path is directory on the nodes where files are transferred to. Default is to create a separate directory for each computation. If a computation transfers files (dependencies) and same computation is run again with same files, the transfer can be avoided by specifying same dest_path, along with the option ‘cleanup=False’.

  • recover_file, if given, must be a string. If it is a string, dispy uses it as the path to a file in which it stores information about the cluster. If it is None, then a file of the form ‘_dispy_YYYYMMDDHHMMSS’ in the current directory is used. In case the user program terminates unexpectedly (for example, because of network failure or an uncaught exception), this file can be used to retrieve results of jobs that were scheduled but not yet finished at the time of crash. See (Fault) Recover Jobs below.

  • loglevel is logging message level and should be one of DEBUG, INFO, WARNING, ERROR and CRITICAL attributes in dispy.logger. At DEBUG level many messages are logged and at CRITICAL level very few messages are logged. Default value is dispy.logger.INFO.

  • setup, if given, must be a Python function or partial function that takes no (further) arguments. This function is transferred, along with dependencies and computation, to each node and executed on each node after all dependencies have been copied to dest_path and working directory has also been set to dest_path (which would also be the working directory when computation jobs are executed).

    The setup function is executed on a node exactly once before any jobs are scheduled on that node; the function should return 0 to indicate successful initialization. This function can be used, for example, to initialize global variables that are then used in jobs as read-only variables for in-memory processing.

    Note

    Note that if data is read in to memory for in-memory processing, sharing nodes with SharedJobCluster may not be suitable; if SharedJobCluster must be used with in-memory processing, consider using exclusive option so that node executes jobs for that cluster exclusively (i.e., node is not shared with other clusters even though SharedJobCluster is used).

    Under Linux, OS X (and other Unix variants, but not Windows), child processes (that execute computation jobs) inherit parent’s address space (among other things), so the setup function can initialize any data to global variables.

    Under Windows, fork is not available, so the global variables are pickled and sent as arguments to new process; see multiprocessing’s Programming guidelines for Windows. Thus, under Windows, modules, for example, can’t be declared global variables in setup function, as modules are not serializable. If setup feature is used under Windows, care must be taken not to share same node with other computations, as one computation’s global variables may interfere with another computation’s.

    See Examples where setup and cleanup functions are used to initialize and de-initialize global variables.

  • cleanup: If True (default value), any files (dependencies) transferred will be deleted after the computation is done. If it is False, the files are left on the nodes; this may speed things up if the same files are needed for another cluster later. However, this can be a security risk and/or require manual cleanup. If the same files are used for multiple clusters, then cleanup may be set to False and the same dest_path used.

    If cleanup is neither True nor False, it must be a Python function or partial function that takes no (further) arguments. This function is transferred to each of the nodes and executed on the nodes before deleting files as mentioned above. If the setup function creates global variables, for example, the cleanup function may delete those global variables. A sketch combining setup, cleanup and other parameters follows this parameter list.

  • pulse_interval is number of seconds between ‘pulse’ messages that nodes send to indicate they are alive and computing submitted jobs. If this value is given as an integer or floating point number between 1 and 600, then a node is presumed dead if 5*pulse_interval seconds elapse without a pulse message. See ‘reentrant’ below.

    If a node has psutil module installed, that node sends current availability status (CPU, memory, disk space and swap space) with DispyNodeAvailInfo at pulse_interval frequency. This information is then sent to cluster_status callback in DispyNode as avail_info member. If node doesn’t have psutil module, avail_info member would be None.

  • reentrant must be either True or False. This value is used only if ‘pulse_interval’ is set for any of the clusters. If pulse_interval is given and reentrant is False (default), jobs scheduled for a dead node are automatically cancelled (for such jobs, execution result, output and error fields are set to None, exception field is set to ‘Cancelled’ and status is set to Cancelled); if reentrant is True, then jobs scheduled for a dead node are resubmitted to other available nodes.

  • ping_interval is number of seconds. Normally dispy can locate nodes running dispynode by broadcasting UDP ping messages on local network and point-to-point UDP messages to nodes on remote networks. However, UDP messages may get lost. Ping interval is number of seconds between repeated ping messages to find any nodes that have missed previous ping messages.

  • secret is a string that is (hashed and) used for handshaking of communication with nodes; i.e., this cluster will only work with nodes that use the same secret (see secret option to dispynode (Server)). This prevents unauthorized use of nodes. However, the hashed string (not the secret itself) is passed in clear text, so an unauthorized, determined person may be able to figure out how to circumvent it. This feature can be used, for example, to easily create a private cluster with a small number of machines in a large network.

  • keyfile is path to file containing private key for SSL communication, same as ‘keyfile’ parameter to ssl.wrap_socket of Python ssl module. This key may be stored in ‘certfile’ itself, in which case this should be None. Same file must be used as keyfile option for dispynode (Server) in the case of JobCluster or for dispyscheduler (Shared Execution) in the case of SharedJobCluster.

  • certfile is path to file containing SSL certificate, same as ‘certfile’ parameter to ssl.wrap_socket of Python ssl module. Same file must be used as certfile option for dispynode (Server) in the case of JobCluster or for dispyscheduler (Shared Execution) in the case of SharedJobCluster.
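
As an illustration of how several of these parameters fit together, below is a sketch (with a hypothetical computation and data file, and Unix-style fork semantics assumed; see the note about Windows above) that combines depends, setup, cleanup and callback:

import functools
import dispy

def setup(data_file):    # executed on each node once, before any jobs are scheduled
    global data          # made available to jobs on this node for read-only use
    with open(data_file) as fd:
        data = fd.read()
    return 0             # 0 indicates successful initialization

def cleanup(data_file):  # executed on each node after the computation is done
    global data
    del data

def compute(i):
    # 'data' was initialized by 'setup' on this node
    return len(data) + i

def job_callback(job):   # executed at the client when a job's results are available
    if job.status == dispy.DispyJob.Finished:
        print('job %s result: %s' % (job.id, job.result))

if __name__ == '__main__':
    # 'data.txt' is a hypothetical file in the client's directory, transferred with 'depends'
    cluster = dispy.JobCluster(compute, depends=['data.txt'],
                               setup=functools.partial(setup, 'data.txt'),
                               cleanup=functools.partial(cleanup, 'data.txt'),
                               callback=job_callback)
    jobs = []
    for i in range(4):
        job = cluster.submit(i)
        job.id = i
        jobs.append(job)
    cluster.wait()
    cluster.close()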

1.2. SharedJobCluster

SharedJobCluster (along with running dispyscheduler (Shared Execution) program) should be used when multiple dispy client programs run simultaneously. SharedJobCluster has almost the same syntax as JobCluster, except as noted below.
class dispy.SharedJobCluster(computation, nodes=['*'], depends=[], ip_addr=None, port=51347, scheduler_node=None, scheduler_port=None, ext_ip_addr=None, dest_path=None, loglevel=dispy.logger.WARNING, cleanup=True, reentrant=False, exclusive=False, secret='', keyfile=None, certfile=None)

where all arguments common to JobCluster are the same, except as noted below.

Note

Note that the client port is 51347 (since dispy version 4.6.7). Client programs can be started with this port on different computers. However, starting more than one client program on the same computer will not work (as they would share the same port on the same computer); if starting more than one client on the same computer, either specify a different port number, or use port=0 so that each client will use a different random port.

  • scheduler_node is either the IP address or host name where dispyscheduler (Shared Execution) is running; if it is not given, the node where SharedJobCluster is invoked is used.
  • pulse_interval is not used in case of SharedJobCluster; instead, dispyscheduler (Shared Execution) program must be started with pulse_interval option appropriately.
  • secret is a string that is (hashed and) used for handshaking of communication with dispyscheduler; i.e., this cluster will only work with dispyscheduler that uses same secret (see clustersecret option to dispyscheduler (Shared Execution)).
  • exclusive must be either True or False. If it is False (default), the scheduler may schedule jobs from different computations/clients (e.g., in different programs) simultaneously. That is, a node may execute one job from one computation and another job from a different computation at the same time. This works well if computations don’t consume too much memory. If, however, computations need a lot of memory to execute, they may fail or perform poorly. To use all the nodes exclusively, a computation may set exclusive to True. In that case, the computation waits until all currently executing computations are closed and then uses all the nodes exclusively. While an exclusive computation is running, any new computations are queued up and wait for their turn. Using exclusive=True may not use nodes efficiently, if the computation doesn’t submit enough jobs to use all available processors.
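
For example, a sketch of creating a shared cluster (assuming dispyscheduler is already running on a hypothetical host named 'scheduler_host', and with a hypothetical computation) is:

import dispy

def compute(n):
    return n * n

if __name__ == '__main__':
    # 'exclusive=True' requests that nodes run jobs only for this computation
    cluster = dispy.SharedJobCluster(compute, scheduler_node='scheduler_host',
                                     exclusive=True)
    jobs = [cluster.submit(n) for n in range(10)]
    for job in jobs:
        print(job())
    cluster.close()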

1.3. Cluster

A cluster created by either JobCluster or SharedJobCluster has the following methods:

cluster.submit(*args, **kwargs[, dispy_job_depends=[]])

Creates a DispyJob object (see below for useful fields of this object), schedules it for execution on an eligible node (whenever one becomes available) and returns the job. Results from execution of computation with given arguments will be available in the job object after execution finishes.

This method should be called with the arguments exactly as expected by the computation given to JobCluster or SharedJobCluster. If computation is a Python function, the arguments may also contain keyword arguments. All arguments must be serializable (picklable), as these are sent over the network to the nodes. If an argument is a class object that contains non-serializable members, then the classes may provide __getstate__ method for this purpose (see ‘_Job’ class in dispy.py for an example). If computation is a standalone program, then all arguments must be strings.

When submitting many jobs or if arguments are large, consider the memory required to store all the arguments for all submitted jobs - until all references to jobs and arguments are cleared, the space required to keep them may accumulate, causing memory issues at the client/scheduler. In many examples, jobs are stored in a list and never removed; this is not suitable when jobs and arguments require a lot of space. See Efficient job submission for an example of one approach to deal with such cases.

As noted in JobCluster, dispy can distribute dependencies to nodes when the cluster is created. All the jobs created by computations can use those dependencies. If, however, each job has its own dependencies (that are needed only for that job’s execution), then such dependencies can be passed with the optional keyword parameter dispy_job_depends list to cluster.submit. The format of this dependencies list is the same as that for the depends parameter of JobCluster. The keyword parameter dispy_job_depends is used by cluster.submit and removed before the job is created, so that the computation itself doesn’t get this parameter. Dependencies distributed by dispy_job_depends are automatically removed after the job is done (even if cleanup is set to False when creating the cluster).
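
As a sketch (the computation and file names are hypothetical), per-job dependencies can be passed as:

# each job gets its own input file, transferred only for that job
for i, path in enumerate(['input0.dat', 'input1.dat']):
    job = cluster.submit(i, dispy_job_depends=[path])
    job.id = i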

cluster.submit_node(node, *args, **kwargs[, dispy_job_depends=[]])

Similar to submit method above, except that the job is scheduled on given node only. node can be an instance of DispyNode (e.g., as received in cluster_status callback), an IP address or host name.

This method, along with the cluster_status option, can be used in the client program to schedule jobs with full control over when / how jobs are scheduled. (Jobs submitted with the submit method are scheduled with a load balancing algorithm.) See the job_scheduler.py file in the examples directory for an example use case.

cluster.send_file(path, node)

Sends file with path to node. node can be an instance of DispyNode (as received in status callbacks, for example), IP address or host name. The file is copied to destination path (working directory of computation) with just the base name; i.e., without directory structure. If the file is copied successfully, the return value is 0.

send_file, along with submit_node, can be used with cluster_status to send data files after a node is initialized, for example, to distribute data for distributed in-memory processing, implement different schedulers, or fully control where and when jobs are submitted etc.
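
A sketch of this pattern (with a hypothetical computation and data file) is below; the cluster_status callback sends a data file to each node as it is initialized and then submits jobs to that node only:

import dispy

def compute(i):
    # 'shared.dat' was copied to the job's working directory with send_file
    with open('shared.dat') as fd:
        return (i, len(fd.read()))

def status_cb(status, node, job):
    # called by dispy whenever a node's or job's status changes;
    # this sketch assumes the global 'cluster' is already assigned
    if status == dispy.DispyNode.Initialized:
        if cluster.send_file('shared.dat', node) == 0:
            for i in range(node.cpus):
                cluster.submit_node(node, i)

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, cluster_status=status_cb)
    # ... collect / process jobs (e.g., with a callback), then
    cluster.wait()
    cluster.close()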

cluster.cancel(job)

Terminates the given job (returned from a previous cluster.submit()). If the job is queued for execution, it is removed from the scheduler, or it is terminated (killed) if it has started execution at a node. Note that if the job execution has any side effects (such as updating databases, files etc.), cancelling a job may leave unpredictable side effects, depending on at what point the job is terminated.

cluster.allocate_node(node)

Allocate given node for the cluster. ‘node’ may be host name or IP address, or an instance of NodeAllocate.

If the request is successful, the return value is 0; otherwise, it will be -1.

cluster.node_jobs(node)

Returns list of jobs currently running on given node, given as host name or IP address.

cluster.set_node_cpus(node, cpus)

Sets (alters) CPUs managed by dispy on a node, given as host name or IP address, to given number of CPUs. If the number of CPUs given is negative then that many CPUs are not used (from the available CPUs).

If the cluster is SharedJobCluster, set_node_cpus does not change the CPUs (as the node is shared by other users/programs).

If the request is successful, the return value indicates number of CPUs used by dispy for given node. Otherwise, the method returns -1.

cluster.wait(timeout=None)

Wait for all scheduled jobs to complete. If timeout is given, it is maximum time in seconds to wait for jobs to complete. If jobs complete before timeout, the return value is True; otherwise, the return value is False.

cluster.close(timeout=None, terminate=False)

Close the cluster (jobs can no longer be submitted to it). If there are any jobs pending, this method waits until they all complete before cluster is closed. If timeout is given, it is maximum time in seconds to wait for jobs to complete. If jobs complete before timeout, cluster is closed and returned value is True; otherwise, the value returned is False unless terminate is True in which case pending jobs are cancelled (removed or terminated by nodes executing them). Additional clusters may be created after this call returns.

cluster.print_status()

Prints status about nodes, such as time each node spent executing jobs etc.

1.4. DispyJob

The result of a cluster.submit() call is an instance of DispyJob (see dispy.py), which can be used to examine the status of job execution, retrieve job results etc. It has the following attributes:

  • id can be set by the client program to any appropriate value. For example, the id field can be set to a unique value to distinguish one job from another, as done in the example in dispy: Distributed and Parallel Computing with/for Python. This is the only field that can be written to by the client. The rest of the attributes are read-only.

    While any Python object can be assigned to this attribute, dispy debug log (if enabled) prints this as string and node information in Manage Cluster (Nodes) shows id attribute as string as well. So if id is an instance of a class, providing appropriate __str__ method for the class, as done in the example in Example, will give useful information about the job; otherwise, id may be shown simply as reference to class instance.

  • status indicates current status of job. Its value is one of Created, Running, Finished, Cancelled or Terminated (DispyJob class properties).

  • ip_addr is IP address of node where the job is currently executing, or executed.

When a submitted job is called with job(), it returns that job’s execution result, waiting until the job is finished if it has not finished yet. After a job is finished, following attributes can be read:

  • result will have computation’s result - return value if computation is a function and exit status if it is a program. job.result is same as return value of job()
  • stdout and stderr will have stdout and stderr strings of the execution of computation at server node
  • exception will have exception trace if computation raises any exception; in this case job.result will be None
  • start_time will be the time when job was scheduled for execution on a node
  • end_time will be time when results became available

Job’s result, stdout and stderr should not be large - these are buffered and hence will consume memory (not stored on disk). Moreover, like args and kwargs, result should be serializable (picklable object). If result is (or has) an instance of Python class, that class may have to provide __getstate__ function to serialize the object. If the result is not serializable, or it is too big, then the data can be saved to a file and transferred to the client, as explained in Transferring Files.

After jobs are submitted, cluster.wait() can be used to wait until all submitted jobs for that cluster have finished. If necessary, results of execution can be retrieved by either job() or job.result, as described above.
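
For example, a sketch of examining finished jobs (assuming the DispyJob instances returned by cluster.submit() were collected in a list named jobs) is:

cluster.wait()                   # wait for all submitted jobs to finish
for job in jobs:
    if job.status == dispy.DispyJob.Finished:
        print('job %s returned %s' % (job.id, job.result))
        if job.stdout:
            print('job %s output: %s' % (job.id, job.stdout))
    else:
        print('job %s failed: %s' % (job.id, job.exception))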

1.5. NodeAllocate

As mentioned in JobCluster above, nodes must be a list, each element of which must be either a string, tuple or NodeAllocate object. The string and tuple types are used for convenience, but these elements are converted to NodeAllocate object by JobCluster, which gives more control to specify how to use nodes.

class NodeAllocate(host, port=None, cpus=0)

host must be a string, indicating either host name, IP address of node or string with wildcard ‘*’ that matches IP address of nodes found, as explained in nodes of JobCluster.

port is port used by dispynode on given node. If it is not given, default dispynode port (51348) is used.

cpus is maximum number of CPUs to use on given node. Default is to use all CPUs available.
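
For example, a sketch restricting a cluster to two (hypothetical) nodes, with at most 2 CPUs used on one of them, is:

cluster = dispy.JobCluster(compute,
                           nodes=[dispy.NodeAllocate('192.168.3.5', cpus=2),
                                  dispy.NodeAllocate('192.168.3.6')])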

When dispy finds a node, each NodeAllocate object’s allocate method is called.

allocate(cluster, ip_addr, name, cpus, avail_info=None, platform=None)

cluster is instance of JobCluster.

ip_addr is IP address of node for which this method is called.

name is host name of node.

cpus is number of CPUs available on that node.

avail_info indicates latest availability status of node as an instance of DispyNodeAvailInfo if that node has psutil module installed; otherwise avail_info would be None.

platform is as obtained by platform.platform() method on the node. This parameter can be used to filter nodes when cluster consists of nodes with multiple platforms.

This method should return a number indicating number of CPUs to use on that node. The default allocate method returns minimum of CPUs found (given to allocate method) and cpus specified in constructor. NodeAllocate class can be sub-classed to provide different functionality. For example:

import re
import dispy

class FilterNodeAllocate(dispy.NodeAllocate):
    def allocate(self, cluster, ip_addr, name, cpus, avail_info=None, platform=''):
        # use only nodes that have 1GB of memory and 100GB of disk space available
        # and run Linux 64-bit
        GB = 1024 ** 3
        if (isinstance(avail_info, dispy.DispyNodeAvailInfo) and
                avail_info.memory > GB and avail_info.disk > (100 * GB) and
                re.match('Linux.*x86_64', platform)):
            return cpus  # use all available CPUs on this node
        else:
            return 0  # don't use this node

FilterNodeAllocate can then be used to specify nodes when creating JobCluster, for example, with:

cluster = dispy.JobCluster(compute, nodes=[FilterNodeAllocate('*')])

1.6. DispyNodeAvailInfo

If a node has psutil module installed, its latest availability status is given as an instance of DispyNodeAvailInfo, which has four read-only attributes:

cpu is available CPU as percent. If it is close to 100, the node is not busy at all, and if it is close to 0, all CPUs are busy running compute-intensive tasks.

memory is available memory in bytes. This is not total memory in the system, but usable memory for applications (as interpreted by psutil module).

disk is available disk space in bytes for the partition that dispynode uses, as given by dest_path_prefix option (where dispynode saves clients’ files and runs jobs).

swap is available swap space as percent. If it is close to 100, most of swap space is being used (i.e., system is under heavy memory load), and if it is close to 0, very little of swap space is being used.

1.7. Provisional/Intermediate Results

dispy_provisional_result(result)

Computations (if they are Python functions) can use this function to send provisional or intermediate results to the client. For example, in optimization computations, there may be many (sub) optimal results that the computations can send to the client. The client may ignore these results, cancel computations or create additional computations based on them. When a computation calls dispy_provisional_result(result), the Python object result (which must be serializable) is sent back to the client and the computation continues to execute. The client should use the callback option to process the information, as shown in the example:

import random, dispy

def compute(n): # executed on nodes
    import random, time, socket
    name = socket.gethostname()
    cur_best = 1
    for i in range(0, n):
        r = random.uniform(0, 1)
        if r <= cur_best:
            # possible result
            dispy_provisional_result((name, r))
            cur_best = r
        time.sleep(0.1)
    # final result
    return (name, cur_best)

def job_callback(job): # executed at the client
    if job.status == dispy.DispyJob.ProvisionalResult:
        if job.result[1] < 0.005:
            # acceptable result; terminate jobs
            print('%s computed: %s' % (job.result[0], job.result[1]))
            # 'jobs' and 'cluster' are created in '__main__' below
            for j in jobs:
                if j.status in [dispy.DispyJob.Created, dispy.DispyJob.Running,
                                dispy.DispyJob.ProvisionalResult]:
                    cluster.cancel(j)

if __name__ == '__main__':
    cluster = dispy.JobCluster(compute, callback=job_callback)
    jobs = []
    for n in range(4):
        job = cluster.submit(random.randint(50,100))
        if job is None:
            print('creating job %s failed!' % n)
            continue
        job.id = n
        jobs.append(job)
    cluster.wait()
    cluster.print_status()
    cluster.close()

In the above example, computations send a provisional result whenever a newly computed number improves on the current best. If the number computed is less than 0.005, job_callback deems it acceptable and terminates the remaining computations.

1.8. Transferring Files

dispy_send_file(path)

Computations (if they are Python functions) can use this function to send files (on server nodes) to the client. This is useful if the nodes and client don’t share a file system and computations generate files.

As explained in DispyJob, computation results (return value in the case of Python functions and exit status in the case of programs), along with any output, errors and exception trace, are transferred to the client after a job finishes executing. As these are stored in memory (and sent back as serializable objects), they should not be too big in size. If computations generate a large amount of data, the data can be saved in file(s) (on the nodes) and then transferred to the client with dispy_send_file(path). The files are saved on the client with the same path under the directory where the client is running.

For example, a computation that saves data in a file named “file.dat” and transfers it to the client is:

def compute(n):
    # ... generate "file.dat"
    dispy_send_file("file.dat")
    # ... computation may continue, possibly send more files
    return result

dispy_send_file returns 0 if the file is transferred successfully. The maximum size of the file transferred is as per max_file_size option to dispynode (Server); its default value is 10MB.

All the files sent by the computations are saved under the directory where client program is running. If more than one file is sent (generated on multiple nodes, say), care must be taken to use different paths/names to prevent overwriting files.

Note that depends parameter to JobCluster and SharedJobCluster can be used to transfer files (and other types of dependencies) from client to nodes. If files need to be sent after cluster is created, cluster’s send_file method can be used.

1.9. (Fault) Recover Jobs

As mentioned above, dispy stores information about cluster in a file as per recover_file option. If user program terminates unexpectedly, the nodes that execute those jobs can’t send the results back to client. In such a case, the results for the jobs that were scheduled at the time of crash can be retrieved from the nodes with the function in dispy module:

dispy.recover_jobs(recover_file, timeout=None, terminate_pending=False)
  • recover_file is path to the file used when the cluster was created. See the recover_file option in JobCluster above.
  • timeout is the number of seconds to wait for job results from nodes. If timeout is not None, then results for jobs that have finished before the timeout are returned; other jobs continue to execute.
  • terminate_pending, if True, will terminate jobs on the nodes that have not finished. This can be used in combination with timeout.

This function reads the information about cluster in the recover_file, inquires nodes about jobs, retrieves results of jobs, waiting until they are all finished. The result of this function is list of DispyJob instances with certain attributes filled, such as result, stdout, stderr, exception, status etc. This function also releases the nodes from the computation after retrieving the results of all pending jobs, so the nodes will respond to other clients.

As an example, assume that the client’s recover_file is _dispy_20160125235537 and the client crashed after submitting 5 jobs. To recover these jobs, run the following program:

import dispy
jobs = dispy.recover_jobs('_dispy_20160125235537')
for job in jobs:
    print('Job result: %s' % job.result)

This will wait until all 5 jobs finish and return DispyJob instances.

1.10. NAT/Firewall Forwarding

By default the dispy client uses UDP and TCP port 51347, dispynode uses UDP and TCP port 51348, and dispyscheduler uses UDP and TCP port 51347 and TCP port 51348. If client/node/scheduler are behind a NAT firewall/gateway, then these ports must be forwarded appropriately and the ext_ip_addr option must be used. For example, if the dispy client is behind a NAT firewall/gateway, JobCluster / SharedJobCluster must set ext_ip_addr to the NAT firewall/gateway address and forward UDP and TCP port 51347 to the IP address where the client is running. Similarly, if dispynode is behind a NAT firewall/gateway, the ext_ip_addr option must be used.
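
For example, if the client is behind a NAT firewall/gateway with a (hypothetical) public address 'a.b.c.d' and the gateway forwards UDP and TCP port 51347 to the client's machine, a sketch of the cluster creation is:

# 'a.b.c.d' is the NAT firewall/gateway's public address (hypothetical);
# the gateway must forward UDP/TCP port 51347 to the machine running this client
cluster = dispy.JobCluster(compute, ext_ip_addr='a.b.c.d')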

1.11. SSH Port Forwarding

If nodes are on remote network, nodes may not be able to communicate with client and NAT/Firewall Forwarding may not be possible. In such cases, SSH can be used for port forwarding.

To use this with JobCluster, the client port (default is 51347) should be forwarded from each node with ssh -R 51347:127.0.0.1:51347 node, and then the parameter ext_ip_addr=127.0.0.1 should be passed to JobCluster.

SharedJobCluster by default uses a random port for the client. To use port forwarding with dispyscheduler (Shared Execution) running on a remote network, force SharedJobCluster to use a specific port (other than 51347, 51348 and 51349, which are used by dispyscheduler and dispynode); here port 2345 is assumed. Use ssh tunneling with ssh -R 2345:127.0.0.1:2345 scheduler_node to forward port 2345 and then specify port=2345, ext_ip_addr=127.0.0.1 to SharedJobCluster.
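
A sketch of the corresponding client setup (with a hypothetical scheduler host name) is:

# after forwarding the client port with: ssh -R 2345:127.0.0.1:2345 scheduler_host
cluster = dispy.SharedJobCluster(compute, scheduler_node='scheduler_host',
                                 port=2345, ext_ip_addr='127.0.0.1')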

If dispyscheduler (Shared Execution) is running on the local network and nodes are on a remote network, start dispyscheduler with the --ext_ip_addr 127.0.0.1 option and then forward port 51347 (default client port) from each node as done in the case of JobCluster above. No special setup is needed for SharedJobCluster, as the scheduler and client are in the same network.

1.12. SSL (Security / Encryption)

If nodes are on public / remote networks, SSL (Secure Sockets Layer) can be used to encrypt all communication, including data, so that the connection is private - only the sender and recipient see the data exchanged and other observers see encrypted data. dispy provides a simple mechanism where digital certificates / symmetric keys are used to encrypt data on one side and decrypt data on the other side. Below is an example of generating the certificates using the openssl tool (this is just an example - there are other approaches / methods):

openssl req -x509 -newkey rsa:2048 -keyout sskeycert.pem -nodes -out sskeycert.pem -sha256 -days 1000

This command generates self-signed key and certificate pair in file sskeycert.pem that should be used as certfile parameter where SSL is used. It is also possible to generate key and certificate in separate files, and use keyfile and certfile parameters appropriately.

Once the key/certificate pair is generated (or obtained by other means), they should be copied to each of the nodes (over a secure channel, such as ssh). The nodes should be started with dispynode.py --certfile sskeycert.pem (or if key and certificate are in separate files, with dispynode.py --certfile sscert --keyfile sskey) and the client should create cluster with certfile=sskeycert.pem (or certfile=sscert, keyfile=sskey if they are in separate files).
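
For example, a sketch of the client side when the key and certificate are in one file is:

# nodes must have been started with: dispynode.py --certfile sskeycert.pem
cluster = dispy.JobCluster(compute, certfile='sskeycert.pem')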

If SharedJobCluster is used, the certificates used by the client should be those used with the cluster_certfile and cluster_keyfile options used when starting dispyscheduler; the client itself doesn’t communicate with nodes. dispyscheduler may use an additional set of certificates to communicate with nodes using node_certfile and node_keyfile, which should also be used by the nodes.

1.13. Cloud Computing

When the client is in the same network as nodes, using dispy is the same as explained above, except that some cloud platforms may not allow UDP broadcast, preventing the client from discovering nodes automatically, so nodes may have to be listed explicitly using the nodes parameter.

To use nodes in a cloud computing platform when the client is in a different network (e.g., local), ext_ip_addr of dispynode (Server) can be used to establish communication between the client and remote nodes. Amazon EC2 cloud computing is used as an example below, but a similar setup should work with other cloud computing platforms, or any setup where nodes are in a remote network.

It may be necessary to set up the configuration to allow TCP ports 51347-51349 (default ports used by dispy) or any other ports used by the dispy client. For example, with EC2, a “Security Group” should be created and assigned to the instance so that inbound TCP ports 51347-51349 are allowed.

With EC2 service, a node has a private IP address (called ‘Private DNS Address’) that uses private network of the form 10.x.x.x and public address (called ‘Public DNS Address’) that is of the form ec2-x-x-x-x.x.amazonaws.com. After launching instance(s), one can copy dispy files to the node(s) and run dispynode on each node as:

dispynode.py --ext_ip_addr ec2-x-x-x-x.y.amazonaws.com

(this address can’t be used with the -i/--ip_addr option, as the network interface is configured with the private IP address only). This node can then be used by a dispy client from outside the EC2 network by specifying ec2-x-x-x-x.x.amazonaws.com in the nodes list (perhaps using EC2 servers to augment local processing units). With ext_ip_addr, dispy acts similar to NAT - it announces ext_ip_addr to other services instead of the configured ip_addr so that external services send requests to ext_ip_addr and, if the firewall/gateway forwards them appropriately, dispy will process them.

If the EC2 node can connect to client with the IP address and port (default 51347) used by client, the cluster should be created in the client with:

cluster = dispy.JobCluster(compute, nodes=['ec2-x-x-x-x.y.amazonaws.com'])

If the client is behind a router, the router’s firewall can be configured to forward port 51347 to the client’s IP address, and the cluster can be created with:

cluster = dispy.JobCluster(compute, nodes=['ec2-x-x-x-x.y.amazonaws.com'],
                           ext_ip_addr='router.ip.address')

If client is behind a router and its firewall can’t be setup to forward port 51347, then ssh can be used to forward the port. To use this, first login to EC2 node with:

ssh -i ec2-key.pem -R 51347:127.0.0.1:51347 userid@ec2-x-x-x-x.y.amazonaws.com

Then start dispynode as mentioned above, and create cluster with:

cluster = dispy.JobCluster(compute, nodes=['ec2-x-x-x-x.y.amazonaws.com'],
                           ip_addr='127.0.0.1')

In case of problems, enable debugging on the nodes (with the -d option) and client (with the loglevel=dispy.logger.DEBUG option). If that still doesn’t work, check that the node is reachable with telnet ec2-x-x-x-x.y.amazonaws.com 51348 from the client (after starting dispynode); the output should contain a Connected message.