###############################################################################
# (c) Copyright 2019-2021 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
from __future__ import absolute_import, division, print_function
import datetime
import logging
import math
import os
import re
from collections import OrderedDict
import pydot
import sys
from Gaudi.Configuration import (appendPostConfigAction, ConfigurableUser,
Configurable, INFO)
import Configurables
from Configurables import (ChronoStatSvc, Gaudi__IODataManager,
Gaudi__RootCnvSvc)
import GaudiKernel.ProcessJobOptions
from GaudiConf import IOHelper
from RawEventFormat import Raw_location_db
from . import ConfigurationError
from .components import setup_component, is_algorithm, force_location, MutableAlgorithm, Algorithm
from .control_flow import CompositeNode
from .dataflow import dataflow_config, ensure_event_prefix, DataHandle
from .packing import persistreco_version, available_persistreco_versions, default_persistreco_version
from .tonic import configurable
from GaudiConf.LbExec import InputProcessTypes
from .Algorithms import (
createODIN,
LHCb__MDFWriter,
OutputStream,
CopyInputStream,
LHCb__UnpackRawEvent,
)
from Configurables import (
LHCb__Tests__FakeEventTimeProducer as DummyEventTime,
ApplicationMgr,
HLTControlFlowMgr,
HiveDataBrokerSvc,
DataOnDemandSvc,
MessageSvc,
Gaudi__Monitoring__MessageSvcSink as MessageSvcSink,
Gaudi__Monitoring__JSONSink as JSONSink,
XMLSummarySvc,
GitANNSvc,
AlgContextSvc,
LoKiSvc,
)
from .filecontent_metadata import metainfo_repos, key_registry, key_registries # TODO: do not directly talk to key_registry -- put a function in filecontent_metadata to hide this, and import that function...
from .filecontent_metadata import retrieve_encoding_dictionary as retrieve_encoding_dictionary # noqa: F401
from DDDB.CheckDD4Hep import UseDD4Hep
if UseDD4Hep:
from Configurables import LHCb__Det__LbDD4hep__IOVProducer as IOVProducer
else:
# FIXME(NN): We shouldn't need to refer to IOV explicitly in our framework
from Configurables import LHCb__DetDesc__ReserveDetDescForEvent as reserveIOV
# Module-level logger for this configuration module.
log = logging.getLogger(__name__)
# Keys naming the supported file formats; compared against
# ApplicationOptions.output_type when validating the configuration.
MDF_KEY = 'MDF'
ROOT_KEY = 'ROOT'
#: Valid input/output filetypes
FILE_TYPES = {MDF_KEY, ROOT_KEY}
def _shortPath2Path(path):
res = path
if not res.startswith('/Event/'):
res = '/Event/' + res
return res
def __veto_args(veto, keys):
banned = set(veto) & keys
if banned:
raise KeyError(
f'the following keywords are not allowed as explicit arguments: {banned}'
)
@configurable
def configured_ann_svc(name='HltANNSvc',
                       repositories_and_versions=None,
                       json_file=None,
                       add_registry_keys=True,
                       **kwargs):
    """Return a configured GitANNSvc used for encoding/decoding keys.

    Args:
        name: instance name of the service.
        repositories_and_versions: value for the ``Repositories`` property.
            If neither this nor an explicit ``Repositories`` keyword is given,
            ``metainfo_repos()`` is used.
        json_file: optional JSON file whose keys are added to the key registry.
            Only meaningful together with ``add_registry_keys=True``.
        add_registry_keys: if True, every entry of the key registry is copied
            into the ``Overrule`` property. Registry keys are hex strings and
            are converted to ints.
        **kwargs: further properties forwarded to ``setup_component``.
            A caller-supplied ``Overrule`` dict is not modified.

    Returns:
        The component returned by ``setup_component(GitANNSvc, ...)``.

    Raises:
        ValueError: if ``Repositories`` is specified twice, or if an
            ``Overrule`` entry maps a key to a different value than the
            key registry does.
    """
    assert json_file is None or add_registry_keys, 'json_file specified, but registry keys are not requested to be added to the configuration -- which implies that the content of the json file is ignored'
    if json_file:
        key_registry.add_keys_from_jsonfile(json_file)

    if add_registry_keys:
        # Work on a copy so that a dict passed in by the caller is not mutated.
        overrule = dict(kwargs.get('Overrule', {}))
        # Registry keys are hex strings while Overrule keys are ints, so
        # normalise before looking for collisions. Intersecting the raw key
        # views (as done previously) could never find an overlap.
        registry = {int(k, 16): v for k, v in key_registry.items()}
        conflicts = sorted(k for k in overrule.keys() & registry.keys()
                           if overrule[k] != registry[k])
        if conflicts:
            raise ValueError(
                'Overrule conflicts with the key registry for keys {}'.format(
                    ', '.join('0x{:08x}'.format(k) for k in conflicts)))
        overrule.update(registry)
        kwargs['Overrule'] = overrule

    if 'Overrule' in kwargs:
        log.info('Overrule specified for keys {}'.format(', '.join(
            "0x{:08x}".format(k) for k in kwargs['Overrule'].keys())))
    else:
        log.info('No Overrule specified - relying solely on repository')

    if repositories_and_versions and 'Repositories' in kwargs:
        raise ValueError(
            'duplicate specification of Repositories: {} and {}'.format(
                repositories_and_versions, kwargs))
    if 'Repositories' not in kwargs:
        if repositories_and_versions is None:
            repositories_and_versions = metainfo_repos()
        kwargs['Repositories'] = repositories_and_versions

    kwargs.setdefault('RegisterWithMonitoringHub', True)
    if kwargs['RegisterWithMonitoringHub']:
        # Publish every overruled key unless the caller chose the list.
        kwargs.setdefault('KeysToAlwaysPublish',
                          list(kwargs.get('Overrule', {})))
    kwargs.setdefault('CheckFSRForDecodingKeys', True)
    return setup_component(GitANNSvc, instance_name=name, **kwargs)
def _datadependencies(algs):
def __walk(visited, top):
if top.name in visited: return
yield top
visited.add(top.name)
for handles in top.inputs.values():
handles = handles if isinstance(handles, list) else [handles]
for handle in handles:
for p in __walk(visited, handle.producer):
yield p
visited = set()
for alg in algs:
yield from __walk(visited, alg)
def mdf_writer(path,
               location,
               compression=0,
               ignore_tae_info=True,
               compression_alg="ZSTD"):
    """Return an MDF writer which writes a single TES `location` to `path`.

    By default no compression is used. `IOSvc` does not currently support
    compressed inputs, and the "DstData" raw banks (which dominate when no
    detector raw banks are written out) are already compressed internally.
    See https://gitlab.cern.ch/lhcb/LHCb/-/issues/163.

    Args:
        path: connection string of the output file.
        location: TES location of the banks to write.
        compression: value of the writer's ``Compress`` property.
        ignore_tae_info: value of the writer's ``IgnoreTAE`` property.
        compression_alg: value of the writer's ``CompressAlg`` property.
    """
    writer_properties = dict(
        Connection=path,
        BankLocation=location,
        Compress=compression,
        CompressAlg=compression_alg,
        IgnoreTAE=ignore_tae_info,
    )
    return LHCb__MDFWriter(**writer_properties)
