Gaudi Framework, version v23r0 |
Home | Generated: Mon Jan 30 2012 |
Public Member Functions | |
def | __init__ |
def | __del__ |
def | process |
Public Attributes | |
ncpus | |
ppservers | |
sessions | |
server | |
mode | |
pool | |
stats | |
Private Member Functions | |
def | _printStatistics |
def | _mergeStatistics |
Class in charge of managing the tasks and distributing them to the workers. The workers can be local (using other cores of the same machine) or remote (using other nodes in the local cluster).
Definition at line 101 of file Parallel.py.
def GaudiMP::Parallel::WorkManager::__init__ | ( | self, | |
ncpus = 'autodetect' , |
|||
ppservers = None |
|||
) |
Definition at line 106 of file Parallel.py.
00107 : 00108 if ncpus == 'autodetect' : self.ncpus = multiprocessing.cpu_count() 00109 else : self.ncpus = ncpus 00110 if ppservers : 00111 import pp 00112 self.ppservers = ppservers 00113 self.sessions = [ SshSession(srv) for srv in ppservers ] 00114 self.server = pp.Server(ncpus=self.ncpus, ppservers=self.ppservers) 00115 self.mode = 'cluster' 00116 else : 00117 self.pool = multiprocessing.Pool(self.ncpus) 00118 self.mode = 'multicore' 00119 self.stats = {}
def GaudiMP::Parallel::WorkManager::__del__ | ( | self ) |
Definition at line 120 of file Parallel.py.
def GaudiMP::Parallel::WorkManager::_mergeStatistics | ( | self, | |
stat | |||
) | [private] |
Definition at line 157 of file Parallel.py.
def GaudiMP::Parallel::WorkManager::_printStatistics | ( | self ) | [private] |
Definition at line 148 of file Parallel.py.
00149 : 00150 njobs = 0 00151 for stat in self.stats.values(): 00152 njobs += stat.njob 00153 print 'Job execution statistics:' 00154 print 'job count | % of all jobs | job time sum | time per job | job server' 00155 for name, stat in self.stats.items(): 00156 print ' %d | %6.2f | %8.3f | %8.3f | %s' % (stat.njob, 100.*stat.njob/njobs, stat.time, stat.time/stat.njob, name)
def GaudiMP::Parallel::WorkManager::process | ( | self, | |
task, | |||
items, | |||
timeout = 90000 |
|||
) |
Definition at line 123 of file Parallel.py.
00124 : 00125 if not isinstance(task,Task) : 00126 raise TypeError("task argument needs to be an 'Task' instance") 00127 # --- Call the Local initialialization 00128 task.initializeLocal() 00129 # --- Schedule all the jobs .... 00130 if self.mode == 'cluster' : 00131 jobs = [self.server.submit(_prefunction, (_ppfunction, task, item), (), ('GaudiMP.Parallel','time')) for item in items] 00132 for job in jobs : 00133 result, stat = job() 00134 task._mergeResults(result) 00135 self._mergeStatistics(stat) 00136 self._printStatistics() 00137 self.server.print_stats() 00138 elif self.mode == 'multicore' : 00139 start = time.time() 00140 jobs = self.pool.map_async(_ppfunction, zip([task for i in items] , items )) 00141 for result, stat in jobs.get(timeout) : 00142 task._mergeResults(result) 00143 self._mergeStatistics(stat) 00144 end = time.time() 00145 self._printStatistics() 00146 print 'Time elapsed since server creation %f' %(end-start) 00147 # --- Call the Local Finalize task.finalize()
Definition at line 106 of file Parallel.py.
Definition at line 106 of file Parallel.py.
Definition at line 106 of file Parallel.py.
Definition at line 106 of file Parallel.py.
Definition at line 106 of file Parallel.py.
Definition at line 106 of file Parallel.py.
Definition at line 106 of file Parallel.py.