-
Notifications
You must be signed in to change notification settings - Fork 0
/
master.py
executable file
·68 lines (49 loc) · 2.22 KB
/
master.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#!/usr/bin/env python
from mpi4py import MPI
from mpiclass import MPIClass
import os
################################################################################
class Master(MPIClass):
#~~~~~~~~~~~~~~~~~~~~~~~~~
def __init__(self,options=None):
MPIClass.__init__(self,options)
self.iteration=0
self.niter = 10*self.comm.Get_size()
return
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def finished(self):
if self.iteration == self.niter:
return True
self.iteration += 1
return False;
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def run(self):
status = MPI.Status()
instruct = None
result = None
# execution loop, until we determine we are finished.
while not self.finished():
# get 'result' from any slave rank that is 'ready'.
result = self.comm.recv(source=MPI.ANY_SOURCE, tag=self.tags['ready'], status=status)
ready_rank = status.Get_source()
# do something useful with the result.
if result: print(result)
# send instructions to the ready rank. For this simple example
# this is just a string, but could be any pickleable data type
instruct = "step_{:05d}".format(self.iteration)
self.comm.send(instruct, dest=ready_rank, tag=self.tags['execute'])
print("Running step {} on rank {}".format(self.iteration,ready_rank))
# cleanup loop, send 'terminate' tag to each slave rank in
# whatever order they become ready.
# Don't forget to catch their final 'result'
print(" --> Finished dispatch, Terminating ranks")
requests = []
for s in range(1,self.comm.Get_size()):
result = self.comm.recv(source=MPI.ANY_SOURCE, tag=self.tags['ready'], status=status)
if result: print(result)
# send terminate tag, but no need to wait
requests.append(
self.comm.isend(None, dest=status.Get_source(), tag=self.tags['terminate']))
# OK, messages sent, wait for all to complete
MPI.Request.waitall(requests)
return