def online_writer(location, stream, enable_tae=False):
    """Return an OnlineAlg writer which writes a single TES `location`.

    Args:
        location: TES location of the raw event to write. It is converted to
            banks by an ``Online__RawEventToBanks``.
        stream: stream name; the writer is named ``EventOutput_<stream>``.
        enable_tae: if True, configure the writer to also write TAE output,
            using ``/Event/Banks`` as bank directory.
    """
    from PyConf.Algorithms import Online__OutputAlg, Online__RawEventToBanks

    extra_properties = {}
    if enable_tae:
        extra_properties.update(
            UseTAELeaves=True,  # Enable writing of TAE output
            BankDirectory="/Event/Banks",
            CentralFromRawData=True,
            # See the Hlt2Calib_TAE line in Moore/Hlt/Hlt2Conf/python/Hlt2Conf/lines/calibration/calibration.py
            TAEHalfWindow=force_location("/Event/Banks/TAEHalfWindowOutput"),
        )

    to_banks = Online__RawEventToBanks(RawEvent=location)
    return Online__OutputAlg(
        name=f"EventOutput_{stream}",
        RawData=to_banks.RawData,
        RawGuard=force_location("/Event/Banks/RawDataGuard"),
        **extra_properties,
    )
def root_writer(path, locations):
    """Return a ROOT/DST writer which writes TES `locations` to `path`.

    Each location is passed through ``format_output_location`` before being
    put in the writer's ``OptItemList``. The file is opened with
    ``OPT='RECREATE'``.
    """
    item_list = list(map(format_output_location, locations))
    output_spec = f"DATAFILE='{path}' SVC='Gaudi::RootCnvSvc' OPT='RECREATE'"
    return OutputStream(OptItemList=item_list, Output=output_spec)
