Use execnet.MultiChannel to work with multiple channels:
>>> import execnet
>>> ch1 = execnet.makegateway().remote_exec("channel.send(1)")
>>> ch2 = execnet.makegateway().remote_exec("channel.send(2)")
>>> mch = execnet.MultiChannel([ch1, ch2])
>>> len(mch)
2
>>> mch[0] is ch1 and mch[1] is ch2
True
>>> ch1 in mch and ch2 in mch
True
>>> sum(mch.receive_each())
3
Use MultiChannel.make_receive_queue() to get a queue from which to obtain results:
>>> ch1 = execnet.makegateway().remote_exec("channel.send(1)")
>>> ch2 = execnet.makegateway().remote_exec("channel.send(2)")
>>> mch = execnet.MultiChannel([ch1, ch2])
>>> queue = mch.make_receive_queue()
>>> chan1, res1 = queue.get()
>>> chan2, res2 = queue.get(timeout=3)
>>> res1 + res2
3
Use channel callbacks if you want to process incoming data immediately and without blocking execution:
>>> import execnet
>>> gw = execnet.makegateway()
>>> ch = gw.remote_exec("channel.receive() ; channel.send(42)")
>>> l = []
>>> ch.setcallback(l.append)
>>> ch.send(1)
>>> ch.waitclose()
>>> assert l == [42]
Note that the callback function will be executed in the receiver thread and should not block or run for too long.
Use MultiChannel.make_receive_queue(endmarker) to specify an object that is put into the queue when the remote side of a channel closes. The endmarker is also put into the queue if the gateway is blocked in execution and gets terminated/killed:
>>> group = execnet.Group(['popen'] * 3) # create three gateways
>>> mch = group.remote_exec("channel.send(channel.receive()+1)")
>>> queue = mch.make_receive_queue(endmarker=42)
>>> mch[0].send(1)
>>> chan1, res1 = queue.get()
>>> res1
2
>>> group.terminate(timeout=1) # kill processes waiting on receive
>>> for i in range(3):
...     chan1, res1 = queue.get()
...     assert res1 == 42
>>> group
<Group []>
If you have multiple CPUs or hosts you can create a gateway for each of them and then have a worker process sit on each CPU, waiting for tasks. One complication is that we want to ensure clean termination of all processes and lose no results. Here is an example that uses local subprocesses to do the job:
import execnet

group = execnet.Group()
for i in range(4):  # 4 CPUs
    group.makegateway()

def process_item(channel):
    # task processor, sits on each CPU
    import time, random
    channel.send("ready")
    for x in channel:
        if x is None:  # we can shutdown
            break
        # sleep a random time, then send the result
        time.sleep(random.randrange(3))
        channel.send(x * 10)

# execute the task processor everywhere
mch = group.remote_exec(process_item)

# get a queue that gives us results
q = mch.make_receive_queue(endmarker=-1)
tasks = list(range(10))  # a list of tasks, here just integers
terminated = 0
while True:
    channel, item = q.get()
    if item == -1:
        terminated += 1
        print("terminated %s" % channel.gateway.id)
        if terminated == len(mch):
            print("got all results, terminating")
            break
        continue
    if item != "ready":
        print("other side %s returned %r" % (channel.gateway.id, item))
    if not tasks:
        print("no tasks remain, sending termination request to all")
        mch.send_each(None)
        tasks = -1
    if tasks and tasks != -1:
        task = tasks.pop()
        channel.send(task)
        print("sent task %r to %s" % (task, channel.gateway.id))
group.terminate()