The Gaudi Framework  master (594c33fa)
GaudiMP.GMPBase.Worker Class Reference
Inheritance diagram for GaudiMP.GMPBase.Worker:
Collaboration diagram for GaudiMP.GMPBase.Worker:

Public Member Functions

def __init__ (self, workerID, queues, events, params, subworkers)
 
def processConfiguration (self)
 
def Engine (self)
 
def getCheckAlgs (self)
 
def checkExecutedPassed (self, algName)
 
def isEventPassed (self)
 
- Public Member Functions inherited from GaudiMP.GMPBase.GMPComponent
def __init__ (self, nodeType, nodeID, queues, events, params, subworkers)
 
def Start (self)
 
def SetupGaudiPython (self)
 
def StartGaudiPython (self)
 
def LoadTES (self, tbufferfile)
 
def getEventNumber (self)
 
def IdentifyWriters (self)
 
def dumpHistograms (self)
 
def Initialize (self)
 
def Finalize (self)
 
def Report (self)
 

Public Attributes

 writerDict
 
 vetoAlgs
 
 eventOutput
 
 app
 
 filerecordsAgent
 
 nIn
 
 firstEvTime
 
 currentEvent
 
 tTime
 
- Public Attributes inherited from GaudiMP.GMPBase.GMPComponent
 nodeType
 
 finalEvent
 
 lastEvent
 
 log
 
 subworkers
 
 nodeID
 
 msgFormat
 
 currentEvent
 
 queues
 
 num
 
 app
 
 evcom
 
 evcoms
 
 hq
 
 fq
 
 nIn
 
 nOut
 
 stat
 
 iTime
 
 rTime
 
 fTime
 
 firstEvTime
 
 tTime
 
 proc
 
 a
 
 evt
 
 hvt
 
 fsr
 
 inc
 
 pers
 
 ts
 
 TS
 
 cntr
 

Detailed Description

Definition at line 996 of file GMPBase.py.

Constructor & Destructor Documentation

◆ __init__()

def GaudiMP.GMPBase.Worker.__init__ (   self,
  workerID,
  queues,
  events,
  params,
  subworkers 
)

Definition at line 997 of file GMPBase.py.

997  def __init__(self, workerID, queues, events, params, subworkers):
998  GMPComponent.__init__(
999  self, "Worker", workerID, queues, events, params, subworkers
1000  )
1001  # Identify the writer streams
1002  self.writerDict = self.IdentifyWriters()
1003  # Identify the accept/veto checks for each event
1004  self.acceptAlgs, self.requireAlgs, self.vetoAlgs = self.getCheckAlgs()
1005  self.log.name = "Worker-%i" % (self.nodeID)
1006  self.log.info("Worker-%i Created OK" % (self.nodeID))
1007  self.eventOutput = True
1008 

Member Function Documentation

◆ checkExecutedPassed()

def GaudiMP.GMPBase.Worker.checkExecutedPassed (   self,
  algName 
)

Definition at line 1188 of file GMPBase.py.

1188  def checkExecutedPassed(self, algName):
1189  if (
1190  self.a.algorithm(algName)._ialg.isExecuted()
1191  and self.a.algorithm(algName)._ialg.filterPassed()
1192  ):
1193  return True
1194  else:
1195  return False
1196 

◆ Engine()

def GaudiMP.GMPBase.Worker.Engine (   self)

Reimplemented from GaudiMP.GMPBase.GMPComponent.

Definition at line 1049 of file GMPBase.py.