# Set to True by the first ApplicationOptions.finalize() call; later calls
# see it and return immediately.
_ApplicationOptions_locked = False
# Maps option name -> filename of the code that locked it via
# ApplicationOptions.lockOption(); ApplicationOptions.__setattr__ refuses to
# change any option listed here.
_ApplicationOptions_lockedSlots = {}
class ApplicationOptions(ConfigurableUser):
    """Holder for application configuration.

    Options are set as attributes. `.finalize()` resolves any dynamic
    defaults based on other properties and validates the configuration;
    only its first call has an effect. Within this class, only options
    locked with `.lockOption()` are protected against later changes.
    """

    # FIXME(RM) we can improve upon the base class ConfigurableUser by creating
    # a new base class that
    # - is more pythonic (e.g. no __slots__, __doc__)
    # - does better type checking of the properties
    # - makes `c.prop` work if prop hasn't been set
    # - supports namespaced/grouped options
    # - forbids a subsequent instantiation of ApplicationOptions such that one
    #   has to do `from ... import app` rather than
    #   `from ... import ApplicationOptions; ApplicationOptions().xxx`
    # - nicer printout of the options
    # - by default do not overwrite, i.e.
    #     opts.evt_max = 1; opts.evt_max = 2; assert opts.evt_max == 1;
    #     opts.evt_max.force(3); assert opts.evt_max == 3
    #   OR
    #     opts.evt_max.set_if_not_set(x) OR opts.evt_max.default(x)
    # - ...

    def __setattr__(self, attr, val):
        """Set `attr`, unless it was locked with `lockOption`.

        Raises:
            ConfigurationError: if `attr` is locked.
        """
        if attr in _ApplicationOptions_lockedSlots:
            raise ConfigurationError(
                "Can't change property {}! It was locked in: {}".format(
                    attr, _ApplicationOptions_lockedSlots[attr]))
        super(ApplicationOptions, self).__setattr__(attr, val)

    def lockOption(self, option):
        """Make a specific option entry immutable.

        The filename of the calling code is recorded. It is reported by later
        attempts to change, or to lock again, the same option.

        Args:
            option (str): name of option entry to lock

        Raises:
            ConfigurationError: if `option` is unknown or already locked.
        """
        if option not in self.__slots__:
            raise ConfigurationError(
                "Trying to lock unknown option {}".format(option))
        if option in _ApplicationOptions_lockedSlots:
            raise ConfigurationError(
                "Property {} was already locked in: {}".format(
                    option, _ApplicationOptions_lockedSlots[option]))
        try:
            import inspect
            # Only the caller's frame is needed. inspect.stack() would build
            # (and read source context for) the whole stack.
            doc = inspect.currentframe().f_back.f_code.co_filename
        except Exception:
            # Best effort only, e.g. currentframe() may return None.
            doc = "callsite couldn't be determined"
        _ApplicationOptions_lockedSlots[option] = doc

    __slots__ = {
        # input related
        'input_files': [],  # type is list of str
        'input_type': '',  # TODO use enum
        'input_raw_format': 0.5,
        'input_process': '',
        'input_stream': '',
        'evt_max': -1,
        'first_evt': 0,
        'print_freq': 10000,
        'ioalg_buffer_nb_events': 40,
        'mdf_ioalg_name': 'IOAlgFileRead',
        'root_ioalg_name': 'RootIOAlg',
        'root_ioalg_opts': {},
        # conditions for processing
        'data_type': 'Upgrade',
        'dddb_tag': '',  # To Be Dropped once DetDesc support is gone
        'conddb_tag': '',
        'geometry_backend': "DD4Hep" if UseDD4Hep else "DetDesc",
        'geometry_version': '',
        'conditions_version': '',
        'simulation': None,
        # output related
        'output_file': '',
        'output_type': '',
        'histo_file': '',
        'ntuple_file': '',
        'output_level': INFO,
        'python_logging_level': logging.INFO,
        # - dump monitoring entities (counters, histograms, etc.)
        'monitoring_file': '',
        # multithreaded processing
        'n_threads': 1,
        'event_store': 'HiveWhiteBoard',  # one of EvtStoreSvc, HiveWhiteBoard
        'n_event_slots': -1,  # defaults to 1.2 * n_threads
        # if false scheduler calls Algorithm::execute instead of Algorithm::sysExecute,
        # which breaks some non-functional algorithms
        'scheduler_legacy_mode': True,
        # estimated size of the per-event memory pool. set to zero to disable the pool.
        'memory_pool_size': 10 * 1024 * 1024,
        # TODO such defaults can be expressed here explicitly
        # debugging
        # Control flow file name; not generated if empty
        'control_flow_file': '',
        # Data flow file name; not generated if empty
        'data_flow_file': '',
        # Output manifest of TES contents
        'output_manifest_file': '',
        # where do the decoding keys go?
        'append_decoding_keys_to_output_manifest': True,
        # write decoding keys to local git repository in build area
        'write_decoding_keys_to_git': True,
        # require that the configuration matched the unique keys specified.
        'require_specific_decoding_keys': [],
        # input manifest of TES contents -- only needed when running passthrough
        # to produce the output manifest, and/or as source of decoding keys...
        'input_manifest_file': '',
        # Algorithms to be executed before the main control flow.
        # Could be used for controlling profiling, e.g. CallgrindProfile, PerfProfile.
        'preamble_algs': [],
        'msg_svc_format': '% F%35W%S %7W%R%T %0W%M',
        'msg_svc_time_format': '%Y-%m-%d %H:%M:%S UTC',
        # Moore-specific options
        # TODO move to a Moore-specific ApplicationOptions class
        'lines_maker': None,
        # Optional Auditors. Disabled (empty) by default.
        'auditors': [],
        # phoenix file name; not generated if empty
        'phoenix_filename': '',
        # version of persistreco of input, output is always latest
        # (default is latest, unless otherwise set in a bind)
        'persistreco_version': default_persistreco_version(),
        # in case GaudiPython is used, set this to true in order to change
        # the behavior of the scheduler accordingly
        'gaudipython_mode': False
    }

    _propertyDocDct = {
        'input_type':
        'the type of I/O. One of NONE, Online, MDF or ROOT.',
        'input_raw_format':
        ('sets the expected raw input format (splitting). '
         'See definitions at '
         'https://gitlab.cern.ch/lhcb-datapkg/RawEventFormat/blob/master/python/RawEventFormat/__init__.py'
         ),
        'input_stream':
        'stream name. Default is empty',
        'geometry_backend':
        'Backend to be used for the geometry, can be DD4Hep or DetDesc. Use "NONE" for disabling the geometry',
        'ioalg_buffer_nb_events':
        'number of events to pre-fetch, the default value (40) is reasonable for HLT2/Analysis. It needs to be increased for HLT1, typically to 20000',
        'mdf_ioalg_name':
        'name of the ioalg to be used when input_type is MDF. Choices currently are IOAlgMemoryMap and IOAlgFileRead, the latter being the default. Note that the former supposes the input files are local',
        'root_ioalg_name':
        'name of the ioalg to be used when input_type is ROOT. Choices currently are RootIOAlg and RootIOAlgExt, the former being the default. Note that the latter should not be used other than when using CopyInputStream until it\'s fixed',
        'root_ioalg_opts':
        'dictionary of properties to be passed to the RootIOAlg at construction time',
        'event_store':
        'Event store implementation: HiveWhiteBoard (default) or EvtStoreSvc (faster).',
        'auditors':
        'Define list of auditors to run. Possible common choices include "NameAuditor", "MemoryAuditor" or "ChronoAuditor". For a full list see Gaudi documentation.',
        'phoenix_filename':
        ' define the file where phoenix event data are written. Defaults to none = no phoenix output'
    }

    # The VELO motion system override only exists for the DetDesc backend.
    if not UseDD4Hep:
        __slots__["velo_motion_system_yaml"] = ""
        _propertyDocDct[
            "velo_motion_system_yaml"] = "path to a VP/Motion.yml directory from a Git CondDB clone"

    def _validate(self):
        """Raise an exception if the options are not consistent.

        Side effects:
        - For the DD4Hep backend, an empty geometry_version or
          conditions_version is filled from dddb_tag or conddb_tag, with a
          warning.
        - A non-default persistreco_version is bound globally.

        Raises:
            ConfigurationError: if simulation is not explicitly set, a
                required option is empty, or the persistreco version is
                unknown.
            AssertionError: if non-MDF output is requested with more than one
                thread/event slot, or with an unsupported output_type.
        """
        # TODO validate separately the case of input_type == "NONE"
        # since now tags are not used in that case.
        backend = self.getProp("geometry_backend")

        # Configuration is ill-defined without these properties
        required = ["input_type"]
        if backend == "DD4Hep":
            required.extend(["geometry_version", "conditions_version"])
        elif backend == "DetDesc":
            required.extend(["dddb_tag", "conddb_tag"])

        # Check simulation flag has been explicitly set
        if not self.isPropertySet("simulation"):
            raise ConfigurationError(
                "Required option simulation must be set to True or False as appropriate"
            )

        if backend == "DD4Hep":
            # Fall back on the DetDesc-style tags when the DD4Hep ones are empty
            if not self.geometry_version and self.dddb_tag:
                log.warning(
                    "Required option geometry_version not set, using the value from dddb_tag"
                )
                self.geometry_version = self.dddb_tag
            if not self.conditions_version and self.conddb_tag:
                log.warning(
                    "Required option conditions_version not set, using the value from conddb_tag"
                )
                self.conditions_version = self.conddb_tag

        not_set = [attr for attr in required if not getattr(self, attr)]
        if not_set:
            raise ConfigurationError(
                "Required options not set: {}".format(not_set))

        # TODO check if input raw_format makes sense for Online
        if self.output_file and self.output_type != "MDF":
            assert self.n_event_slots == 1, 'Cannot write output other than MDF multithreaded'
            assert self.n_threads == 1, 'Cannot write output other than MDF multithreaded'
            assert self.output_type in FILE_TYPES, (
                'Output filetype not supported: {}'.format(self.output_type))

        # make sure the right version of persistency locations is set
        if self.persistreco_version != default_persistreco_version():
            available_versions = available_persistreco_versions()
            if self.persistreco_version not in available_versions:
                raise ConfigurationError(
                    f"Requested persistency version (of input) '{self.persistreco_version}' is not known, available options are {available_versions}"
                )
            persistreco_version.global_bind(version=self.persistreco_version)

    def _set_defaults(self):
        """Resolve dynamic defaults.

        An n_event_slots <= 0 becomes ceil(1.2 * n_threads) when running
        with more than one thread, and 1 otherwise.
        """
        if self.n_event_slots <= 0:
            self.n_event_slots = (math.ceil(1.2 * self.n_threads)
                                  if self.n_threads > 1 else 1)

    def finalize(self):
        """Resolve defaults, validate, and print the configuration.

        Only the first call has an effect; later calls return immediately.
        This method does not itself make options read-only (see
        `lockOption`).
        """
        global _ApplicationOptions_locked
        if _ApplicationOptions_locked:
            return
        _ApplicationOptions_locked = True

        # Never filter messages in the root logger's handler.
        # A non-root logger can thus have a lower level if set explicitly.
        GaudiKernel.ProcessJobOptions.GetConsoleHandler().enable(0)
        # applyConfigurableUsers() is too loud with info(), so silence it
        if '--debug' not in sys.argv:
            GaudiKernel.Configurable.log.setLevel(logging.WARNING)
        # If python_logging_level is set, pass it to the root logger.
        # Otherwise, gaudirun.py would set INFO or DEBUG (with --debug).
        if self.isPropertySet('python_logging_level'):
            logging.getLogger().setLevel(self.getProp('python_logging_level'))

        # Workaround ConfigurableUser limitation: materialise unset defaults
        for name, default in self.getDefaultProperties().items():
            if not self.isPropertySet(name):
                self.setProp(name, default)

        self._set_defaults()
        self._validate()

        if self.python_logging_level <= logging.INFO:
            print(self, flush=True)

    def set_conds_from_testfiledb(self, key):
        """Set DDDB and CondDB tags, data type and simulation flag
        from a TestFileDB entry.

        GeometryVersion and ConditionsVersion are also set when the entry's
        qualifiers provide them.

        Args:
            key (str): Key in the TestFileDB.
        """
        from PRConfig.TestFileDB import test_file_db
        qualifiers = test_file_db[key].qualifiers
        self.data_type = qualifiers['DataType']
        self.simulation = qualifiers['Simulation']
        self.dddb_tag = qualifiers['DDDB']
        self.conddb_tag = qualifiers['CondDB']
        if "GeometryVersion" in qualifiers:
            self.geometry_version = qualifiers["GeometryVersion"]
        if "ConditionsVersion" in qualifiers:
            self.conditions_version = qualifiers["ConditionsVersion"]

    def applyConf(self):
        """Not supported: this ConfigurableUser only holds options."""
        # we should not do anything here
        raise NotImplementedError(
            'The {} configurable should not be called'.format(
                self.__class__.__name__))

    def getProperties(self):
        """Stable-order override of Configurable.getProperties."""
        props = super(ApplicationOptions, self).getProperties()
        return OrderedDict(sorted(props.items()))
