Parallel Programming in Python
Submitted by sugree on Thu, 07/03/2008 - 23:19.
สืบเนื่องจากตอนลองเขียน MPI ด้วย Python เมื่อต้นสัปดาห์ เพื่อให้เห็นผลชัดเจนก็ลองเขียนจริงๆ จังๆ ให้มันแก้ปัญหาได้ซักข้อ ขอสัมผัสอารมณ์ ความรัก ความรู้สึกซักหน่อย แล้วก็ได้เรื่องจริงๆ โจทย์ก็อันเดิม LCS อันแสนธรรมดา
การเขียนโปรแกรมแบบขนานที่ดีต้องเริ่มจากไม่ขนาน
#!/usr/bin/env python
from numpy import array, zeros, arange


class LCS:
    """Longest-common-subsequence length via a dynamic-programming table.

    The table can be seeded with an arbitrary top-left corner, top row
    (``hedge``) and left column (``vedge``), so the same class can later
    solve one rectangular block of a larger table.
    """

    def __init__(self):
        self.tab = []

    def __repr__(self):
        return repr(self.tab)

    def reset(self, corner, hedge, vedge):
        """Allocate the table and seed its top row and left column.

        corner -- value stored at tab[0][0]
        hedge  -- values for the top row, tab[0][1:]  (one per column)
        vedge  -- values for the left column, tab[1:][0]  (one per row)
        """
        # Rows follow vedge and columns follow hedge; solve() indexes the
        # table as tab[row][column] with row <= len(vedge) and
        # column <= len(hedge), so the shape must be in this order.
        self.tab = zeros((len(vedge)+1, len(hedge)+1), dtype='int')
        self.tab[0][0] = corner
        for i, v in enumerate(vedge):
            self.tab[i+1][0] = v
        for i, v in enumerate(hedge):
            self.tab[0][i+1] = v

    def solve(self, corner, hedge, vedge, a, b):
        """Fill the table for strings ``a`` (columns) and ``b`` (rows).

        ``a`` must have at least len(hedge) characters and ``b`` at least
        len(vedge) characters.  Returns the bottom-right table entry.
        """
        self.reset(corner, hedge, vedge)
        m, n = len(vedge), len(hedge)
        for i in range(1, m+1):
            for j in range(1, n+1):
                if a[j-1] == b[i-1]:
                    self.tab[i][j] = self.tab[i-1][j-1]+1
                else:
                    self.tab[i][j] = max(self.tab[i][j-1], self.tab[i-1][j])
        return self.tab[m][n]


if __name__ == '__main__':
    import sys

    # The input file holds the two strings on its first two lines.
    with open(sys.argv[1]) as fd:
        a = fd.readline().strip()
        b = fd.readline().strip()
    solver = LCS()
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(solver.solve(0, zeros(len(a)), zeros(len(b)), a, b))
    print(solver)
โค้ดข้างบนเขียนแบบเตรียมเอาไว้ใช้ต่อตอนขนานโดยเฉพาะ อาจจะดูแหม่งๆ ไปบ้าง ถัดมาก็ยังคงเป็นแบบไม่ขนาน แต่เปลี่ยนไปใช้ระเบียบวิธีแบบขนานไปแล้ว แค่ยังใส่ในลูปธรรมดา
#!/usr/bin/env python
import sys
from math import ceil
from numpy import array, zeros, arange


class LCS:
    """Longest-common-subsequence table for one rectangular block.

    The block is seeded with the values that border it in the full table:
    the top-left ``corner``, the row above (``hedge``) and the column to
    the left (``vedge``).
    """

    def __init__(self):
        self.tab = []

    def __repr__(self):
        return repr(self.tab)

    def reset(self, corner, hedge, vedge):
        """Allocate a (len(vedge)+1) x (len(hedge)+1) table and seed its borders."""
        self.tab = zeros((len(vedge)+1, len(hedge)+1), dtype='int')
        self.tab[0][0] = corner
        for i, v in enumerate(vedge):
            self.tab[i+1][0] = v
        for i, v in enumerate(hedge):
            self.tab[0][i+1] = v

    def solve(self, corner, hedge, vedge, a, b):
        """Fill the table for ``a`` (columns) and ``b`` (rows); return the last cell."""
        self.reset(corner, hedge, vedge)
        m, n = len(vedge), len(hedge)
        for i in range(1, m+1):
            for j in range(1, n+1):
                if a[j-1] == b[i-1]:
                    self.tab[i][j] = self.tab[i-1][j-1]+1
                else:
                    self.tab[i][j] = max(self.tab[i][j-1], self.tab[i-1][j])
        return self.tab[m][n]

    def get_result(self):
        """Return (bottom-right cell, bottom row, right column) of the table.

        The row and column exclude the seeded border cell, so they have
        len(hedge) and len(vedge) entries respectively.
        """
        corner = self.tab[-1][-1]
        hedge = array([i for i in self.tab[-1][1:]])
        vedge = array([i[-1] for i in self.tab[1:]])
        return corner, hedge, vedge