1049  def Engine(self):
1050  # rename process
1051  import ctypes
1052 
1053  libc = ctypes.CDLL("libc.so.6")
1054  name = str(self.nodeType) + str(self.nodeID) + "\0"
1055  libc.prctl(15, ctypes.create_unicode_buffer(name), 0, 0, 0)
1056 
1057  startEngine = time.time()
1058  self.log.info("Worker %i starting Engine" % (self.nodeID))
1059  self.Initialize()
1060  self.filerecordsAgent = FileRecordsAgent(self)
1061 
1062  # populate the TESSerializer itemlist
1063  self.log.info("EVT WRITERS ON WORKER : %i" % (len(self.writerDict["events"])))
1064 
1065  nEventWriters = len(self.writerDict["events"])
1066  if nEventWriters:
1067  itemList = set()
1068  optItemList = set()
1069  for m in self.writerDict["events"]:
1070  for item in m.ItemList:
1071  hsh = item.find("#")
1072  if hsh != -1:
1073  item = item[:hsh]
1074  itemList.add(item)
1075  for item in m.OptItemList:
1076  hsh = item.find("#")
1077  if hsh != -1:
1078  item = item[:hsh]
1079  optItemList.add(item)
1080  # If an item is mandatory and optional, keep it only in the optional list
1081  itemList -= optItemList
1082  for item in sorted(itemList):
1083  self.log.info(" adding ItemList Item to ts : %s" % (item))
1084  self.ts.addItem(item)
1085  for item in sorted(optItemList):
1086  self.log.info(" adding Optional Item to ts : %s" % (item))
1087  self.ts.addOptItem(item)
1088  else:
1089  self.log.info("There is no Event Output for this app")
1090  self.eventOutput = False
1091 
1092  # Begin processing
1093  Go = True
1094  while Go:
1095  packet = self.evcom.receive()
1096  if packet:
1097  pass
1098  else:
1099  continue
1100  if packet == "FINISHED":
1101  break
1102  evtNumber, tbin = packet # unpack
1103  if self.cntr != cppyy.nullptr:
1104  self.cntr.setEventCounter(evtNumber)
1105 
1106  # subworkers are forked before the first event is processed
1107  if self.nIn == 0:
1108  self.log.info("Fork new subworkers")
1109 
1110  # Fork subworkers and share services
1111  for k in self.subworkers:
1112  k.SetServices(
1113  self.a,
1114  self.evt,
1115  self.hvt,
1116  self.fsr,
1117  self.inc,
1118  self.pers,
1119  self.ts,
1120  self.cntr,
1121  )
1122  k.Start()
1123  self.a.addAlgorithm(CollectHistograms(self))
1124  self.nIn += 1
1125  self.TS.Load(tbin)
1126 
1127  t = time.time()
1128  sc = self.a.executeEvent()
1129  if self.nIn == 1:
1130  self.firstEvTime = time.time() - t
1131  else:
1132  self.rTime += time.time() - t
1133  if sc.isSuccess():
1134  pass
1135  else:
1136  self.log.warning("Did not Execute Event")
1137  self.evt.clearStore()
1138  continue
1139  if self.isEventPassed():
1140  pass
1141  else:
1142  self.log.warning("Event did not pass : %i" % (evtNumber))
1143  self.evt.clearStore()
1144  continue
1145  if self.eventOutput:
1146  # It may be the case of generating Event Tags; hence
1147  # no event output
1148  self.currentEvent = self.getEventNumber()
1149  tb = self.TS.Dump()
1150  self.evcom.send((self.currentEvent, tb))
1151  self.nOut += 1
1152  self.inc.fireIncident(gbl.Incident("Worker", "EndEvent"))
1153  self.eventLoopSyncer.set()
1154  self.evt.clearStore()
1155  self.log.info("Setting <Last> Event")
1156  self.lastEvent.set()
1157 
1158  self.evcom.finalize()
1159  self.log.info("Worker-%i Finished Processing Events" % (self.nodeID))
1160  # Now send the FileRecords and stop/finalize the appMgr
1161  self.filerecordsAgent.SendFileRecords()
1162  self.Finalize()
1163  self.tTime = time.time() - startEngine
1164  self.Report()
1165 
1166  for k in self.subworkers:
1167  self.log.info("Join subworkers")
1168  k.proc.join()
1169 

◆ getCheckAlgs()

def GaudiMP.GMPBase.Worker.getCheckAlgs (   self)
For some output writers, a check is performed to see if the event has
executed certain algorithms.
These reside in the AcceptAlgs property for those writers

Definition at line 1170 of file GMPBase.py.

1170  def getCheckAlgs(self):
1171  """
1172  For some output writers, a check is performed to see if the event has
1173  executed certain algorithms.
1174  These reside in the AcceptAlgs property for those writers
1175  """
1176  acc = []
1177  req = []
1178  vet = []
1179  for m in self.writerDict["events"]:
1180  if hasattr(m.w, "AcceptAlgs"):
1181  acc += m.w.AcceptAlgs
1182  if hasattr(m.w, "RequireAlgs"):
1183  req += m.w.RequireAlgs
1184  if hasattr(m.w, "VetoAlgs"):
1185  vet += m.w.VetoAlgs
1186  return (acc, req, vet)
1187 

◆ isEventPassed()

def GaudiMP.GMPBase.Worker.isEventPassed (   self)
Check the algorithm status for an event.
Depending on output writer settings, the event
  may be declined based on various criteria.
This is a transcript of the check that occurs in GaudiSvc::OutputStream

Definition at line 1197 of file GMPBase.py.