### Singleton ROOT and MDF IOAlgs, used when ROOT/MDF is the input.
### Both start as None and are created on first use by
### create_or_reuse_mdfIOAlg / create_or_reuse_rootIOAlg.
_mdfIOAlgSingleton = None
_rootIOAlgSingleton = None
def create_or_reuse_mdfIOAlg(location, options):
    """Return the MDF IOAlg, creating it if it does not already exist.

    The algorithm type is ``options.mdf_ioalg_name``. It reads
    ``options.input_files`` and writes the raw event to `location`. Options
    passed on later calls are not used; only the requested location is
    checked against the existing instance.

    Args:
        location: TES location of the raw event.
        options: application options used when creating the algorithm.

    Returns:
        The (possibly pre-existing) MDF IOAlg.

    Raises:
        ValueError: if the IOAlg exists already with a different output
            location.
    """
    global _mdfIOAlgSingleton
    if _mdfIOAlgSingleton is None:
        # create the MDF IOAlg if needed
        _mdfIOAlgSingleton = Algorithm(
            getattr(Configurables, options.mdf_ioalg_name),
            name="MDFIOAlg",
            outputs={
                'RawEventLocation': force_location(location),
                'EventBufferLocation': None
            },
            Input=options.input_files,
            BufferNbEvents=options.ioalg_buffer_nb_events,
            NSkip=options.first_evt)
        return _mdfIOAlgSingleton

    existing = _mdfIOAlgSingleton.RawEventLocation.location
    if existing != ensure_event_prefix(location):
        # Report the compared location strings rather than the DataHandle.
        raise ValueError(
            'Same input was required twice from MDF, mapping to different locations : \'%s\' and \'%s\''
            % (location, existing))
    return _mdfIOAlgSingleton
def input_from_mdf_file(location,
                        propertyName=None,
                        options=None,
                        forced_type=None):
    """Return the raw event handle produced by the MDF IOAlg at `location`.

    Args:
        location: TES location of the raw event.
        propertyName: requested property; only None or 'RawEventLocation'
            are supported.
        options: application options, required to create the MDF IOAlg.
        forced_type: accepted for interface compatibility; not used here.

    Raises:
        ValueError: for an unsupported `propertyName` or missing `options`.
    """
    if propertyName not in (None, 'RawEventLocation'):
        raise ValueError(
            'Unsupported input %s from an MDF file. Only RawEventLocation is supported'
            % propertyName)
    if options is None:
        raise ValueError(
            "No options have been passed, so MDFIOAlg cannot be created.")
    io_alg = create_or_reuse_mdfIOAlg(location, options)
    return io_alg.RawEventLocation
def create_or_reuse_rootIOAlg(options):
    """Return the ROOT IOAlg, creating it if it does not already exist.

    The algorithm type is ``options.root_ioalg_name``, and
    ``options.root_ioalg_opts`` are forwarded as extra properties. Once the
    instance exists it is returned as is and `options` are not looked at.

    Raises:
        ValueError: if the IOAlg must be created but `options` is None.
    """
    global _rootIOAlgSingleton
    if _rootIOAlgSingleton is not None:
        return _rootIOAlgSingleton

    # check we have options
    if options is None:
        raise ValueError(
            "No options have been passed, so RootIOAlg cannot be created. Pass options or call create_or_reuse_rootIOAlg directly"
        )
    _rootIOAlgSingleton = MutableAlgorithm(
        getattr(Configurables, options.root_ioalg_name),
        name="RootIOAlg",
        Input=options.input_files,
        BufferNbEvents=options.ioalg_buffer_nb_events,
        NSkip=options.first_evt,
        EventBranches=[],
        # FIXME : should not be set (and feature should be dropped) but this requires that
        # algos do not require data they do not use (using getIfExist to check there was nothing there)
        AllowMissingInput=True,
        **options.root_ioalg_opts)
    return _rootIOAlgSingleton
def all_nodes_and_algs(node, recurse_algs=False):
    """Return the list of all reachable nodes and algorithms.

    Args:
        node (CompositeNode): root of the control flow.
        recurse_algs (bool): if True, return instead the union of
            ``all_producers()`` of every algorithm found in the control flow,
            sorted by the algorithms' ``id``.

    Returns:
        tuple: ``(nodes, algs)``. Both are lists. Nodes (and, without
        `recurse_algs`, algorithms) are in discovery order.

    Raises:
        TypeError: if `node` is not a CompositeNode.
    """
    if not isinstance(node, CompositeNode):
        raise TypeError('{} is not of type CompositeNode'.format(node))

    # OrderedDicts serve as ordered sets (there is no OrderedSet in the
    # standard library)
    found_nodes = OrderedDict()
    cf_algs = OrderedDict()
    _all_nodes_and_algs(node, found_nodes, cf_algs)

    if not recurse_algs:
        return list(found_nodes), list(cf_algs)

    producers = set()
    for alg in cf_algs:
        producers.update(alg.all_producers())
    return list(found_nodes), sorted(producers, key=lambda a: a.id)