class Store:
    """Inputs of one block: its grid position, string slices and border values."""

    def __init__(self, y, x):
        self.y = y
        self.x = x
        self.a = None
        self.b = None
        self.corner = None
        self.hedge = None
        self.vedge = None

    def __repr__(self):
        return 'Store(%d,%d,%s,%s,%s,%s,%s)' % (self.y, self.x,
                                                self.a, self.b,
                                                str(self.corner),
                                                self.hedge, self.vedge)


class SequentialSolver:
    """Block-wise LCS solver that runs the blocks in a plain loop.

    The table is cut into blocks of ``size_x`` columns (characters of ``a``)
    by ``size_y`` rows (characters of ``b``).  A block is processed once its
    corner, top edge and left edge are all known.
    """

    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.size_x = 4
        self.size_y = 4

    def debug(self, msg):
        """Write ``msg`` to stderr, prefixed with this solver's rank."""
        sys.stderr.write('#%d ' % self.rank+msg)

    def solve(self):
        """Run the solver; prints the LCS length and also returns it."""
        #self.np = MPI.COMM_WORLD.Get_size()
        #self.rank = MPI.COMM_WORLD.Get_rank()
        self.np = 1
        self.rank = 0
        self.debug('ready\n')
        if self.rank == 0:
            return self.master()
        return self.worker()

    def get_store(self, y, x):
        """Return the Store for block row ``y``, column ``x``, creating it on demand."""
        key = '%d-%d' % (y, x)
        if key not in self.store:
            start_y = y*self.size_y
            start_x = x*self.size_x
            o = Store(y, x)
            o.a = self.a[start_x:start_x+self.size_x]
            o.b = self.b[start_y:start_y+self.size_y]
            o.corner = None
            o.hedge = None
            o.vedge = None
            self.store[key] = o
        return self.store[key]

    def set_store(self, o):
        """Register ``o`` under its own (y, x) position."""
        key = '%d-%d' % (o.y, o.x)
        self.store[key] = o

    def process(self, o):
        """Solve block ``o`` and hand its borders to the three neighbours.

        Returns the stores of the diagonal (c), right (r) and lower (d)
        neighbours after updating them.
        """
        solver = LCS()
        solver.solve(o.corner, o.hedge, o.vedge, o.a, o.b)
        corner, hedge, vedge = solver.get_result()
        c = self.get_store(o.y+1, o.x+1)
        c.corner = corner
        r = self.get_store(o.y, o.x+1)
        r.vedge = vedge[:len(r.b)]
        d = self.get_store(o.y+1, o.x)
        d.hedge = hedge[:len(d.a)]
        return c, r, d

    def worker(self):
        pass

    def _is_ready(self, o, m, n):
        """True when block ``o`` lies inside the m x n grid and has all its inputs."""
        return (o.corner is not None and
                o.hedge is not None and
                o.vedge is not None and
                o.x < m and o.y < n)

    def master(self):
        """Process every block in dependency order.

        Prints the LCS length of ``a`` and ``b`` and returns it.
        """
        self.store = {}
        self.ready = []
        self.running = []
        self.available = list(range(1, self.np))
        # m: number of block columns (along a); n: number of block rows (along b).
        m = int(ceil(float(len(self.a))/self.size_x))
        n = int(ceil(float(len(self.b))/self.size_y))
        if m == 0 or n == 0:
            # An empty string yields an empty grid: block (0, 0) would have
            # no edges to start from, and the LCS length is 0 anyway.
            print(0)
            return 0
        # Blocks in the first row / first column border the zero boundary.
        for i in range(m):
            o = self.get_store(0, i)
            o.corner = 0
            size = min(self.size_x, len(self.a)-i*self.size_x)
            o.hedge = zeros(size, dtype='int')
        for i in range(n):
            o = self.get_store(i, 0)
            o.corner = 0
            size = min(self.size_y, len(self.b)-i*self.size_y)
            o.vedge = zeros(size, dtype='int')
        self.ready.append((0, 0))
        while self.ready:
            y, x = self.ready.pop(0)
            o = self.get_store(y, x)
            c, r, d = self.process(o)
            if self._is_ready(r, m, n):
                self.ready.append((r.y, r.x))
            if self._is_ready(d, m, n):
                self.ready.append((d.y, d.x))
        # The last block (n-1, m-1) writes its result into the corner of the
        # store one step diagonally past it.
        result = self.get_store(n, m).corner
        print(result)
        return result


if __name__ == '__main__':
    import sys

    # The input file holds the two strings on its first two lines.
    with open(sys.argv[1]) as fd:
        a = fd.readline().strip()
        b = fd.readline().strip()
    solver = SequentialSolver(a, b)
    solver.solve()