1197  def isEventPassed(self):
1198  """
1199  Check the algorithm status for an event.
1200  Depending on output writer settings, the event
1201  may be declined based on various criteria.
1202  This is a transcript of the check that occurs in GaudiSvc::OutputStream
1203  """
1204  passed = False
1205 
1206  self.log.debug("self.acceptAlgs is %s" % (str(self.acceptAlgs)))
1207  if self.acceptAlgs:
1208  for name in self.acceptAlgs:
1209  if self.checkExecutedPassed(name):
1210  passed = True
1211  break
1212  else:
1213  passed = True
1214 
1215  self.log.debug("self.requireAlgs is %s" % (str(self.requireAlgs)))
1216  for name in self.requireAlgs:
1217  if self.checkExecutedPassed(name):
1218  pass
1219  else:
1220  self.log.info("Evt declined (requireAlgs) : %s" % (name))
1221  passed = False
1222 
1223  self.log.debug("self.vetoAlgs is %s" % (str(self.vetoAlgs)))
1224  for name in self.vetoAlgs:
1225  if self.checkExecutedPassed(name):
1226  pass
1227  else:
1228  self.log.info("Evt declined : (vetoAlgs) : %s" % (name))
1229  passed = False
1230  return passed
1231 
1232 
1233 # =============================================================================
1234 
1235 

◆ processConfiguration()

def GaudiMP.GMPBase.Worker.processConfiguration (   self)

Reimplemented from GaudiMP.GMPBase.GMPComponent.

Definition at line 1009 of file GMPBase.py.

1009  def processConfiguration(self):
1010  # Worker :
1011  # No input
1012  # No output
1013  # No Histos
1014  self.config["EventSelector"].Input = []
1015  self.config["ApplicationMgr"].OutStream = []
1016  if "HistogramPersistencySvc" in self.config.keys():
1017  self.config["HistogramPersistencySvc"].OutputFile = ""
1018  formatHead = "[Worker-%i]" % (self.nodeID)
1019  self.config["MessageSvc"].Format = "%-13s " % formatHead + self.msgFormat
1020 
1021  for key, lst in self.writerDict.items():
1022  self.log.info("Writer Type : %s\t : %i" % (key, len(lst)))
1023 
1024  for m in self.writerDict["tuples"]:
1025  # rename Tuple output file with an appendix
1026  # based on worker id, for merging later
1027  newName = m.getNewName(".", ".w%i." % (self.nodeID))
1028  self.config[m.key].Output = newName
1029 
1030  # Suppress INFO Output for all but Worker-0
1031  # if self.nodeID == 0 :
1032  # pass
1033  # else :
1034  # self.config[ 'MessageSvc' ].OutputLevel = ERROR
1035 
1036  if self.app == "Gauss":
1037  try:
1038  if "ToolSvc.EvtCounter" not in self.config:
1039  from Configurables import EvtCounter
1040 
1041  counter = EvtCounter()
1042  else:
1043  counter = self.config["ToolSvc.EvtCounter"]
1044  counter.UseIncident = False
1045  except Exception:
1046  # ignore errors when trying to change the configuration of the EvtCounter
1047  self.log.warning("Cannot configure EvtCounter")
1048 

Member Data Documentation

◆ app

GaudiMP.GMPBase.Worker.app

Definition at line 1036 of file GMPBase.py.

◆ currentEvent

GaudiMP.GMPBase.Worker.currentEvent

Definition at line 1148 of file GMPBase.py.

◆ eventOutput

GaudiMP.GMPBase.Worker.eventOutput

Definition at line 1007 of file GMPBase.py.

◆ filerecordsAgent

GaudiMP.GMPBase.Worker.filerecordsAgent

Definition at line 1060 of file GMPBase.py.

◆ firstEvTime

GaudiMP.GMPBase.Worker.firstEvTime

Definition at line 1130 of file GMPBase.py.

◆ nIn

GaudiMP.GMPBase.Worker.nIn

Definition at line 1107 of file GMPBase.py.

◆ tTime

GaudiMP.GMPBase.Worker.tTime

Definition at line 1163 of file GMPBase.py.

◆ vetoAlgs

GaudiMP.GMPBase.Worker.vetoAlgs

Definition at line 1004 of file GMPBase.py.

◆ writerDict

GaudiMP.GMPBase.Worker.writerDict

Definition at line 1002 of file GMPBase.py.


The documentation for this class was generated from the following file:
GaudiPython.Pythonizations.executeEvent
executeEvent
Helpers for re-entrant interfaces.
Definition: Pythonizations.py:574
DataOnDemand.Dump
bool Dump
Definition: DataOnDemand.py:35
StringKeyEx.keys
keys
Definition: StringKeyEx.py:64