def _all_nodes_and_algs(node, nodes, algs):
    """Record `node`, its descendant nodes and their algorithms.

    `nodes` and `algs` are dicts used as ordered sets (values are None) and
    are filled in place. Already-recorded nodes are not visited again.

    Raises:
        TypeError: if a child is neither a CompositeNode nor an algorithm.
    """
    if node in nodes:
        return
    nodes[node] = None
    for child in node.children:
        if isinstance(child, CompositeNode):
            _all_nodes_and_algs(child, nodes, algs)
            continue
        if not is_algorithm(child):
            raise TypeError(
                'Child {!r} of {!r} is neither a CompositeNode nor an '
                'Algorithm'.format(child, node))
        algs[child] = None
class ComponentConfig(dict):
    """Object holding the configuration of Gaudi Configurables.

    Maps each component's full name (``getFullName()``) to the component.
    """

    def add(self, component):
        """Store `component` under its full name and return it."""
        self[component.getFullName()] = component
        return component

    def update(self, other=()):
        """Add every component from `other`.

        Unlike ``dict.update``, the keys of a mapping are ignored: each
        component is stored under its own full name via `add`.

        Args:
            other: a mapping whose values are components (e.g. another
                ComponentConfig), or any iterable of components.
        """
        components = other.values() if hasattr(other, 'values') else other
        for component in components:
            self.add(component)
def make_raw_event_with_Online(location, bank_type="ALL", forced_type=None):
    """Return a RawEvent handle built from Online input.

    An ``Online__InputAlg`` declares the raw data, the raw data guard and
    TAE outputs under ``/Event/Banks`` (``TAEHalfWindow``, ``Prev0``-``Prev9``,
    ``Next0``-``Next9``). An ``Online__BanksToRawEvent`` then converts either
    the central raw data or, when `location` names a ``Prev<i>``/``Next<i>``
    crossing, that crossing's output into a RawEvent.

    Args:
        location: requested RawEvent location, e.g. ``/Event/DAQ/RawEvent``
            or ``/Event/Prev1/DAQ/RawEvent``.
        bank_type: bank type given to the converter (default "ALL").
        forced_type: accepted for interface compatibility; not used here.
    """
    from PyConf.Algorithms import Online__InputAlg, Online__BanksToRawEvent
    # TODO what if we're called twice with a different location?

    # "/Event/Prev1/DAQ/RawEvent" -> "Prev1", "/Event/DAQ/RawEvent" -> ""
    tae = location.removeprefix("/Event")
    for fragment in ("DAQ/RawEvent", "/"):
        tae = tae.replace(fragment, "")

    def transform(RawData, RawGuard, DAQErrors, **outputs):
        # All remaining (TAE) outputs must live in a single directory,
        # which becomes the BankDirectory property.
        extra_outputs = list(outputs.values())
        directories = {os.path.dirname(path) for path in extra_outputs}
        assert len(directories) == 1, str(outputs)
        return {
            "BankDirectory": directories.pop(),
            "RawData": RawData,
            "RawGuard": RawGuard,
            "DAQErrors": DAQErrors,
            "ExtraOutputs": extra_outputs,
        }

    if not location.startswith("/Event"):
        location = "/Event/" + location
    # The raw data itself always lives at the non-TAE location.
    raw_data_loc = re.sub(r"/Event/(Prev|Next)[0-9]+", "/Event", location)

    tae_locs = {"TAEHalfWindow": force_location("/Event/Banks/TAEHalfWindow")}
    for direction in ("Prev", "Next"):
        for index in range(10):
            tae_locs[f"{direction}{index}"] = force_location(
                f"/Event/Banks/{direction}{index}")

    outputs = {
        "RawData": force_location(raw_data_loc),
        "RawGuard": force_location("/Event/Banks/RawDataGuard"),
        "DAQErrors": None,
    }
    outputs.update(tae_locs)

    input_alg = Online__InputAlg(
        name="Online__InputAlg",
        outputs=outputs,
        output_transform=transform,
        DeclareData=True,  # output RawGuard
        DeclareEvent=True,  # output RawData (incl. TAE crossings)
        DeclareErrors=False,
    )
    return Online__BanksToRawEvent(
        name="Online__BanksToRawEvent_" + bank_type + tae,
        RawData=getattr(input_alg, tae or "RawData"),
        BankType=bank_type).RawEvent
@configurable
def default_raw_event(bank_type, raw_event_format, maker, stream=''):
    """Return a raw event that contains a given bank.

    Args:
        bank_type: Required raw bank type. If falsy, or unknown to the raw
            event format, the format's first location is used.
        raw_event_format: RawEventFormat key in `Raw_location_db`. It must
            not be None, even when `stream` is given.
        maker: Maker of input data, called as
            ``maker(location, forced_type="LHCb::RawEvent")``.
        stream: if non-empty, the raw event is taken from
            ``/Event/<stream>/RawEvent`` instead of `Raw_location_db`.

    Returns:
        DataHandle: Raw event containing banks of the requested types.

    Raises:
        ValueError: if `raw_event_format` is None, or if the selected entry
            of the raw event format lists more than one location.
    """
    if raw_event_format is None:
        raise ValueError('raw_event_format is required (or must be bound)')

    if stream != '':
        location = f'/Event/{stream}/RawEvent'
    else:
        raw_bank_locations = Raw_location_db[raw_event_format]
        first_location = list(raw_bank_locations.values())[0]
        location = (raw_bank_locations.get(bank_type, first_location)
                    if bank_type else first_location)
        # An entry may list several locations; only a single one is usable.
        # Checked for both branches, so that the fallback location is
        # treated the same whether or not a bank type was given.
        if isinstance(location, list):
            if len(location) != 1:
                raise ValueError(
                    'Requested raw bank ({}) points to multiple locations ({}) '
                    'for raw event format {}. Restrict raw bank types or merge raw '
                    'events manually.'.format(bank_type, location,
                                              raw_event_format))
            location = location[0]

    return maker(location, forced_type="LHCb::RawEvent")
