6. Examples

dispy can be used to distribute standalone programs or Python program fragments (functions, classes, modules) and files (programs, data) to nodes and execute jobs in parallel. It supports various options to handle rather comprehensive cases (such as fault tolerance, sharing nodes among multiple programs simultaneously, using nodes in multiple networks, etc.); however, in common setups the usage is simple, as shown in the demonstrative examples below. Some of these examples are also available in the ‘examples’ directory where dispy is installed, which can be located with the program:

import os, dispy
print(os.path.join(os.path.dirname(dispy.__file__), 'examples'))

Short summary of examples (follow the links for details):

  1. Command-line uses dispy as a command-line tool to distribute and run a standalone program.
  2. Python script is a simple Python program that distributes and runs a standalone program.
  3. Canonical program distributes and executes a Python function with different parameters. The program also prints some of the attributes of each job after it finishes (such as job’s result, start time).
  4. Distributing objects shows how to send Python objects (class instances) from the client program to remote nodes, which execute a Python function that takes those objects as arguments.
  5. In-memory processing is one use of the setup (and cleanup) feature to initialize nodes before executing jobs. In this case, setup is used to load data (from files transferred from the client) into global variables; each job then uses the data (read-only) in these variables for in-memory processing (i.e., it accesses the data loaded with the setup function instead of reading data from a file in each job).
  6. Updating globals shows how to use Python’s sharedctypes to initialize (using the setup feature as above) a global variable and then update it in jobs so that all jobs on a node see the updated value.
  7. Sending files to client illustrates how jobs can use dispy_send_file function to send files to client.
  8. Callback processing uses the callback feature to process each job’s result as it finishes.
  9. Efficient job submission uses the callback feature to submit just enough jobs to keep the scheduler using all CPUs in the cluster, instead of submitting all jobs at once, which can cause memory problems.
  10. Port forwarding with SSH uses ssh to forward a port so that nodes in a remote network (in this case an Amazon EC2 cloud instance) can be used.
  11. Recover jobs / nodes shows how to recover jobs and nodes after a client crashes or loses network connection to nodes.
  12. Cluster creation lists various ways to create clusters for different configurations.
  13. MapReduce is a simple implementation of the well-known map-reduce problem.

6.1. Command-Line

dispy can be used as a command-line tool (for simple cases or for scheduling cron jobs); in this case the computations can only be programs and the dependencies can only be files:

dispy.py -f /some/file1 -f file2 -a "arg11 arg12" -a "arg21 arg22" -a "arg3" /some/program

will distribute ‘/some/program’ with dependencies ‘/some/file1’ and ‘file2’ and then execute ‘/some/program’ in parallel with 3 instances: a) arg11 and arg12 (two arguments to the program), b) arg21 and arg22 (two arguments), and c) arg3 (one argument).

6.2. Python Script

A simple client program that distributes a program (say, ‘/path/to/program’) and executes it with a sequence of numbers as arguments is:

import dispy
cluster = dispy.JobCluster('/path/to/program')
for i in range(50):
    cluster.submit(i)

The program ‘/path/to/program’ on the client computer is transferred to each of the nodes, so if the program is a binary then all the nodes should have the same architecture as the client.

In the cases above we assume that the programs execute and save their results in a database, file system, etc. If we are interested in the exit status, output, etc. of each run, then we need to keep the jobs returned when submitting them, so the attributes of interest can be retrieved, as done in the example below.

6.3. Canonical Program

A canonical program, sample.py, that distributes the computation ‘compute’ (a Python function) to nodes running dispynode (Server) on a local network, schedules jobs with the cluster, gets the jobs’ results and prints them:

# function 'compute' is distributed and executed with arguments
# supplied with 'cluster.submit' below
def compute(n):
    import time, socket
    time.sleep(n)
    host = socket.gethostname()
    return (host, n)

if __name__ == '__main__':
    # executed on client only; variables created below, including modules imported,
    # are not available in job computations
    import dispy, random
    # distribute 'compute' to nodes; in this case, 'compute' does not have
    # any dependencies to run on nodes
    cluster = dispy.JobCluster(compute)
    # run 'compute' with 20 random numbers on available CPUs
    jobs = []
    for i in range(20):
        job = cluster.submit(random.randint(5,20))
        job.id = i # associate an ID to identify jobs (if needed later)
        jobs.append(job)
    # cluster.wait() # waits until all jobs finish
    for job in jobs:
        host, n = job() # waits for job to finish and returns results
        print('%s executed job %s at %s with %s' % (host, job.id, job.start_time, n))
        # other fields of 'job' that may be useful:
        # job.stdout, job.stderr, job.exception, job.ip_addr, job.end_time
    cluster.print_status()  # shows which nodes executed how many jobs etc.

6.4. Distributing Objects

If the computation has any dependencies, such as classes, objects or files, they can be specified with ‘depends’ argument and dispy will distribute them along with the computation.

Continuing trivial but illustrative examples, the program obj_instances.py below distributes computation to be executed with instances of a class:

class C(object):
    def __init__(self, i, n):
        self.i = i
        self.n = n

    def show(self):
        print('%s: %.2f' % (self.i, self.n))

def compute(obj):
    # obj is an instance of C
    import time
    time.sleep(obj.n)
    obj.show()  # the output is stored in job.stdout
    return obj.n

if __name__ == '__main__':
    import random, dispy
    # 'compute' needs definition of class C, so include in 'depends'
    cluster = dispy.JobCluster(compute, depends=[C])
    jobs = []
    for i in range(10):
        c = C(i, random.uniform(1, 3)) # create object of C
        job = cluster.submit(c) # it is sent to a node for executing 'compute'
        job.id = c # store this object for later use
        jobs.append(job)
    for job in jobs:
        job() # wait for job to finish
        print('%s: %.2f / %s' % (job.id.i, job.result, job.stdout))

Note that class C is given in ‘depends’, so its code is transferred to the nodes automatically and the objects created in the client program work transparently in ‘compute’ on remote nodes. The objects are serialized using Pickle and sent to the nodes, so the objects must be serializable. If they are not (e.g., they contain references to locks), then the class must provide __getstate__ and __setstate__ methods; see Python object serialization for details. In addition, the objects shouldn’t contain file descriptors, references to other objects not being transferred, etc., as these are not valid on remote nodes.
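
For example, a class holding a lock (which cannot be pickled) can drop the lock in __getstate__ and recreate it in __setstate__ on the remote node. A minimal sketch, assuming such a class (the names below are illustrative, not part of dispy):

import threading

class Record(object):
    def __init__(self, i, n):
        self.i = i
        self.n = n
        self.lock = threading.Lock()  # locks can't be pickled

    def __getstate__(self):
        # copy the instance's state, leaving out the unpicklable lock
        state = self.__dict__.copy()
        del state['lock']
        return state

    def __setstate__(self, state):
        # restore attributes and recreate the lock on the remote node
        self.__dict__.update(state)
        self.lock = threading.Lock()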

6.5. In-memory Processing

The setup and cleanup parameters to a cluster can be used to initialize/de-initialize a node for running jobs of that computation, e.g., to manipulate transferred files, read data into memory so jobs can process it efficiently, or set/unset global variables on that node.

In the example below, the setup function is used to read data from a file transferred from the client into memory on each node; jobs executed later use the data in memory (in-memory processing) instead of reading it from the file in every job. This feature works on Posix systems (Linux, OS X and other Unix variants) without limitations: any data can be assigned to variables declared global in setup, as the operating system’s fork is used to create child processes, which share the address space of the parent process (where the setup function is executed) with copy-on-write. This can be used, for example, to read a large amount of data from file(s) so computations (jobs) can directly access the data in memory, instead of reading the same data from the file each time.

Under Windows, though, fork is not available, so the global variables are serialized and passed to each child process (see multiprocessing’s Programming guidelines for Windows). Thus, for example, modules can’t be loaded in global scope with setup under Windows. Moreover, as each job runs with a copy of all the global variables, initializing objects that require a lot of memory may not be possible / efficient (compared to Posix systems, where objects are not copied). See pycos’s distributed communicating processes for an alternate approach that doesn’t have these limitations.

The setup function in the program node_setup.py below reads the data in a file (transferred with depends) into a global variable. The jobs compute checksums of that data in memory. The cleanup function deletes the global variables:

# executed on each node before any jobs are scheduled
def setup(data_file):
    # read data in file to global variable
    global data, algorithms, hashlib

    import hashlib, sys
    data = open(data_file).read()  # read file into memory; data_file can now be deleted
    if sys.version_info.major > 2:
        data = data.encode() # convert to bytes
        algorithms = list(hashlib.algorithms_guaranteed)
    else:
        algorithms = hashlib.algorithms
    # if running under Windows, modules can't be global, as they are not
    # serializable; instead, they must be loaded in 'compute' (jobs); under
    # Posix (Linux, OS X and other Unix variants), modules declared global in
    # 'setup' will be available in 'compute'

    # 'os' module is already available (loaded by dispynode)
    if os.name == 'nt':  # remove modules under Windows
        del hashlib
    return 0

def cleanup():
    global data, algorithms, hashlib
    del data, algorithms
    if os.name != 'nt':
        del hashlib

def compute(n):
    global hashlib
    if os.name == 'nt': # Under Windows modules must be loaded in jobs
        import hashlib
    # 'data' and 'algorithms' global variables are initialized in 'setup'
    alg = algorithms[n % len(algorithms)]
    csum = getattr(hashlib, alg)()
    csum.update(data)
    return (alg, csum.hexdigest())

if __name__ == '__main__':
    import dispy, sys, functools
    # if no data file name is given, use this file as data file
    data_file = sys.argv[1] if len(sys.argv) > 1 else sys.argv[0]
    cluster = dispy.JobCluster(compute, depends=[data_file],
                               setup=functools.partial(setup, data_file), cleanup=cleanup)
    jobs = []
    for n in range(10):
        job = cluster.submit(n)
        job.id = n
        jobs.append(job)

    for job in jobs:
        job()
        if job.status == dispy.DispyJob.Finished:
            print('%s: %s : %s' % (job.id, job.result[0], job.result[1]))
        else:
            print(job.exception)
    cluster.print_status()
    cluster.close()

The above program is written to work under Posix (Linux, OS X, etc.) as well as Windows. If running under Posix only, ‘compute’ can use ‘hashlib’ as a global variable (initialized in the ‘setup’ function) to avoid loading it in each job, as sketched below.
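
A simplified Posix-only sketch of this variant (assuming Python 3 nodes and the same data_file transferred with depends as in node_setup.py above) is:

def setup(data_file):  # executed on each node before jobs are scheduled
    global data, algorithms, hashlib
    import hashlib
    data = open(data_file, 'rb').read()  # read file into memory as bytes
    algorithms = list(hashlib.algorithms_guaranteed)
    return 0

def cleanup():  # executed on each node after all jobs are done
    global data, algorithms, hashlib
    del data, algorithms, hashlib

def compute(n):
    # 'data', 'algorithms' and 'hashlib' are globals initialized in 'setup'
    alg = algorithms[n % len(algorithms)]
    csum = getattr(hashlib, alg)()
    csum.update(data)
    return (alg, csum.hexdigest())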

6.6. Updating Globals

The example above creates global variables that can be read (but not updated) in jobs. setup and cleanup can also be used to create global variables that can be updated by jobs on a node (but not by jobs on other nodes) using the multiprocessing module’s Sharing state between processes. The program node_shvars.py below creates an integer shared variable that is updated by jobs running on that node:

def setup():
    import multiprocessing, multiprocessing.sharedctypes
    global shvar
    lock = multiprocessing.Lock()
    shvar = multiprocessing.sharedctypes.Value('i', 1, lock=lock)
    return 0

def cleanup():
    global shvar
    del shvar

def compute():
    import random
    r = random.randint(1, 10)
    global shvar
    shvar.value += r  # update 'shvar'; all jobs on this node will
                      # see updated value
    return shvar.value

if __name__ == '__main__':
    import dispy
    cluster = dispy.JobCluster(compute, setup=setup, cleanup=cleanup)
    jobs = []
    for n in range(10):
        job = cluster.submit()
        job.id = n
        jobs.append(job)

    for job in jobs:
        job()
        if job.status != dispy.DispyJob.Finished:
            print('job %s failed: %s' % (job.id, job.exception))
        else:
            print('%s: %s' % (job.id, job.result))
    cluster.print_status()
    cluster.close()

See pycos’s Distributed Communicating Processes for an alternate approach, where setup can be used to initialize any data that can be updated in computations without any limitations, even under Windows. With pycos, computations have to be tasks and the client has to implement task scheduling.

6.7. Sending Files to Client

dispy_send_file (see Transferring Files) can be used to transfer file(s) to the client. Assume that the computation creates files named with the parameter given (in this case n), so different runs create different files (otherwise, file(s) sent by one computation would overwrite files sent by other computations). Such files can be sent to the client with:

def compute(n):
    import time
    time.sleep(n)
    # assume that computation saves data in file n.dat
    dispy_send_file(str(n) + '.dat') # send file to client
    # ... continue further computations
    return n

if __name__ == '__main__':
    import dispy, random
    cluster = dispy.JobCluster(compute)
    jobs = []
    for i in range(20):
        job = cluster.submit(random.randint(5,20))
        job.id = i
        jobs.append(job)
    for job in jobs:
        job()
        print('job %s results in file %s' % (job.id, str(job.id) + '.dat'))

If the client needs to process the files as soon as they are transferred, the Provisional/Intermediate Results feature along with a callback can be used to notify the client, as in the sketch below.
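
A minimal sketch of this approach (assuming each job sends the file name as a provisional result right after transferring the file, so the client’s callback can process the file as soon as it arrives):

def compute(n):
    import time
    time.sleep(n)
    data_file = str(n) + '.dat'
    with open(data_file, 'w') as fd:  # save this run's data (here just 'n')
        fd.write(str(n))
    dispy_send_file(data_file)           # transfer the file to the client
    dispy_provisional_result(data_file)  # notify the client that the file is ready
    # ... continue further computations
    return n

def job_callback(job):  # executed at the client on job status changes
    if job.status == dispy.DispyJob.ProvisionalResult:
        # the file named in 'job.result' has been transferred; process it here
        print('file %s is ready' % job.result)

if __name__ == '__main__':
    import dispy, random
    cluster = dispy.JobCluster(compute, callback=job_callback)
    jobs = [cluster.submit(random.randint(5, 20)) for i in range(20)]
    for job in jobs:
        job()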

6.8. Callback Processing

The submit_node method and the cluster_status callback can be used to schedule jobs with full control over when and on which node a job executes. The program job_scheduler.py below schedules a job to compute the sha1 checksum of a data file whenever a processor is available:

# job computation runs at dispynode servers
def compute(path):
    import hashlib, time, os
    csum = hashlib.sha1()
    with open(os.path.basename(path), 'rb') as fd:
        while True:
            data = fd.read(1024000)
            if not data:
                break
            csum.update(data)
    time.sleep(5)
    return csum.hexdigest()

# 'cluster_status' callback function. It is called by dispy (client)
# to indicate node / job status changes. Here node initialization and
# job done status are used to schedule jobs, so at most one job is
# running on a node (even if a node has more than one processor). Data
# files are assumed to be 'data000', 'data001' etc.
def status_cb(status, node, job):
    if status == dispy.DispyJob.Finished:
        print('sha1sum for %s: %s' % (job.id, job.result))
    elif status == dispy.DispyJob.Terminated:
        print('sha1sum for %s failed: %s' % (job.id, job.exception))
    elif status == dispy.DispyNode.Initialized:
        print('node %s with %s CPUs available' % (node.ip_addr, node.avail_cpus))
    else:  # ignore other status messages
        return

    global submitted
    data_file = 'data%03d' % submitted
    if os.path.isfile(data_file):
        submitted += 1
        # 'node' and 'dispy_job_depends' are consumed by dispy;
        # 'compute' is called with only 'data_file' as argument(s)
        job = cluster.submit_node(node, data_file, dispy_job_depends=[data_file])
        job.id = data_file

if __name__ == '__main__':
    import dispy, sys, os
    cluster = dispy.JobCluster(compute, cluster_status=status_cb)
    submitted = 0
    while True:
        try:
            cmd = sys.stdin.readline().strip().lower()
        except KeyboardInterrupt:
            break
        if cmd == 'quit' or cmd == 'exit':
            break

    cluster.wait()
    cluster.print_status()

6.9. Efficient Job Submission

When a job is submitted to a cluster, the arguments are kept in the DispyJob structure returned. These arguments are used to resubmit the job if a node fails (if the computation is reentrant). When a large number of jobs is submitted, especially with large data in the arguments, the memory required to keep the arguments (at the client) may be an issue.

Note also that in many examples, jobs are kept in a list and processed in sequence later, without removing jobs from the list. This is acceptable when job arguments don’t require too much memory, but not suitable otherwise (as the arguments are kept even after jobs are done). It may be better to keep jobs in a dictionary (using a unique index that is also set as the job ID) and remove each job from this dictionary as soon as it is done.

The program bounded_submit.py below uses the callback feature to keep the scheduler pipeline busy without submitting all jobs at once. The variables lower_bound and upper_bound control the number of jobs that are scheduled at any time. These can be adjusted as suggested, or dynamically updated with either the NodeAllocate or the cluster_status feature (a sketch of the latter follows the program):

def compute(n):  # executed on nodes
    import time
    time.sleep(n)
    return n

# dispy calls this function to indicate change in job status
def job_callback(job): # executed at the client
    global pending_jobs, jobs_cond
    if (job.status == dispy.DispyJob.Finished  # most usual case
        or job.status in (dispy.DispyJob.Terminated, dispy.DispyJob.Cancelled,
                          dispy.DispyJob.Abandoned)):
        # 'pending_jobs' is shared between two threads, so access it with
        # 'jobs_cond' (see below)
        jobs_cond.acquire()
        if job.id: # job may have finished before 'main' assigned id
            pending_jobs.pop(job.id)
            # dispy.logger.info('job "%s" done with %s: %s', job.id, job.result, len(pending_jobs))
            if len(pending_jobs) <= lower_bound:
                jobs_cond.notify()
        jobs_cond.release()

if __name__ == '__main__':
    import dispy, threading, random

    # set lower and upper bounds as appropriate; assuming there are 30
    # processors in a cluster, bounds are set to 50 to 100
    lower_bound, upper_bound = 50, 100
    # use Condition variable to protect access to pending_jobs, as
    # 'job_callback' is executed in another thread
    jobs_cond = threading.Condition()
    cluster = dispy.JobCluster(compute, callback=job_callback)
    pending_jobs = {}
    # submit 1000 jobs
    i = 0
    while i < 1000:
        i += 1
        job = cluster.submit(random.uniform(3, 7))
        jobs_cond.acquire()
        job.id = i
        # there is a chance the job may have finished and job_callback called by
        # this time, so put it in 'pending_jobs' only if job is pending
        if job.status == dispy.DispyJob.Created or job.status == dispy.DispyJob.Running:
            pending_jobs[i] = job
            # dispy.logger.info('job "%s" submitted: %s', i, len(pending_jobs))
            if len(pending_jobs) >= upper_bound:
                while len(pending_jobs) > lower_bound:
                    jobs_cond.wait()
        jobs_cond.release()

    cluster.wait()
    cluster.print_status()
    cluster.close()
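
The fixed bounds above could instead be scaled to the CPUs discovered at run time. A sketch of that variation (not part of bounded_submit.py): initialize a new global avail_cpus = 0 before creating the cluster, create the cluster with cluster_status=status_cb in addition to callback=job_callback, and adjust the bounds as nodes are initialized:

def status_cb(status, node, job):  # executed at the client on node / job status changes
    global avail_cpus, lower_bound, upper_bound
    if status == dispy.DispyNode.Initialized:
        jobs_cond.acquire()
        avail_cpus += node.avail_cpus
        # keep roughly 2x to 3x as many pending jobs as there are CPUs
        lower_bound, upper_bound = 2 * avail_cpus, 3 * avail_cpus
        jobs_cond.release()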

6.10. Port Forwarding with SSH

If the nodes are on a remote network and the client is behind a firewall that can’t be configured to allow ports (default 51347-51349) from the nodes to the client, then ssh can be used for port forwarding (and security). In this example Amazon EC2 instances are used as remote nodes. To use this feature, forward port 51347 (the default port used by the client that the nodes send information to) with ssh -R 51347:localhost:51347 <remote-node>, perhaps also using that ssh session to start dispynode. In the case of Amazon EC2, nodes can be configured to use a public (external) IP address (in the range 54.x.x.x) in addition to the local IP address (in the range 172.x.x.x). dispynode should be started to use the external IP address with dispynode.py -i 54.204.242.185 so that the client and nodes can communicate. The client should also set its IP address to localhost (127.0.0.1) so the remote node will connect to the client over the ssh tunnel. sshportfw.py shows the configuration and steps:

def compute(n): # function sent to remote nodes for execution
    import time
    time.sleep(n)
    return n

if __name__ == '__main__':
    import dispy
    # list remote nodes (here Amazon EC2 instance with external IP 54.204.242.185)
    nodes = ['54.204.242.185']
    # use ssh to forward port 51347 for each node; e.g.,
    # 'ssh -R 51347:localhost:51347 54.204.242.185'

    # start dispynode on each node with 'dispynode.py -i 54.204.242.185' (so dispynode
    # uses external IP address instead of default local IP address)
    cluster = dispy.JobCluster(compute, nodes=nodes, ip_addr='127.0.0.1')
    jobs = []
    for i in range(1, 10):
        job = cluster.submit(i)
        jobs.append(job)
    for job in jobs:
        print('result: %s' % job())

6.11. Recover jobs / nodes

dispy’s cluster saves the information necessary for fault-recovery of jobs and nodes in case the client crashes or loses its network connection to the nodes after submitting jobs. Even if the nodes detect that the client is not reachable anymore (nodes and client periodically send pulse messages to check that the remote side is reachable), they will finish executing the currently running jobs and save the job results. These results can be retrieved at the client using the dispy.recover_jobs function (see (Fault) Recover Jobs). To recover jobs (with their results, output, etc.), the following program can be used:

import dispy
jobs = dispy.recover_jobs()
for job in jobs:
    print('Job result: %s' % job.result)

This will wait for all jobs that were running (at the time the client crashed) on all the nodes to finish. A timeout can also be specified if necessary.

To recover nodes immediately, the following program can be used:

import dispy
jobs = dispy.recover_jobs(timeout=1, terminate_pending=True)
for job in jobs:
    print('Job result: %s' % job.result)

This will wait up to one second (the timeout value) and then request the nodes to terminate any pending (still running) jobs. The nodes then become available for a new client.

6.12. Cluster Creation

Cluster creation can be customized for various use cases; some examples are:

  • cluster = dispy.JobCluster(compute, depends=[ClassA, moduleB, 'file1']) distributes ‘compute’ along with ClassA (Python object), moduleB (Python object) and ‘file1’, a file on client computer. Presumably ClassA, moduleB and file1 are needed by ‘compute’.

  • cluster = dispy.JobCluster(compute, nodes=['node20', '192.168.2.21', 'node24']) sends computation to nodes ‘node20’, ‘node24’ and node with IP address ‘192.168.2.21’. These nodes could be in different networks, as explicit names / IP addresses are listed.

  • If nodes are on a remote network, then certain ports need to be forwarded, as the nodes connect to the client to send status / results of jobs; see NAT/Firewall Forwarding. If port forwarding is not possible, then ssh tunneling can be used. To use this, ssh to each node with ssh -R 51347:127.0.0.1:51347 node (possibly also using that session to execute the dispynode (Server) program on the node if it is not already running), then specify ext_ip_addr=127.0.0.1,nodes=[node] to JobCluster. If using more than one node, list them all in nodes. If client port 51347 is not usable, an alternate port, say 2345, can be forwarded with ssh -R 2345:127.0.0.1:2345 node (and JobCluster used with ext_ip_addr=127.0.0.1,port=2345,nodes=[node]). See SSH Port Forwarding for more details.

  • cluster = dispy.JobCluster(compute, nodes=['192.168.2.*']) sends computation to all nodes whose IP address starts with ‘192.168.2’. In this case, it is assumed that ‘192.168.2’ is the local network (since UDP broadcast is used to discover nodes in a network and broadcast packets don’t cross networks).

  • cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.3.22',
    '172.16.11.22', 'node39', '192.168.2.*']) sends computation to nodes with IP addresses ‘192.168.3.5’, ‘192.168.3.22’, ‘172.16.11.22’ and node ‘node39’ (since explicit names / IP addresses are listed, they could be on different networks), as well as to all nodes whose IP address starts with ‘192.168.2’ (local network).

  • cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.3.*', '192.168.2.*']) In this case, dispy will send discovery messages to node with IP address ‘192.168.3.5’. If this node is running ‘dispynetrelay’, then all the nodes on that network are eligible for executing this computation, as wildcard ‘192.168.3.*’ matches IP addresses of those nodes. In addition, computation is also sent to all nodes whose IP address starts with ‘192.168.2’ (local network).

  • cluster = dispy.JobCluster(compute, nodes=['192.168.3.5', '192.168.8.20', '172.16.2.99', '*']) In this case, dispy will send discovery messages to nodes with IP addresses ‘192.168.3.5’, ‘192.168.8.20’ and ‘172.16.2.99’. If these nodes are all running dispynetrelay, then all the nodes on those networks are eligible for executing this computation, as the wildcard * matches the IP addresses of those nodes. In addition, the computation is also sent to all nodes on the local network (since they also match the wildcard * and the discovery message is broadcast on the local network).

  • Assuming that 192.168.1.39 is the (private) IP address where dispy client is used, a.b.c.d is the (public) IP address of NAT firewall/gateway (that can be reached from outside) and dispynode is running at another public IP address e.f.g.h (so that a.b.c.d and e.f.g.h can communicate, but e.f.g.h can’t communicate with 192.168.1.39), cluster = dispy.JobCluster(compute, ip_addr='192.168.1.39', ext_ip_addr='a.b.c.d', nodes=['e.f.g.h']) would work if NAT firewall/gateway forwards TCP port 51347 to 192.168.1.39.

  • cluster = dispy.JobCluster(compute, secret='super') distributes ‘compute’ to nodes that also use secret ‘super’ (i.e., nodes started with dispynode.py -s super). Note that the secret is used only for establishing communication; it is not used to encrypt programs or code for Python objects. This can be useful to prevent other users from (inadvertently) using the nodes. If encryption is needed, SSL can be used; see below.

  • cluster = dispy.JobCluster(compute, certfile='mycert', keyfile='mykey') distributes ‘compute’ and encrypts all communication using the SSL certificate stored in file ‘mycert’ and the key stored in file ‘mykey’. In this case, dispynode must also use the same certificate and key; i.e., each dispynode must be invoked with dispynode --certfile="mycert" --keyfile="mykey".

    If both certificate and key are stored in same file, say, ‘mycertkey’, they are expected to be in certfile: cluster = dispy.JobCluster(compute, certfile='mycertkey')

  • cluster1 = dispy.JobCluster(compute1, nodes=['192.168.3.2', '192.168.3.5'])
    cluster2 = dispy.JobCluster(compute2, nodes=['192.168.3.10', '192.168.3.11'])
    distribute ‘compute1’ to nodes 192.168.3.2 and 192.168.3.5, and ‘compute2’ to nodes 192.168.3.10 and 192.168.3.11. With this setup, specific computations can be scheduled on certain node(s).

6.13. MapReduce

A simple version of the word count example from MapReduce:

# a version of word frequency example from mapreduce tutorial

def mapper(doc):
    # input reader and map function are combined
    import os
    words = []
    with open(os.path.join('/tmp', doc)) as fd:
        for line in fd:
            words.extend((word.lower(), 1) for word in line.split() \
                         if len(word) > 3 and word.isalpha())
    return words

def reducer(words):
    # we should generate sorted lists which are then merged,
    # but to keep things simple, we use dicts
    word_count = {}
    for word, count in words:
        if word not in word_count:
            word_count[word] = 0
        word_count[word] += count
    # print('reducer: %s to %s' % (len(words), len(word_count)))
    return word_count

if __name__ == '__main__':
    import dispy, logging
    # assume nodes node1 and node2 have 'doc1', 'doc2' etc. on their
    # local storage, so no need to transfer them
    map_cluster = dispy.JobCluster(mapper, nodes=['node1', 'node2'], reentrant=True)
    # any node can work on reduce
    reduce_cluster = dispy.JobCluster(reducer, nodes=['*'], reentrant=True)
    map_jobs = []
    for f in ['doc1', 'doc2', 'doc3', 'doc4', 'doc5']:
        job = map_cluster.submit(f)
        map_jobs.append(job)
    reduce_jobs = []
    for map_job in map_jobs:
        words = map_job()
        if not words:
            print(map_job.exception)
            continue
        # simple partition
        n = 0
        while n < len(words):
            m = min(len(words) - n, 1000)
            reduce_job = reduce_cluster.submit(words[n:n+m])
            reduce_jobs.append(reduce_job)
            n += m
    # reduce
    word_count = {}
    for reduce_job in reduce_jobs:
        words = reduce_job()
        if not words:
            print(reduce_job.exception)
            continue
        for word, count in words.items():
            if word not in word_count:
                word_count[word] = 0
            word_count[word] += count
    # sort words by frequency and print
    for word in sorted(word_count, key=lambda x: word_count[x], reverse=True):
        count = word_count[word]
        print(word, count)
    reduce_cluster.print_status()