อาจจะดูยาวไปบ้าง เน้นอ่านออก ทีนี้ก็ขนานของจริง
#!/usr/bin/env mpi4py
import sys
from math import ceil
from mpi4py import MPI
from numpy import array, zeros, arange


class LCS:
    """Longest-common-subsequence table for one rectangular block.

    The block is seeded with the values that border it in the full table:
    the top-left ``corner``, the row above (``hedge``) and the column to
    the left (``vedge``).
    """

    def __init__(self):
        self.tab = []

    def __repr__(self):
        return repr(self.tab)

    def reset(self, corner, hedge, vedge):
        """Allocate a (len(vedge)+1) x (len(hedge)+1) table and seed its borders."""
        self.tab = zeros((len(vedge)+1, len(hedge)+1), dtype='int')
        self.tab[0][0] = corner
        for i, v in enumerate(vedge):
            self.tab[i+1][0] = v
        for i, v in enumerate(hedge):
            self.tab[0][i+1] = v

    def solve(self, corner, hedge, vedge, a, b):
        """Fill the table for ``a`` (columns) and ``b`` (rows); return the last cell."""
        self.reset(corner, hedge, vedge)
        m, n = len(vedge), len(hedge)
        for i in range(1, m+1):
            for j in range(1, n+1):
                if a[j-1] == b[i-1]:
                    self.tab[i][j] = self.tab[i-1][j-1]+1
                else:
                    self.tab[i][j] = max(self.tab[i][j-1], self.tab[i-1][j])
        return self.tab[m][n]

    def get_result(self):
        """Return (bottom-right cell, bottom row, right column) of the table.

        The row and column exclude the seeded border cell, so they have
        len(hedge) and len(vedge) entries respectively.
        """
        corner = self.tab[-1][-1]
        hedge = array([i for i in self.tab[-1][1:]])
        vedge = array([i[-1] for i in self.tab[1:]])
        return corner, hedge, vedge


class Store:
    """Inputs of one block: its grid position, string slices and border values."""

    def __init__(self, y, x):
        self.y = y
        self.x = x
        self.a = None
        self.b = None
        self.corner = None
        self.hedge = None
        self.vedge = None

    def __repr__(self):
        return 'Store(%d,%d,%s,%s,%s,%s,%s)' % (self.y, self.x,
                                                self.a, self.b,
                                                str(self.corner),
                                                self.hedge, self.vedge)