@configurable
def default_raw_banks(bank_type, tae=0, **kwargs):
    '''Return the RawBank handle for the given bank type.

    The banks are unpacked by ``LHCb__UnpackRawEvent`` from
    ``default_raw_event(bank_type)``. The output is stored at
    ``/Event/RawBanks/<bank_type>``.

    If `tae` is not 0, the banks belong to the Prev<x> or Next<x> event,
    depending on the sign of `tae` and with x being its absolute value. The
    output is then stored at ``/Event/RawBanks/<Prev|Next><x>/<bank_type>``
    and ``-<Prev|Next><x>`` is appended to the algorithm name.

    NOTE(review): the raw event is requested as
    ``default_raw_event(bank_type)`` without any TAE information; confirm
    that the TAE crossing is selected elsewhere (e.g. through bindings).

    Args:
        bank_type: raw bank type to unpack.
        tae: TAE crossing offset (0 for the central event).
        **kwargs: extra properties for ``LHCb__UnpackRawEvent``. ``name``
            defaults to ``UnpackRawEvent_<bank_type>``. ``RawEventLocation``,
            ``BankTypes`` and ``RawBankLocations`` are not allowed.

    Raises:
        KeyError: if a forbidden keyword is passed.
    '''
    raw_event = default_raw_event(bank_type)
    kwargs.setdefault('name', f'UnpackRawEvent_{bank_type}')

    # manage TAE case
    tae_path = ''
    if tae != 0:
        tae_name = f"Next{tae}" if tae > 0 else f"Prev{-tae}"
        # One sub-directory per crossing. A leading "/" here (as before)
        # produced malformed locations such as "/Event/RawBanks//Next1ODIN".
        tae_path = f"{tae_name}/"
        kwargs['name'] += "-" + tae_name

    def output_transform(RawBankLocations):
        return {"RawBankLocations": [RawBankLocations]}

    __veto_args({'RawEventLocation', 'BankTypes', 'RawBankLocations'},
                kwargs.keys())

    # Note: the aim is to have a standardized location for all RawBanks,
    # independent of where the RawEvent came from, from this point onwards
    return LHCb__UnpackRawEvent(
        RawEventLocation=raw_event,
        BankTypes=[bank_type],
        output_transform=output_transform,
        outputs={
            'RawBankLocations':
            force_location(f'/Event/RawBanks/{tae_path}{bank_type}')
        },
        **kwargs,
    ).RawBankLocations
@configurable
def make_odin(make_raw=default_raw_banks, tae=0, **kwargs):
    """Return the decoded ODIN handle.

    Args:
        make_raw: callable returning the ODIN raw banks, called as
            ``make_raw('ODIN', tae=tae)``.
        tae: TAE crossing offset. When non-zero, ``-Next<tae>`` or
            ``-Prev<-tae>`` is appended to the algorithm name.
        **kwargs: extra properties for ``createODIN``. ``name`` defaults to
            'Decode_ODIN'; ``RawBanks`` is not allowed.

    Raises:
        KeyError: if ``RawBanks`` is passed.
    """
    kwargs.setdefault('name', 'Decode_ODIN')
    if tae != 0:
        # tae is non-zero here, so only the Next/Prev cases can occur.
        tae_name = f"Next{tae}" if tae > 0 else f"Prev{-tae}"
        kwargs['name'] += "-" + tae_name
    __veto_args({'RawBanks'}, kwargs.keys())
    return createODIN(RawBanks=make_raw('ODIN', tae=tae), **kwargs).ODIN
# TODO get rid of magic initial time (must go to configure_input)
# Default start time for the fake event time producer. It is presumably a
# Unix timestamp in seconds (1433509200 == 2015-06-05 13:00 UTC); elsewhere it
# is multiplied by 1e9 for EventClockSvc.InitialTime -- confirm units.
INITIAL_TIME = 1433509200
@configurable
def make_fake_event_time(start=INITIAL_TIME, step=0):
    """Return a DummyEventTime producer writing a fake ODIN.

    When running in simulation mode, "reserveIOV" has to be fed an event time
    that is more realistic than what the simulated data contain (0).

    Args:
        start: value of the producer's ``Start`` property (INITIAL_TIME by
            default).
        step: value of the producer's ``Step`` property.
    """
    fake_odin_location = '/Event/DAQ/DummyODIN'
    return setup_component(
        DummyEventTime,
        "DummyEventTime",
        ODIN=fake_odin_location,
        Start=start,
        Step=step,
    )
@configurable
def make_callgrind_profile(start=10,
                           stop=90,
                           dump=90,
                           dumpName='CALLGRIND-OUT'):
    """Return algorithm that allows to use callgrind profiling.

    Pass it to the application options with
    ``preamble_algs=[make_callgrind_profile()]``.
    For more info see CodeAnalysisTools_.

    .. _CodeAnalysisTools: https://twiki.cern.ch/twiki/bin/view/LHCb/CodeAnalysisTools

    Args:
        start: Event count at which callgrind starts profiling.
            Defaults to 10 events.
        stop: Event count at which callgrind stops profiling.
            Defaults to 90 events.
        dump: Event count at which callgrind dumps results.
            Defaults to 90 events.
        dumpName: Name of dump.
            Defaults to 'CALLGRIND-OUT'.
    """
    from PyConf.Algorithms import CallgrindProfile
    profile_properties = {
        'StartFromEventN': start,
        'StopAtEventN': stop,
        'DumpAtEventN': dump,
        'DumpName': dumpName,
    }
    return CallgrindProfile(**profile_properties)
def assert_empty_dataondemand_service():
    """Assert that DataOnDemandSvc has no algorithm and no node mappings.

    The failing mapping is used as the assertion message.
    """
    service = DataOnDemandSvc()
    for mapping in (service.AlgMap, service.NodeMap):
        assert mapping == {}, mapping
def configure(options,
control_flow_node,
public_tools=[],
make_odin=make_odin,
make_fake_odin=make_fake_event_time):
options.finalize()
config = ComponentConfig()
nodes, algs = all_nodes_and_algs(control_flow_node)
# The following components are setup outside of the normal control and data flow. Thus, the automatic dependence evaluation in python does not work.
# As for data, ODIN is required to get the time and the correspoding IOV, let's add ODIN unconditionally to the list of producer algorithms,
# then the HiveDataBrokerSvc can take care of it. No input->no odin
if options.input_type != "NONE" and not options.simulation:
algs = algs + [make_odin().producer]
configuration = dataflow_config()
barriers = sorted(
set(i.fullname for i in _datadependencies(algs) if i.is_barrier))
for alg in algs:
configuration.update(alg.configuration())
for tool in public_tools:
configuration.update(tool.configuration())
configurable_algs, configurable_tools = configuration.apply()
del configuration
has_velo_ms_override = not UseDD4Hep and options.velo_motion_system_yaml
if options.simulation or options.input_type == "NONE":
fake_odin = make_fake_odin()
configurable_algs += [fake_odin]
odin_loc = fake_odin.ODIN.path()
else:
odin_loc = make_odin().location
if UseDD4Hep:
configurable_algs += [
setup_component(
IOVProducer,
"ReserveIOVDD4hep",
ODIN=odin_loc,
)
]
else:
if has_velo_ms_override:
from Configurables import LHCb__DetDesc__VeloMotionSystemFromYaml as YamlMS
configurable_algs += [
setup_component(
YamlMS,
"MotionSystemFromYaml",
YAMLPath=options.velo_motion_system_yaml,
ODIN=make_odin().location,
ExtraOutputs=[
"VeloMotionSystemOverrideFromYAML"
], # this is used to enforce an order between YamlMS and reserveIOV
)
]
configurable_algs += [
setup_component(
reserveIOV,
"reserveIOV",
ODIN=odin_loc,
IOVLock="/Event/IOVLock",
ExtraInputs=(["VeloMotionSystemOverrideFromYAML"]
if has_velo_ms_override else []),
)
]
whiteboard = config.add(
setup_component(
options.event_store,
instance_name='EventDataSvc',
EventSlots=options.n_event_slots,
ForceLeaves=True))
# TODO: Remove use of getattr once ApplicationOptions is removed
end_event_incident = ""
if getattr(options, "xml_summary_file", None):
end_event_incident = "FinishedProcessingSomeEvent"
scheduler = config.add(
setup_component(
HLTControlFlowMgr,
CompositeCFNodes=[node.represent() for node in nodes],
MemoryPoolSize=options.memory_pool_size,
ThreadPoolSize=options.n_threads,
EndEventIncident=end_event_incident,
EnableLegacyMode=options.scheduler_legacy_mode,
BarrierAlgNames=barriers,
PreambleAlgs=options.preamble_algs,
GaudiPythonMode=options.gaudipython_mode))
appMgr = config.add(
ApplicationMgr(OutputLevel=options.output_level, EventLoop=scheduler))
appMgr.ExtSvc.insert(
0, whiteboard
) # FIXME this cannot work when configurables are not singletons
# TODO: Remove use of getattr once ApplicationOptions is removed
if getattr(options, "event_timeout", None):
from Configurables import StalledEventMonitor
appMgr.StalledEventMonitoring = True
config.add(
setup_component(
StalledEventMonitor,
EventTimeout=options.event_timeout,
StackTrace=True,
MaxTimeoutCount=1,
))
if options.auditors:
from Configurables import AuditorSvc
config.add(setup_component(AuditorSvc, Auditors=options.auditors))
appMgr.ExtSvc += [AuditorSvc()]
appMgr.AuditAlgorithms = True
if not any(a for a in options.auditors if a == 'ChronoAuditor' or (
isinstance(a, Configurable) and a.getType() == 'ChronoAuditor')):
# Turn off most output from ChronoStatSvc
# Even if it is not explicitly configured, some components will
# instantiate it (like DetailedMaterialLocator) causing some output.
ChronoStatSvc().ChronoPrintOutTable = False
ChronoStatSvc().PrintUserTime = False
if not UseDD4Hep:
from Configurables import UpdateManagerSvc
config.add(setup_component(UpdateManagerSvc, WithoutBeginEvent=True))
config.add(
setup_component(HiveDataBrokerSvc, DataProducers=configurable_algs))
# for LoKi
config.add(setup_component(AlgContextSvc, BypassIncidents=True))
config.add(setup_component(LoKiSvc, Welcome=False))
# ANNSvc for encoding/decoding
hltANNSvc = configured_ann_svc(json_file=options.input_manifest_file)
config.add(hltANNSvc)
appMgr.ExtSvc += [hltANNSvc]
# configure message service
config.add(
setup_component(
MessageSvc,
Format=options.msg_svc_format,
timeFormat=options.msg_svc_time_format))
appMgr.ExtSvc += [MessageSvcSink()]
if not UseDD4Hep:
from Configurables import EventClockSvc
config.add(
setup_component(
EventClockSvc, InitialTime=int(INITIAL_TIME * 1e9)))
# configure the monitoring sink that writes to a file
if options.monitoring_file:
sink = setup_component(JSONSink, FileName=options.monitoring_file)
config.add(sink)
appMgr.ExtSvc += [sink]
# TODO: Remove use of getattr once ApplicationOptions is removed
if getattr(options, "input_process", None):
if options.input_process == InputProcessTypes.Hlt2:
fsr_sink = setup_component(
"LHCb__FSR__Sink",
instance_name="FileSummaryRecord",
AcceptRegex=
r"^LumiCounter\.eventsByRun$|^HltANNSvc\.DecodingKeys$")
config.add(fsr_sink)
appMgr.ExtSvc.append(fsr_sink)
# TODO: Remove use of getattr once ApplicationOptions is removed
if getattr(options, "xml_summary_file", None):
summary_svc = setup_component(
XMLSummarySvc,
xmlfile=options.xml_summary_file,
EndEventIncident=end_event_incident)
config.add(summary_svc)
appMgr.ExtSvc.append(summary_svc)
# TODO: Remove use of getattr once ApplicationOptions is removed
if getattr(options, "compression", None):
from Configurables import RootCnvSvc
from GaudiKernel.Configurable import ConfigurableGeneric
config.add(
setup_component(RootCnvSvc,
**options.compression.as_gaudi_config()))
component = config.add(
setup_component(ConfigurableGeneric, instance_name="RFileCnv"))
for k, v in options.compression.as_gaudi_config().items():
setattr(component, k, v)
# Check there is no collision between the histogram and the ntuple file
# they need to be different to avoid the writer of Histogram (a Sink) and
# the write of NTuples (still the HistogramPersistencySvc for now) to step
# on each other feet
if options.histo_file and options.histo_file == options.ntuple_file:
raise Exception(
"Histogram and NTuple files need to be different. Here they are both set to '%s'"
% options.histo_file)
if options.histo_file:
from Configurables import Gaudi__Histograming__Sink__Root
root_sink = setup_component(
Gaudi__Histograming__Sink__Root, FileName=options.histo_file)
config.add(root_sink)
appMgr.ExtSvc.append(root_sink)
if options.ntuple_file:
from Configurables import NTupleSvc, HistogramPersistencySvc
config.add(
setup_component(
HistogramPersistencySvc, OutputFile=options.ntuple_file))
config.add(ApplicationMgr(HistogramPersistency="ROOT"))
ntuple_svc = setup_component(
NTupleSvc,
Output=[
"FILE1 DATAFILE='{}' TYPE='ROOT' OPT='NEW'".format(
options.ntuple_file)
])
config.add(ntuple_svc)
appMgr.ExtSvc.append(ntuple_svc)
# set the Phoenix Sink output filename (same as histo_name above)
if options.phoenix_filename:
from Configurables import LHCb__Phoenix__Sink
phoenix_sink = setup_component(
LHCb__Phoenix__Sink, FileName=options.phoenix_filename)
config.add(phoenix_sink)
appMgr.ExtSvc.append(phoenix_sink)
if options.control_flow_file:
fn_root, fn_ext = os.path.splitext(options.control_flow_file)
plot_control_flow(
control_flow_node, filename=fn_root, extensions=[fn_ext[1:]])
if options.data_flow_file:
fn_root, fn_ext = os.path.splitext(options.data_flow_file)
plot_data_flow(algs, filename=fn_root, extensions=[fn_ext[1:]])
# keys have to either go to git, or to the output manifest file...
if options.output_file and not options.write_decoding_keys_to_git and not (
options.output_manifest_file
and options.append_decoding_keys_to_output_manifest):
log.warning(
'you have specified an output file, but also blocked any writing of decoding keys -- as a result, you will likely not be able to decode the data written'
)
if options.output_type == 'ROOT':
output_iohelper = IOHelper(None, options.output_type or None)
output_iohelper.setupServices()
if 'StoreOldFSRs' in options.root_ioalg_opts and options.root_ioalg_opts[
'StoreOldFSRs']:
# needed to handle old FSRs. FIXME : to be removed when old FSRs are gone
from Configurables import Gaudi__RootCnvSvc
from Gaudi.Configuration import (FileRecordDataSvc, PersistencySvc)
# Set up the FileRecordDataSvc
frSvc = FileRecordDataSvc(
ForceLeaves=True,
EnableFaultHandler=True,
RootCLID=1, #was the next line missed accidentally?
PersistencySvc="PersistencySvc/FileRecordPersistencySvc")
config.add(frSvc)
appMgr.ExtSvc.append(frSvc)
fileSvc = Gaudi__RootCnvSvc("FileRecordCnvSvc")
# The FSRs are trees with only one entry per branch
fileSvc.MinBufferSize = 512
fileSvc.ApproxEventsPerBasket = 1
fileSvc.ShareFiles = "YES"
config.add(fileSvc)
appMgr.ExtSvc.append(fileSvc)
PersistencySvc("FileRecordPersistencySvc").CnvServices += [fileSvc]
# if we generated _or read_ any keys, flush them to git (if so requested)
# propagate any keys from the input manifest
if options.input_manifest_file:
key_registry.add_keys_from_jsonfile(options.input_manifest_file)
if options.write_decoding_keys_to_git:
for kr in key_registries.values():
kr.flush_to_git()
# Check that the configuration creates the decoding keys we expect to use.
if len(options.require_specific_decoding_keys) > 0:
if not (isinstance(options.require_specific_decoding_keys[0], str)):
raise ConfigurationError(
"Please provide a non-zero list of keys as strings.")
if set(options.require_specific_decoding_keys) != set(
key_registry.keys()):
raise ConfigurationError(
"Configuration and required keys are inconsistent")
if options.output_manifest_file:
if options.append_decoding_keys_to_output_manifest:
key_registry.append_to_jsonfile(
options.output_manifest_file, indent=4)
# Make sure nothing's configuring the DoD behind our back, e.g. LHCbApp
appendPostConfigAction(assert_empty_dataondemand_service)
# TODO add configurable_tools and configurable_algs to config
return config
def _gaudi_datetime_format(dt):
    """Return the datetime object formatted as Gaudi does it.

    This is the format seen in the application "Welcome" banner, e.g.
    ``'Fri Mar 05 07:08:09 2021'``.

    Args:
        dt (datetime.datetime): The timestamp to format.

    Returns:
        str: ``dt`` rendered as ``'<weekday> <month> <day> HH:MM:SS <year>'``
        using abbreviated weekday and month names.
    """
    # ``%b`` is the documented, portable spelling of the abbreviated month
    # name. The previously used ``%h`` is a platform-specific synonym that is
    # not part of Python's documented strftime directives.
    return dt.strftime('%a %b %d %H:%M:%S %Y')