class ParallelSolver:
    """Block-wise LCS solver: rank 0 schedules blocks, other ranks solve them.

    The table is cut into blocks of ``size_x`` columns (characters of ``a``)
    by ``size_y`` rows (characters of ``b``).  A block is handed to a worker
    once its corner, top edge and left edge are all known.
    """

    def __init__(self, a, b, size_x, size_y):
        self.a = a
        self.b = b
        self.size_x = size_x
        self.size_y = size_y

    def debug(self, msg):
        """Debug hook; output is currently disabled."""
        #sys.stderr.write('#%d ' % self.rank+msg)
        pass

    def solve(self):
        """Look up this process's rank and run the master (rank 0) or a worker."""
        self.np = MPI.COMM_WORLD.Get_size()
        self.rank = MPI.COMM_WORLD.Get_rank()
        self.debug('ready\n')
        if self.rank == 0:
            self.master()
        else:
            self.worker()

    def get_store(self, y, x):
        """Return the Store for block row ``y``, column ``x``, creating it on demand."""
        key = '%d-%d' % (y, x)
        if key not in self.store:
            start_y = y*self.size_y
            start_x = x*self.size_x
            o = Store(y, x)
            o.a = self.a[start_x:start_x+self.size_x]
            o.b = self.b[start_y:start_y+self.size_y]
            o.corner = None
            o.hedge = None
            o.vedge = None
            self.store[key] = o
        return self.store[key]

    def set_store(self, o):
        """Register ``o`` under its own (y, x) position."""
        key = '%d-%d' % (o.y, o.x)
        self.store[key] = o

    def process(self, o):
        """Solve block ``o``; return its (corner, bottom row, right column)."""
        solver = LCS()
        solver.solve(o.corner, o.hedge, o.vedge, o.a, o.b)
        return solver.get_result()

    # NOTE(review): the four helpers below pass whole Python objects to
    # Send/Recv.  Newer mpi4py releases appear to document lowercase
    # send/recv for pickled objects -- confirm against the installed version.

    def send_store(self, o, dest):
        """Send block ``o`` (or None, the stop signal) to rank ``dest``."""
        self.debug('send %s to %d\n' % (o, dest))
        MPI.COMM_WORLD.Send(o, dest=dest)

    def recv_store(self):
        """Receive the next object sent to this rank by any other rank."""
        o = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
        self.debug('recv %s\n' % o)
        return o

    def send_result(self, o, corner, hedge, vedge, dest):
        """Send [rank, block, corner, hedge, vedge] to rank ``dest``."""
        MPI.COMM_WORLD.Send([self.rank, o, corner, hedge, vedge], dest=dest)
        self.debug('send result to %d\n' % dest)

    def recv_result(self):
        """Receive one result list from any rank."""
        ret = MPI.COMM_WORLD.Recv(source=MPI.ANY_SOURCE)
        self.debug('recv %s\n' % ret)
        return ret

    def worker(self):
        """Solve blocks sent by rank 0 until the None stop signal arrives."""
        while 1:
            o = self.recv_store()
            if o is None:
                break
            corner, hedge, vedge = self.process(o)
            self.send_result(o, corner, hedge, vedge, 0)

    def _is_ready(self, o, m, n):
        """True when block ``o`` lies inside the m x n grid and has all its inputs."""
        return (o.corner is not None and
                o.hedge is not None and
                o.vedge is not None and
                o.x < m and o.y < n)

    def _stop_workers(self):
        """Send the None stop signal to every rank other than 0."""
        for i in range(1, self.np):
            self.send_store(None, i)

    def master(self):
        """Schedule all blocks onto the workers and print the LCS length."""
        self.store = {}
        self.ready = []
        self.running = []
        # A real list: ranks are popped from and appended to it below.
        self.available = list(range(1, self.np))
        # m: number of block columns (along a); n: number of block rows (along b).
        m = int(ceil(float(len(self.a))/self.size_x))
        n = int(ceil(float(len(self.b))/self.size_y))
        if m == 0 or n == 0:
            # An empty string yields an empty grid: block (0, 0) would have
            # no edges to start from, and the LCS length is 0 anyway.
            self._stop_workers()
            print(0)
            return
        if not self.available:
            # With no worker ranks nothing would ever answer recv_result().
            raise RuntimeError('ParallelSolver needs at least one worker '
                               'process besides rank 0')
        # Blocks in the first row / first column border the zero boundary.
        for i in range(m):
            o = self.get_store(0, i)
            o.corner = 0
            size = min(self.size_x, len(self.a)-i*self.size_x)
            o.hedge = zeros(size, dtype='int')
        for i in range(n):
            o = self.get_store(i, 0)
            o.corner = 0
            size = min(self.size_y, len(self.b)-i*self.size_y)
            o.vedge = zeros(size, dtype='int')
        self.ready.append((0, 0))
        # get_store takes (row, column).  The last block (n-1, m-1) writes the
        # final answer into the corner of store (n, m); polling (m, n) would
        # never terminate when the grid is not square.
        while self.get_store(n, m).corner is None:
            # Hand out as many ready blocks as there are idle workers.
            while self.ready and self.available:
                y, x = self.ready.pop(0)
                o = self.get_store(y, x)
                dest = self.available.pop(0)
                self.send_store(o, dest)
            source, o, corner, hedge, vedge = self.recv_result()
            self.available.append(source)
            # Pass the finished block's borders to its three neighbours.
            c = self.get_store(o.y+1, o.x+1)
            c.corner = corner
            r = self.get_store(o.y, o.x+1)
            r.vedge = vedge[:len(r.b)]
            d = self.get_store(o.y+1, o.x)
            d.hedge = hedge[:len(d.a)]
            if self._is_ready(r, m, n):
                self.ready.append((r.y, r.x))
            if self._is_ready(d, m, n):
                self.ready.append((d.y, d.x))
        self._stop_workers()
        #print self.store
        print(self.get_store(n, m).corner)


if __name__ == '__main__':
    import sys

    # The input file holds the two strings on its first two lines.
    with open(sys.argv[1]) as fd:
        a = fd.readline().strip()
        b = fd.readline().strip()
    solver = ParallelSolver(a, b, 100, 100)
    solver.solve()
ที่น่าประทับใจอยู่ที่บรรทัดนี้
def send_store(self, o, dest):
    """Log the outgoing object, then send it to rank ``dest``."""
    message = 'send %s to %d\n' % (o, dest)
    self.debug(message)
    comm = MPI.COMM_WORLD
    comm.Send(o, dest=dest)

def recv_store(self):
    """Receive one object from whichever rank sends next, log it and return it."""
    comm = MPI.COMM_WORLD
    received = comm.Recv(source=MPI.ANY_SOURCE)
    self.debug('recv %s\n' % received)
    return received
ตอนเขียน C มันยากมาก ถึงกับเบื่อ ไม่เคยคิดจะส่ง struct แต่ใน Python ดูเหมือนว่าจะใช้ Pickle ง่ายมากๆ ไม่ต้องทำอะไรพิเศษ อยากส่งก็ส่ง





Post new comment