def plot_data_flow(algs, filename='data_flow', extensions=('gv', )):
    """Save a visualisation of the current data flow.

    A single graph containing every algorithm in ``algs`` is built and then
    written once per requested extension.

    Args:
        algs (list): List of Algorithm instances. Each one is added to the
            graph through its ``_graph`` method.
        filename (str): Basename of the file to create. Each output file is
            named ``'<filename>.<ext>'``.
        extensions (list): List of file extensions to create.
            One file is created per extension.
            Possible values include `'gv'` for saving the raw graphviz
            representation, and `'png'` and `'pdf'` for saving graphics.

    Note:
        The `dot` binary must be present on the system for saving
        files with graphical extensions. The raw `.gv` format can be
        converted by hand like::

            dot -Tpdf data_flow.gv > data_flow.pdf
    """
    now = _gaudi_datetime_format(datetime.datetime.now())
    label = 'Data flow generated at {}'.format(now)
    top = pydot.Dot(
        graph_name='Data flow', label=label, strict=True, rankdir='LR')
    top.set_node_defaults(shape='box')
    for alg in algs:
        alg._graph(top)
    for ext in extensions:
        # '.gv' files hold the plain graphviz source: pydot's 'raw' format
        # writes it as-is, while any other extension is passed through as
        # the graphviz output format.
        output_format = 'raw' if ext == 'gv' else ext
        top.write('{}.{}'.format(filename, ext), format=output_format)
def plot_control_flow(top_node, filename='control_flow', extensions=('gv', )):
    """Save a visualisation of the current control flow.

    Args:
        top_node: Top-level control flow node to draw. It is added to the
            graph through its ``_graph`` method.
        filename (str): Basename of the file to create. Each output file is
            named ``'<filename>.<ext>'``.
        extensions (list): List of file extensions to create.
            One file is created per extension.
            Possible values include `'gv'` for saving the raw graphviz
            representation, and `'png'` and `'pdf'` for saving graphics.

    Note:
        The `dot` binary must be present on the system for saving
        files with graphical extensions. The raw `.gv` format can be
        converted by hand like::

            dot -Tpdf control_flow.gv > control_flow.pdf
    """
    now = _gaudi_datetime_format(datetime.datetime.now())
    label = 'Control flow generated at {}'.format(now)
    graph = pydot.Dot(
        graph_name='control_flow', label=label, strict=True, compound=True)
    top_node._graph(graph)
    for ext in extensions:
        # '.gv' files hold the plain graphviz source: pydot's 'raw' format
        # writes it as-is, while any other extension is passed through as
        # the graphviz output format.
        output_format = 'raw' if ext == 'gv' else ext
        graph.write('{}.{}'.format(filename, ext), format=output_format)