Source code for PyConf.application

###############################################################################
# (c) Copyright 2019-2021 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
from __future__ import absolute_import, division, print_function
import datetime
import logging
import math
import os
import re
from collections import OrderedDict
import pydot
import sys
from Gaudi.Configuration import (appendPostConfigAction, ConfigurableUser,
                                 Configurable, INFO)
import Configurables
from Configurables import (ChronoStatSvc, Gaudi__IODataManager,
                           Gaudi__RootCnvSvc)
import GaudiKernel.ProcessJobOptions
from GaudiConf import IOHelper
from RawEventFormat import Raw_location_db

from . import ConfigurationError
from .components import setup_component, is_algorithm, force_location, MutableAlgorithm, Algorithm
from .control_flow import CompositeNode
from .dataflow import dataflow_config, ensure_event_prefix, DataHandle
from .packing import persistreco_version, available_persistreco_versions, default_persistreco_version
from .tonic import configurable
from GaudiConf.LbExec import InputProcessTypes

from .Algorithms import (
    createODIN,
    LHCb__MDFWriter,
    OutputStream,
    CopyInputStream,
    LHCb__UnpackRawEvent,
)
from Configurables import (
    LHCb__Tests__FakeEventTimeProducer as DummyEventTime,
    ApplicationMgr,
    HLTControlFlowMgr,
    HiveDataBrokerSvc,
    DataOnDemandSvc,
    MessageSvc,
    Gaudi__Monitoring__MessageSvcSink as MessageSvcSink,
    Gaudi__Monitoring__JSONSink as JSONSink,
    XMLSummarySvc,
    GitANNSvc,
    AlgContextSvc,
    LoKiSvc,
)
from .filecontent_metadata import metainfo_repos, key_registry, key_registries  # TODO: do not directly talk to key_registry -- put a function in filecontent_metadata to hide this, and import that function...
from .filecontent_metadata import retrieve_encoding_dictionary as retrieve_encoding_dictionary  # noqa: F401

from DDDB.CheckDD4Hep import UseDD4Hep
if UseDD4Hep:
    from Configurables import LHCb__Det__LbDD4hep__IOVProducer as IOVProducer
else:
    # FIXME(NN): We shouldn't need to refer to IOV explicitly in our framework
    from Configurables import LHCb__DetDesc__ReserveDetDescForEvent as reserveIOV

log = logging.getLogger(__name__)
MDF_KEY = 'MDF'
ROOT_KEY = 'ROOT'
#: Valid input/output filetypes
FILE_TYPES = {MDF_KEY, ROOT_KEY}


def _shortPath2Path(path):
    res = path
    if not res.startswith('/Event/'):
        res = '/Event/' + res
    return res


def __veto_args(veto, keys):
    banned = set(veto) & keys
    if banned:
        raise KeyError(
            f'the following keywords are not allowed as explicit arguments: {banned}'
        )


@configurable
def configured_ann_svc(name='HltANNSvc',
                       repositories_and_versions=None,
                       json_file=None,
                       add_registry_keys=True,
                       **kwargs):

    assert json_file is None or add_registry_keys, 'json_file specified, but registry keys are not requested to be added to the configuration -- which implies that the content of the json file is ignored'

    if json_file: key_registry.add_keys_from_jsonfile(json_file)

    if add_registry_keys:
        if 'Overrule' not in kwargs: kwargs['Overrule'] = dict()
        assert not kwargs['Overrule'].keys() & key_registry.keys()
        kwargs['Overrule'].update(
            {int(k, 16): v
             for k, v in key_registry.items()})

    if 'Overrule' in kwargs:
        log.info('Overrule specified for keys {}'.format(', '.join(
            "0x{:08x}".format(k) for k in kwargs['Overrule'].keys())))
    else:
        log.info('No Overrule specified - relying solely on repository')

    if repositories_and_versions and ('Repositories' in kwargs.keys()):
        raise ValueError(
            'duplicate specification of Repositories: {} and {}'.format(
                repositories_and_versions, kwargs))
    if 'Repositories' not in kwargs.keys():
        if repositories_and_versions is None:
            repositories_and_versions = metainfo_repos()
        kwargs['Repositories'] = repositories_and_versions
    if 'RegisterWithMonitoringHub' not in kwargs.keys():
        kwargs['RegisterWithMonitoringHub'] = True
        kwargs['KeysToAlwaysPublish'] = [
            k for k in kwargs.get('Overrule', {}).keys()
        ]
    if 'CheckFSRForDecodingKeys' not in kwargs.keys():
        kwargs['CheckFSRForDecodingKeys'] = True
    return setup_component(GitANNSvc, instance_name=name, **kwargs)
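

# A minimal usage sketch (hypothetical repository path and version; the exact
# `Repositories` format is assumed here, not taken from this module):
#
#     svc = configured_ann_svc(
#         repositories_and_versions=[('/path/to/metainfo.git', 'v1')])
#
# Any decoding keys already collected in `key_registry` are passed to the
# service via the `Overrule` property, so they can be resolved without a
# repository lookup.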


def _datadependencies(algs):
    def __walk(visited, top):
        if top.name in visited: return
        yield top
        visited.add(top.name)
        for handles in top.inputs.values():
            handles = handles if isinstance(handles, list) else [handles]
            for handle in handles:
                for p in __walk(visited, handle.producer):
                    yield p

    visited = set()
    for alg in algs:
        yield from __walk(visited, alg)


def mdf_writer(path,
               location,
               compression=0,
               ignore_tae_info=True,
               compression_alg="ZSTD"):
    """Return an MDF writer which writes a single TES `location` to `path`.

    By default no compression is used, since `IOSvc` does not currently
    support compressed inputs, and also because there is already compression
    internal to the "DstData" raw banks (which dominate when no detector raw
    banks are written out). See https://gitlab.cern.ch/lhcb/LHCb/-/issues/163.
    """
    return LHCb__MDFWriter(
        BankLocation=location,
        Compress=compression,
        CompressAlg=compression_alg,
        IgnoreTAE=ignore_tae_info,
        Connection=path)

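# A minimal usage sketch (hypothetical output path; assumes the scheduled data
# flow produces a RawEvent at the given TES location):
#
#     writer = mdf_writer('output.mdf', '/Event/DAQ/RawEvent')
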
def online_writer(location, stream, enable_tae=False):
    """Return an OnlineAlg writer which writes a single TES `location`."""
    from PyConf.Algorithms import Online__OutputAlg, Online__RawEventToBanks
    tae_properties = {}
    if enable_tae:
        tae_properties = dict(
            UseTAELeaves=True,  # Enable writing of TAE output
            BankDirectory="/Event/Banks",
            # See the Hlt2Calib_TAE line in
            # Moore/Hlt/Hlt2Conf/python/Hlt2Conf/lines/calibration/calibration.py
            CentralFromRawData=True,
            TAEHalfWindow=force_location("/Event/Banks/TAEHalfWindowOutput"),
        )
    raw_data = Online__RawEventToBanks(RawEvent=location).RawData
    return Online__OutputAlg(
        name=f"EventOutput_{stream}",
        RawData=raw_data,
        RawGuard=force_location("/Event/Banks/RawDataGuard"),
        **tae_properties,
    )

def format_output_location(l, add_depth=True):
    """Return the TES location `l` formatted for an output writer.

    Args:
        l (str or DataHandle)
    """
    l = getattr(l, 'location', l)
    # Ensure the location ends with the depth specification `#N` or `#*`
    if add_depth and not re.match(r".*#(\d+|\*)$", l):
        l = "{}#1".format(l)
    return l

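# For illustration, the depth-suffix handling behaves as follows:
#
#     assert format_output_location('/Event/MyData') == '/Event/MyData#1'
#     assert format_output_location('/Event/MyData#*') == '/Event/MyData#*'
#     assert format_output_location('/Event/MyData', add_depth=False) == '/Event/MyData'
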
def root_writer(path, locations):
    """Return a ROOT/DST writer which writes TES `locations` to `path`."""
    locations = [format_output_location(l) for l in locations]
    return OutputStream(
        OptItemList=locations,
        Output="DATAFILE='{}' SVC='Gaudi::RootCnvSvc' OPT='RECREATE'".format(
            path),
    )

def root_copy_input_writer(options,
                           path,
                           locations=None,
                           tes_veto_locations=None):
    """Return an algorithm for writing ROOT/DST files.

    The writer will write not only the locations specified in `locations`,
    but also the locations collected by RootIOAlg. By scheduling the latter
    as the very first producer, one can copy all input file locations to the
    output.

    Args:
        path (str): Path the output file should be written to.
        locations (list of str): TES locations to write.
        tes_veto_locations (list of str): TES locations that should not be
            propagated from the input.

    Returns:
        The writer algorithm, typically to be scheduled last.
    """
    locations = [format_output_location(l) for l in locations or []]
    tes_veto_locations = [
        format_output_location(l, add_depth=False)
        for l in tes_veto_locations or []
    ]
    writer = CopyInputStream(
        InputFileLeavesLocation=create_or_reuse_rootIOAlg(
            options).InputLeavesLocation,
        OptItemList=locations,
        TESVetoList=tes_veto_locations,
        Output="DATAFILE='{}' SVC='Gaudi::RootCnvSvc' OPT='RECREATE'".format(
            path),
    )
    return writer

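# A minimal usage sketch (hypothetical locations), writing a DST that carries
# over everything from the input except the vetoed location:
#
#     writer = root_copy_input_writer(
#         options, 'output.dst',
#         locations=['/Event/MyAlg/Output'],
#         tes_veto_locations=['/Event/DAQ/RawEvent'])
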
_ApplicationOptions_locked = False
_ApplicationOptions_lockedSlots = {}

class ApplicationOptions(ConfigurableUser):
    """Holder for application configuration.

    Configuration can be mutated until `.finalize()` is called. At that point
    any dynamic defaults based on other properties are resolved.
    """

    # FIXME(RM) we can improve upon the base class ConfigurableUser by creating
    # a new base class that
    # - is more pythonic (e.g. no __slots__, __doc__)
    # - does better type checking of the properties
    # - makes `c.prop` work if prop hasn't been set
    # - supports namespaced/grouped options
    # - forbids a subsequent instantiation of ApplicationOptions such that one
    #   has to do `from ... import app` rather than
    #   `from ... import ApplicationOptions; ApplicationOptions().xxx`
    # - nicer printout of the options
    # - by default does not overwrite, i.e.
    #   opts.evt_max = 1; opts.evt_max = 2; assert opts.evt_max == 1;
    #   opts.evt_max.force(3); assert opts.evt_max == 3
    #   OR
    #   opts.evt_max.set_if_not_set(x) OR opts.evt_max.default(x)
    # - ...

    def __setattr__(self, attr, val):
        if attr in _ApplicationOptions_lockedSlots:
            raise ConfigurationError(
                "Can't change property {}! It was locked in: {}".format(
                    attr, _ApplicationOptions_lockedSlots[attr]))
        super(ApplicationOptions, self).__setattr__(attr, val)

    def lockOption(self, option):
        """Make a specific option entry immutable.

        Args:
            option (str): name of the option entry to lock
        """
        if option not in self.__slots__:
            raise ConfigurationError(
                "Trying to lock unknown option {}".format(option))
        global _ApplicationOptions_lockedSlots
        if option in _ApplicationOptions_lockedSlots:
            raise ConfigurationError(
                "Property {} was already locked in: {}".format(
                    option, _ApplicationOptions_lockedSlots[option]))
        try:
            import inspect
            doc = inspect.stack()[1][0].f_code.co_filename
        except Exception:
            doc = "callsite couldn't be determined"
        _ApplicationOptions_lockedSlots[option] = doc

    __slots__ = {
        # input related
        'input_files': [],  # type is list of str
        'input_type': '',  # TODO use enum
        'input_raw_format': 0.5,
        'input_process': '',
        'input_stream': '',
        'evt_max': -1,
        'first_evt': 0,
        'print_freq': 10000,
        'ioalg_buffer_nb_events': 40,
        'mdf_ioalg_name': 'IOAlgFileRead',
        'root_ioalg_name': 'RootIOAlg',
        'root_ioalg_opts': {},
        # conditions for processing
        'data_type': 'Upgrade',
        'dddb_tag': '',  # to be dropped once DetDesc support is gone
        'conddb_tag': '',
        'geometry_backend': "DD4Hep" if UseDD4Hep else "DetDesc",
        'geometry_version': '',
        'conditions_version': '',
        'simulation': None,
        # output related
        'output_file': '',
        'output_type': '',
        'histo_file': '',
        'ntuple_file': '',
        'output_level': INFO,
        'python_logging_level': logging.INFO,
        # - dump monitoring entities (counters, histograms, etc.)
        'monitoring_file': '',
        # multithreaded processing
        'n_threads': 1,
        'event_store': 'HiveWhiteBoard',  # one of EvtStoreSvc, HiveWhiteBoard
        'n_event_slots': -1,  # defaults to 1.2 * n_threads
        # if false, the scheduler calls Algorithm::execute instead of
        # Algorithm::sysExecute, which breaks some non-functional algorithms
        'scheduler_legacy_mode': True,
        # estimated size of the per-event memory pool; set to zero to disable the pool
        'memory_pool_size': 10 * 1024 * 1024,
        # TODO such defaults can be expressed here explicitly
        # debugging
        # Control flow file name; not generated if empty
        'control_flow_file': '',
        # Data flow file name; not generated if empty
        'data_flow_file': '',
        # Output manifest of TES contents
        'output_manifest_file': '',
        # where do the decoding keys go?
        'append_decoding_keys_to_output_manifest': True,
        # write decoding keys to a local git repository in the build area
        'write_decoding_keys_to_git': True,
        # require that the configuration matches the unique keys specified
        'require_specific_decoding_keys': [],
        # input manifest of TES contents -- only needed when running
        # passthrough to produce the output manifest, and/or as a source of
        # decoding keys
        'input_manifest_file': '',
        # Algorithms to be executed before the main control flow.
        # Could be used for controlling profiling, e.g. CallgrindProfile, PerfProfile.
        'preamble_algs': [],
        'msg_svc_format': '% F%35W%S %7W%R%T %0W%M',
        'msg_svc_time_format': '%Y-%m-%d %H:%M:%S UTC',
        # Moore-specific options
        # TODO move to a Moore-specific ApplicationOptions class
        'lines_maker': None,
        # Optional auditors. Disabled (empty) by default.
        'auditors': [],
        # Phoenix file name; not generated if empty
        'phoenix_filename': '',
        # version of persistreco of the input; the output is always the latest
        # (default is latest, unless otherwise set in a bind)
        'persistreco_version': default_persistreco_version(),
        # in case GaudiPython is used, set this to True in order to change
        # the behaviour of the scheduler accordingly
        'gaudipython_mode': False,
    }

    _propertyDocDct = {
        'input_type':
        'the type of I/O. One of NONE, Online, MDF or ROOT.',
        'input_raw_format':
        ('sets the expected raw input format (splitting). '
         'See definitions at '
         'https://gitlab.cern.ch/lhcb-datapkg/RawEventFormat/blob/master/python/RawEventFormat/__init__.py'
         ),
        'input_stream':
        'stream name. Default is empty',
        'geometry_backend':
        'backend to be used for the geometry; can be DD4Hep or DetDesc. '
        'Use "NONE" to disable the geometry',
        'ioalg_buffer_nb_events':
        'number of events to pre-fetch. The default value (40) is reasonable '
        'for HLT2/Analysis; it needs to be increased for HLT1, typically to 20000',
        'mdf_ioalg_name':
        'name of the ioalg to be used when input_type is MDF. Choices '
        'currently are IOAlgMemoryMap and IOAlgFileRead, the latter being the '
        'default. Note that the former assumes the input files are local',
        'root_ioalg_name':
        'name of the ioalg to be used when input_type is ROOT. Choices '
        'currently are RootIOAlg and RootIOAlgExt, the former being the '
        'default. Note that the latter should not be used other than with '
        'CopyInputStream until it is fixed',
        'root_ioalg_opts':
        'dictionary of properties to be passed to the RootIOAlg at '
        'construction time',
        'event_store':
        'event store implementation: HiveWhiteBoard (default) or EvtStoreSvc (faster).',
        'auditors':
        'list of auditors to run. Possible common choices include '
        '"NameAuditor", "MemoryAuditor" or "ChronoAuditor". For a full list '
        'see the Gaudi documentation.',
        'phoenix_filename':
        'file where Phoenix event data are written. Defaults to empty, '
        'i.e. no Phoenix output',
    }

    if not UseDD4Hep:
        __slots__["velo_motion_system_yaml"] = ""
        _propertyDocDct[
            "velo_motion_system_yaml"] = "path to a VP/Motion.yml directory from a Git CondDB clone"

    def _validate(self):
        """Raise an exception if the options are not consistent."""
        # TODO validate separately the case of input_type == "NONE"
        # since now tags are not used in that case.

        # Configuration is ill-defined without these properties
        required = ["input_type"]
        if self.getProp("geometry_backend") == "DD4Hep":
            required.extend(["geometry_version", "conditions_version"])
        elif self.getProp("geometry_backend") == "DetDesc":
            required.extend(["dddb_tag", "conddb_tag"])

        # Check that the simulation flag has been explicitly set
        if not self.isPropertySet("simulation"):
            raise ConfigurationError(
                "Required option simulation must be set to True or False as appropriate"
            )

        if self.getProp("geometry_backend") == "DD4Hep":
            if not self.geometry_version and self.dddb_tag:
                log.warning(
                    "Required option geometry_version not set, using the value from dddb_tag"
                )
                self.geometry_version = self.dddb_tag
            if not self.conditions_version and self.conddb_tag:
                log.warning(
                    "Required option conditions_version not set, using the value from conddb_tag"
                )
                self.conditions_version = self.conddb_tag

        not_set = [attr for attr in required if not getattr(self, attr)]
        if not_set:
            raise ConfigurationError(
                "Required options not set: {}".format(not_set))

        # TODO check if input_raw_format makes sense for Online
        if self.output_file and self.output_type != "MDF":
            assert self.n_event_slots == 1, 'Cannot write output other than MDF multithreaded'
            assert self.n_threads == 1, 'Cannot write output other than MDF multithreaded'
            assert self.output_type in FILE_TYPES, (
                'Output filetype not supported: {}'.format(self.output_type))

        # make sure the right version of persistency locations is set
        if self.persistreco_version != default_persistreco_version():
            available_versions = available_persistreco_versions()
            if self.persistreco_version not in available_versions:
                raise ConfigurationError(
                    f"Requested persistency version (of input) '{self.persistreco_version}' is not known, available options are {available_versions}"
                )
            persistreco_version.global_bind(version=self.persistreco_version)

    def _set_defaults(self):
        if self.n_event_slots <= 0:
            self.n_event_slots = (math.ceil(1.2 * self.n_threads)
                                  if self.n_threads > 1 else 1)

    def finalize(self):
        """Finalize the configuration and prevent further changes."""
        global _ApplicationOptions_locked
        if _ApplicationOptions_locked:
            return
        _ApplicationOptions_locked = True

        # Never filter messages in the root logger's handler.
        # A non-root logger can thus have a lower level if set explicitly.
        GaudiKernel.ProcessJobOptions.GetConsoleHandler().enable(0)

        # applyConfigurableUsers() is too loud with info(), so silence it
        if '--debug' not in sys.argv:
            GaudiKernel.Configurable.log.setLevel(logging.WARNING)

        # If python_logging_level is set, pass it to the root logger.
        # Otherwise, gaudirun.py would set INFO or DEBUG (with --debug).
        if self.isPropertySet('python_logging_level'):
            logging.getLogger().setLevel(self.getProp('python_logging_level'))

        # Workaround for a ConfigurableUser limitation
        for name, default in self.getDefaultProperties().items():
            if not self.isPropertySet(name):
                self.setProp(name, default)

        self._set_defaults()
        self._validate()
        if self.python_logging_level <= logging.INFO:
            print(self, flush=True)

    def set_conds_from_testfiledb(self, key):
        """Set DDDB and CondDB tags, data type and simulation flag from a
        TestFileDB entry.

        Args:
            key (str): Key in the TestFileDB.
        """
        from PRConfig.TestFileDB import test_file_db
        qualifiers = test_file_db[key].qualifiers
        self.data_type = qualifiers['DataType']
        self.simulation = qualifiers['Simulation']
        self.dddb_tag = qualifiers['DDDB']
        self.conddb_tag = qualifiers['CondDB']
        if "GeometryVersion" in qualifiers:
            self.geometry_version = qualifiers["GeometryVersion"]
        if "ConditionsVersion" in qualifiers:
            self.conditions_version = qualifiers["ConditionsVersion"]

    def set_input_from_testfiledb(self, key):
        """Set input file paths and file type from a TestFileDB entry.

        Args:
            key (str): Key in the TestFileDB.
        """
        from PRConfig.TestFileDB import test_file_db
        self.input_files = test_file_db[key].filenames
        file_format = test_file_db[key].qualifiers['Format']
        self.input_type = 'ROOT' if file_format != 'MDF' else file_format

    def set_input_and_conds_from_testfiledb(self, key):
        """Set input and conditions according to a TestFileDB entry.

        Args:
            key (str): Key in the TestFileDB.
        """
        self.set_input_from_testfiledb(key)
        self.set_conds_from_testfiledb(key)

    def applyConf(self):
        # we should not do anything here
        raise NotImplementedError(
            'The {} configurable should not be called'.format(
                self.__class__.__name__))

    def getProperties(self):
        """Stable-order override of Configurable.getProperties."""
        props = super(ApplicationOptions, self).getProperties()
        return OrderedDict(sorted(props.items()))

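
# A minimal lifecycle sketch (hypothetical TestFileDB key): options are
# mutated freely, then locked by finalize(), after which further assignment
# raises a ConfigurationError.
#
#     options = ApplicationOptions(_enabled=False)
#     options.set_input_and_conds_from_testfiledb('some_testfiledb_key')
#     options.evt_max = 100
#     options.finalize()
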
### Singletons for the ROOT and MDF IOAlg, in case ROOT/MDF is used as input
_mdfIOAlgSingleton = None
_rootIOAlgSingleton = None

def create_or_reuse_mdfIOAlg(location, options):
    """Create the MDF IOAlg instance if it does not already exist."""
    global _mdfIOAlgSingleton
    if _mdfIOAlgSingleton is None:
        # create the MDF IOAlg if needed
        _mdfIOAlgSingleton = Algorithm(
            getattr(Configurables, options.mdf_ioalg_name),
            name="MDFIOAlg",
            outputs={
                'RawEventLocation': force_location(location),
                'EventBufferLocation': None
            },
            Input=options.input_files,
            BufferNbEvents=options.ioalg_buffer_nb_events,
            NSkip=options.first_evt)
    else:
        if _mdfIOAlgSingleton.RawEventLocation.location != ensure_event_prefix(
                location):
            raise ValueError(
                'Same input was required twice from MDF, mapping to different locations: \'%s\' and \'%s\''
                % (location, _mdfIOAlgSingleton.RawEventLocation))
    return _mdfIOAlgSingleton

def input_from_mdf_file(location,
                        propertyName=None,
                        options=None,
                        forced_type=None):
    # some safety checks
    if propertyName is not None and propertyName != 'RawEventLocation':
        raise ValueError(
            'Unsupported input %s from an MDF file. Only RawEventLocation is supported'
            % propertyName)
    if options is None:
        raise ValueError(
            "No options have been passed, so MDFIOAlg cannot be created.")
    return create_or_reuse_mdfIOAlg(location, options).RawEventLocation

def create_or_reuse_rootIOAlg(options):
    """Create the RootIOAlg instance if it does not already exist."""
    global _rootIOAlgSingleton
    if _rootIOAlgSingleton is None:
        # check we have options
        if options is None:
            raise ValueError(
                "No options have been passed, so RootIOAlg cannot be created. "
                "Pass options or call create_or_reuse_rootIOAlg directly")
        # create the RootIOAlg if needed
        _rootIOAlgSingleton = MutableAlgorithm(
            getattr(Configurables, options.root_ioalg_name),
            name="RootIOAlg",
            Input=options.input_files,
            BufferNbEvents=options.ioalg_buffer_nb_events,
            NSkip=options.first_evt,
            EventBranches=[],
            # FIXME: should not be set (and the feature should be dropped), but
            # this requires that algorithms do not require data they do not use
            # (using getIfExists to check there was nothing there)
            AllowMissingInput=True,
            **options.root_ioalg_opts)
    return _rootIOAlgSingleton

def input_from_root_file(path, forced_type, options=None):
    """Read the given path from a ROOT input file and return a DataHandle for it.

    This may instantiate a RootIOAlg or reuse the existing one.

    Args:
        path: the path of the data to retrieve. Will be used as the TES path
            and as the branch name in the ROOT file.
        options: PyConf options to be used in case the RootIOAlg needs to be
            created. If not provided and no RootIOAlg preexists, an exception
            will be raised.
    """
    rootIOAlgSingleton = create_or_reuse_rootIOAlg(options)
    full_path = _shortPath2Path(path)
    # check whether the new input is colliding with an existing one
    found = False
    for p in rootIOAlgSingleton._properties['EventBranches']:
        # we already have that input, do not duplicate
        found = p == full_path
        if found:
            break
    if not found:
        # new input, add it to the list
        rootIOAlgSingleton._properties['EventBranches'].append(full_path)
    # name of the property associated to the DataHandle for that output of the
    # underlying RootIOAlg. We use the path + "Location"
    property_name = "_".join(full_path.split("/")[1:]) + "_Location"
    # return a made-up DataHandle; RootIOAlg will instantiate one dynamically
    # on the C++ side
    dh = DataHandle(rootIOAlgSingleton, property_name, force_location(path))
    dh.force_type(forced_type)
    return dh

def all_nodes_and_algs(node, recurse_algs=False):
    """Return the list of all reachable nodes and algorithms."""
    if not isinstance(node, CompositeNode):
        raise TypeError('{} is not of type CompositeNode'.format(node))
    # use OrderedDict as there is no OrderedSet in the standard library
    nodes = OrderedDict()
    algs_in_cf = OrderedDict()
    _all_nodes_and_algs(node, nodes, algs_in_cf)
    if recurse_algs:
        algs = sorted(
            set().union(*(alg.all_producers() for alg in algs_in_cf)),
            key=lambda a: a.id)
    else:
        algs = list(algs_in_cf)
    return list(nodes), algs

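# For illustration, given a top-level CompositeNode (hypothetical `top`):
#
#     nodes, algs = all_nodes_and_algs(top)
#     # also collect data-flow-only producers reachable from the control flow
#     nodes, all_algs = all_nodes_and_algs(top, recurse_algs=True)
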
def _all_nodes_and_algs(node, nodes, algs):
    if node in nodes:
        return
    nodes[node] = None
    for c in node.children:
        if isinstance(c, CompositeNode):
            _all_nodes_and_algs(c, nodes, algs)
        elif is_algorithm(c):
            algs[c] = None
        else:
            raise TypeError(
                'Child {!r} of {!r} is neither a CompositeNode nor an '
                'Algorithm'.format(c, node))

class ComponentConfig(dict):
    """Object holding the configuration of Gaudi Configurables."""

    def add(self, component):
        self[component.getFullName()] = component
        return component

    def update(self, other):
        for c in other.values():
            self.add(c)

def make_raw_event_with_Online(location, bank_type="ALL", forced_type=None):
    from PyConf.Algorithms import Online__InputAlg, Online__BanksToRawEvent

    # TODO what if we're called twice with a different location?
    tae = location.removeprefix("/Event").replace("DAQ/RawEvent",
                                                  "").replace("/", "")

    def transform(RawData, RawGuard, DAQErrors, **outputs):
        dirs = set(os.path.dirname(o) for o in outputs.values())
        assert len(dirs) == 1, str(outputs)
        return {
            "BankDirectory": dirs.pop(),
            "RawData": RawData,
            "RawGuard": RawGuard,
            "DAQErrors": DAQErrors,
            "ExtraOutputs": list(outputs.values())
        }

    if not location.startswith("/Event"):
        location = "/Event/" + location
    raw_data_loc = re.sub(r"/Event/(Prev|Next)[0-9]+", "/Event", location)

    tae_locs = {
        "TAEHalfWindow": force_location("/Event/Banks/TAEHalfWindow"),
    }
    tae_locs |= {
        f"Prev{i}": force_location(f"/Event/Banks/Prev{i}")
        for i in range(10)
    }
    tae_locs |= {
        f"Next{i}": force_location(f"/Event/Banks/Next{i}")
        for i in range(10)
    }

    alg = Online__InputAlg(
        name="Online__InputAlg",
        outputs={
            "RawData": force_location(raw_data_loc),
            "RawGuard": force_location("/Event/Banks/RawDataGuard"),
            "DAQErrors": None,
        } | tae_locs,
        output_transform=transform,
        DeclareData=True,  # output RawGuard
        DeclareEvent=True,  # output RawData (incl. TAE crossings)
        DeclareErrors=False,
    )
    return Online__BanksToRawEvent(
        name="Online__BanksToRawEvent_" + bank_type + tae,
        RawData=getattr(alg, tae or "RawData"),
        BankType=bank_type).RawEvent

@configurable
def default_raw_event(bank_type, raw_event_format, maker, stream=''):
    """Return a raw event that contains a given bank.

    Args:
        bank_type: Required raw bank type.
        raw_event_format: RawEventFormat key in `Raw_location_db`.
        maker: Maker of input data.

    Returns:
        DataHandle: Raw event containing banks of the requested types.
    """
    if raw_event_format is None:
        raise ValueError('raw_event_format is required (or must be bound)')
    if stream != '':
        location = f'/Event/{stream}/RawEvent'
    else:
        raw_bank_locations = Raw_location_db[raw_event_format]
        if bank_type:
            location = raw_bank_locations.get(
                bank_type,
                list(raw_bank_locations.values())[0])
            if type(location) == list:
                if len(location) == 1:
                    location = location[0]
                else:
                    raise ValueError(
                        'Requested raw bank ({}) points to multiple locations ({}) '
                        'for raw event format {}. Restrict raw bank types or merge raw '
                        'events manually.'.format(bank_type, location,
                                                  raw_event_format))
        else:
            location = list(raw_bank_locations.values())[0]
    return maker(location, forced_type="LHCb::RawEvent")

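# As a @configurable, the defaults of default_raw_event are normally set once
# via global_bind in configure_input (see below). A local override of a single
# default can be layered on top with bind, e.g. (hypothetical format value,
# assuming configure_input has already bound the maker):
#
#     with default_raw_event.bind(raw_event_format=0.5):
#         raw_banks = default_raw_banks('ODIN')
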
@configurable
def default_raw_banks(bank_type, tae=0, **kwargs):
    """Retrieve the raw banks of the given type.

    If `tae` is not 0, the banks are retrieved from the Prev<x> or Next<x>
    event, depending on the sign of `tae` and with x being the absolute value
    of `tae`.
    """
    raw_event = default_raw_event(bank_type)
    if 'name' not in kwargs:
        kwargs['name'] = f'UnpackRawEvent_{bank_type}'
    # manage the TAE case
    taePath = ''
    if tae != 0:
        taeName = f"Next{tae}" if tae > 0 else f"Prev{-tae}"
        taePath = f"/{taeName}"
        kwargs['name'] += "-" + taeName

    def output_transform(RawBankLocations):
        return {"RawBankLocations": [RawBankLocations]}

    __veto_args({'RawEventLocation', 'BankTypes', 'RawBankLocations'},
                kwargs.keys())
    # Note: the aim is to have a standardized location for all raw banks from
    # this point onwards, independent of where the RawEvent came from
    return LHCb__UnpackRawEvent(
        RawEventLocation=raw_event,
        BankTypes=[bank_type],
        output_transform=output_transform,
        outputs={
            'RawBankLocations':
            force_location(f'/Event/RawBanks/{taePath}{bank_type}')
        },
        **kwargs,
    ).RawBankLocations

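# Typical use (assuming the input maker has been bound by configure_input):
#
#     odin_banks = default_raw_banks('ODIN')            # current event
#     prev1_banks = default_raw_banks('ODIN', tae=-1)   # Prev1 crossing
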
@configurable
def make_odin(make_raw=default_raw_banks, tae=0, **kwargs):
    if 'name' not in kwargs:
        kwargs['name'] = 'Decode_ODIN'
    if tae != 0:
        taeName = f"Next{tae}" if tae > 0 else f"Prev{-tae}"
        kwargs['name'] += "-" + taeName
    __veto_args({'RawBanks'}, kwargs.keys())
    return createODIN(RawBanks=make_raw('ODIN', tae=tae), **kwargs).ODIN


# TODO get rid of the magic initial time (must go to configure_input)
INITIAL_TIME = 1433509200


@configurable
def make_fake_event_time(start=INITIAL_TIME, step=0):
    # When running in simulation mode we have to feed "reserveIOV" an event
    # time that is more realistic than what we have in the simulated data (0)
    odin_loc = '/Event/DAQ/DummyODIN'
    return setup_component(
        DummyEventTime,
        "DummyEventTime",
        Start=start,
        Step=step,
        ODIN=odin_loc)

@configurable
def make_callgrind_profile(start=10,
                           stop=90,
                           dump=90,
                           dumpName='CALLGRIND-OUT'):
    """Return an algorithm that enables callgrind profiling.

    Pass it to the application options with
    ``preamble_algs=[make_callgrind_profile()]``.
    For more info see CodeAnalysisTools_.

    .. _CodeAnalysisTools: https://twiki.cern.ch/twiki/bin/view/LHCb/CodeAnalysisTools

    Args:
        start: Event count at which callgrind starts profiling. Defaults to 10 events.
        stop: Event count at which callgrind stops profiling. Defaults to 90 events.
        dump: Event count at which callgrind dumps results. Defaults to 90 events.
        dumpName: Name of the dump. Defaults to 'CALLGRIND-OUT'.
    """
    from PyConf.Algorithms import CallgrindProfile
    return CallgrindProfile(
        StartFromEventN=start,
        StopAtEventN=stop,
        DumpAtEventN=dump,
        DumpName=dumpName)

def configure_input(options):
    """Configure all aspects related to application inputs.

    Args:
        options (ApplicationOptions): Application options instance.

    Returns:
        ComponentConfig: Dict of configured Gaudi Configurable instances.
    """
    options.finalize()

    config = ComponentConfig()

    config.add(ApplicationMgr(EvtSel="NONE", EvtMax=options.evt_max))

    # TODO split out conditions from configure_input
    from Configurables import DDDBConf
    if options.geometry_backend == "DD4Hep":
        geo_version = options.geometry_version or options.dddb_tag
        cond_version = options.conditions_version or options.conddb_tag
        if geo_version and cond_version:
            config.add(
                DDDBConf(
                    Simulation=options.simulation,
                    GeometryVersion=geo_version,
                    ConditionsVersion=cond_version,
                    DataType=options.data_type))
    elif options.geometry_backend == "DetDesc":
        from Configurables import CondDB
        config.add(
            DDDBConf(
                Simulation=options.simulation, DataType=options.data_type))
        config.add(
            CondDB(
                Upgrade=True,
                Tags={
                    'DDDB': options.dddb_tag,
                    'SIMCOND': options.conddb_tag,
                }))

    if options.input_type.upper() == "NONE":
        return config

    if options.input_type == 'Online':
        if options.input_files:
            raise ConfigurationError("input_files must be empty for Online")
        default_raw_event.global_bind(
            raw_event_format=options.input_raw_format,
            stream=options.input_stream,
            maker=make_raw_event_with_Online)
    elif options.input_type == 'MDF' or options.input_type == 'RAW':
        if not options.input_files:
            raise ConfigurationError(
                "Cannot run with an empty input_files list")
        # FIXME: drop when FSRSink no longer hijacks the TFile from the
        # IODataManager. These lines are only here so that the latter comes
        # before the former and is thus ended after. Without them, FSRSink
        # just core dumps, writing to a closed TFile.
        if options.output_type == 'ROOT':
            cnvsvc = setup_component(
                Gaudi__RootCnvSvc,
                instance_name="RootCnvSvc",
                EnableIncident=1)
            config.add(cnvsvc)
            ApplicationMgr().ExtSvc.append(cnvsvc)
            # Disable warning about not being able to navigate ancestors
            mgr = setup_component(
                Gaudi__IODataManager,
                instance_name="IODataManager",
                DisablePFNWarning=True)
            config.add(mgr)
            ApplicationMgr().ExtSvc.append(mgr)
        # finally set up the MDF input
        default_raw_event.global_bind(
            raw_event_format=options.input_raw_format,
            stream=options.input_stream,
            maker=lambda location, propertyName=None, forced_type=None:
            input_from_mdf_file(location, propertyName, options))
    else:  # ROOT input_type
        if not options.input_files:
            raise ConfigurationError(
                "Cannot run with an empty input_files list")
        input_iohelper = IOHelper(options.input_type,
                                  options.output_type or None)
        # setupServices may create (the wrong) EventDataSvc, so do it first
        setup_component(options.event_store, instance_name='EventDataSvc')
        input_iohelper.setupServices()
        create_or_reuse_rootIOAlg(options)
        # Disable warning about not being able to navigate ancestors
        mgr = setup_component(
            Gaudi__IODataManager,
            instance_name="IODataManager",
            DisablePFNWarning=True)
        config.add(mgr)
        ApplicationMgr().ExtSvc.append(mgr)
        default_raw_event.global_bind(
            raw_event_format=options.input_raw_format,
            stream=options.input_stream,
            maker=input_from_root_file)

    # TODO: Remove use of getattr once ApplicationOptions is removed
    if getattr(options, "xml_file_catalog", None):
        from Gaudi.Configuration import FileCatalog
        config.add(
            setup_component(
                FileCatalog,
                Catalogs=[f"xmlcatalog_file:{options.xml_file_catalog}"],
            ))

    return config

def assert_empty_dataondemand_service():
    assert DataOnDemandSvc().AlgMap == {}, DataOnDemandSvc().AlgMap
    assert DataOnDemandSvc().NodeMap == {}, DataOnDemandSvc().NodeMap


def configure(options,
              control_flow_node,
              public_tools=[],
              make_odin=make_odin,
              make_fake_odin=make_fake_event_time):
    options.finalize()

    config = ComponentConfig()

    nodes, algs = all_nodes_and_algs(control_flow_node)

    # The following components are set up outside of the normal control and
    # data flow, so the automatic dependency evaluation in python does not
    # work. As for data, ODIN is required to get the time and the
    # corresponding IOV, so add ODIN unconditionally to the list of producer
    # algorithms; the HiveDataBrokerSvc can then take care of it.
    # No input -> no ODIN.
    if options.input_type != "NONE" and not options.simulation:
        algs = algs + [make_odin().producer]

    configuration = dataflow_config()
    barriers = sorted(
        set(i.fullname for i in _datadependencies(algs) if i.is_barrier))

    for alg in algs:
        configuration.update(alg.configuration())
    for tool in public_tools:
        configuration.update(tool.configuration())
    configurable_algs, configurable_tools = configuration.apply()
    del configuration

    has_velo_ms_override = not UseDD4Hep and options.velo_motion_system_yaml

    if options.simulation or options.input_type == "NONE":
        fake_odin = make_fake_odin()
        configurable_algs += [fake_odin]
        odin_loc = fake_odin.ODIN.path()
    else:
        odin_loc = make_odin().location

    if UseDD4Hep:
        configurable_algs += [
            setup_component(
                IOVProducer,
                "ReserveIOVDD4hep",
                ODIN=odin_loc,
            )
        ]
    else:
        if has_velo_ms_override:
            from Configurables import LHCb__DetDesc__VeloMotionSystemFromYaml as YamlMS
            configurable_algs += [
                setup_component(
                    YamlMS,
                    "MotionSystemFromYaml",
                    YAMLPath=options.velo_motion_system_yaml,
                    ODIN=make_odin().location,
                    # used to enforce an order between YamlMS and reserveIOV
                    ExtraOutputs=["VeloMotionSystemOverrideFromYAML"],
                )
            ]
        configurable_algs += [
            setup_component(
                reserveIOV,
                "reserveIOV",
                ODIN=odin_loc,
                IOVLock="/Event/IOVLock",
                ExtraInputs=(["VeloMotionSystemOverrideFromYAML"]
                             if has_velo_ms_override else []),
            )
        ]

    whiteboard = config.add(
        setup_component(
            options.event_store,
            instance_name='EventDataSvc',
            EventSlots=options.n_event_slots,
            ForceLeaves=True))

    # TODO: Remove use of getattr once ApplicationOptions is removed
    end_event_incident = ""
    if getattr(options, "xml_summary_file", None):
        end_event_incident = "FinishedProcessingSomeEvent"

    scheduler = config.add(
        setup_component(
            HLTControlFlowMgr,
            CompositeCFNodes=[node.represent() for node in nodes],
            MemoryPoolSize=options.memory_pool_size,
            ThreadPoolSize=options.n_threads,
            EndEventIncident=end_event_incident,
            EnableLegacyMode=options.scheduler_legacy_mode,
            BarrierAlgNames=barriers,
            PreambleAlgs=options.preamble_algs,
            GaudiPythonMode=options.gaudipython_mode))

    appMgr = config.add(
        ApplicationMgr(OutputLevel=options.output_level, EventLoop=scheduler))
    # FIXME this cannot work when configurables are not singletons
    appMgr.ExtSvc.insert(0, whiteboard)

    # TODO: Remove use of getattr once ApplicationOptions is removed
    if getattr(options, "event_timeout", None):
        from Configurables import StalledEventMonitor
        appMgr.StalledEventMonitoring = True
        config.add(
            setup_component(
                StalledEventMonitor,
                EventTimeout=options.event_timeout,
                StackTrace=True,
                MaxTimeoutCount=1,
            ))

    if options.auditors:
        from Configurables import AuditorSvc
        config.add(setup_component(AuditorSvc, Auditors=options.auditors))
        appMgr.ExtSvc += [AuditorSvc()]
        appMgr.AuditAlgorithms = True

    if not any(a for a in options.auditors
               if a == 'ChronoAuditor' or (isinstance(a, Configurable)
                                           and a.getType() == 'ChronoAuditor')):
        # Turn off most output from ChronoStatSvc.
        # Even if it is not explicitly configured, some components will
        # instantiate it (like DetailedMaterialLocator), causing some output.
        ChronoStatSvc().ChronoPrintOutTable = False
        ChronoStatSvc().PrintUserTime = False

    if not UseDD4Hep:
        from Configurables import UpdateManagerSvc
        config.add(setup_component(UpdateManagerSvc, WithoutBeginEvent=True))

    config.add(
        setup_component(HiveDataBrokerSvc, DataProducers=configurable_algs))

    # for LoKi
    config.add(setup_component(AlgContextSvc, BypassIncidents=True))
    config.add(setup_component(LoKiSvc, Welcome=False))

    # ANNSvc for encoding/decoding
    hltANNSvc = configured_ann_svc(json_file=options.input_manifest_file)
    config.add(hltANNSvc)
    appMgr.ExtSvc += [hltANNSvc]

    # configure the message service
    config.add(
        setup_component(
            MessageSvc,
            Format=options.msg_svc_format,
            timeFormat=options.msg_svc_time_format))
    appMgr.ExtSvc += [MessageSvcSink()]

    if not UseDD4Hep:
        from Configurables import EventClockSvc
        config.add(
            setup_component(
                EventClockSvc, InitialTime=int(INITIAL_TIME * 1e9)))

    # configure the monitoring sink that writes to a file
    if options.monitoring_file:
        sink = setup_component(JSONSink, FileName=options.monitoring_file)
        config.add(sink)
        appMgr.ExtSvc += [sink]

    # TODO: Remove use of getattr once ApplicationOptions is removed
    if getattr(options, "input_process", None):
        if options.input_process == InputProcessTypes.Hlt2:
            fsr_sink = setup_component(
                "LHCb__FSR__Sink",
                instance_name="FileSummaryRecord",
                AcceptRegex=
                r"^LumiCounter\.eventsByRun$|^HltANNSvc\.DecodingKeys$")
            config.add(fsr_sink)
            appMgr.ExtSvc.append(fsr_sink)

    # TODO: Remove use of getattr once ApplicationOptions is removed
    if getattr(options, "xml_summary_file", None):
        summary_svc = setup_component(
            XMLSummarySvc,
            xmlfile=options.xml_summary_file,
            EndEventIncident=end_event_incident)
        config.add(summary_svc)
        appMgr.ExtSvc.append(summary_svc)

    # TODO: Remove use of getattr once ApplicationOptions is removed
    if getattr(options, "compression", None):
        from Configurables import RootCnvSvc
        from GaudiKernel.Configurable import ConfigurableGeneric
        config.add(
            setup_component(RootCnvSvc,
                            **options.compression.as_gaudi_config()))
        component = config.add(
            setup_component(ConfigurableGeneric, instance_name="RFileCnv"))
        for k, v in options.compression.as_gaudi_config().items():
            setattr(component, k, v)

    # Check there is no collision between the histogram and the ntuple file.
    # They need to be different so that the writer of histograms (a Sink) and
    # the writer of ntuples (still the HistogramPersistencySvc for now) do not
    # step on each other's feet.
    if options.histo_file and options.histo_file == options.ntuple_file:
        raise Exception(
            "Histogram and NTuple files need to be different. "
            "Here they are both set to '%s'" % options.histo_file)
    if options.histo_file:
        from Configurables import Gaudi__Histograming__Sink__Root
        root_sink = setup_component(
            Gaudi__Histograming__Sink__Root, FileName=options.histo_file)
        config.add(root_sink)
        appMgr.ExtSvc.append(root_sink)
    if options.ntuple_file:
        from Configurables import NTupleSvc, HistogramPersistencySvc
        config.add(
            setup_component(
                HistogramPersistencySvc, OutputFile=options.ntuple_file))
        config.add(ApplicationMgr(HistogramPersistency="ROOT"))
        ntuple_svc = setup_component(
            NTupleSvc,
            Output=[
                "FILE1 DATAFILE='{}' TYPE='ROOT' OPT='NEW'".format(
                    options.ntuple_file)
            ])
        config.add(ntuple_svc)
        appMgr.ExtSvc.append(ntuple_svc)

    # set the Phoenix Sink output filename (same pattern as histo_file above)
    if options.phoenix_filename:
        from Configurables import LHCb__Phoenix__Sink
        phoenix_sink = setup_component(
            LHCb__Phoenix__Sink, FileName=options.phoenix_filename)
        config.add(phoenix_sink)
        appMgr.ExtSvc.append(phoenix_sink)

    if options.control_flow_file:
        fn_root, fn_ext = os.path.splitext(options.control_flow_file)
        plot_control_flow(
            control_flow_node, filename=fn_root, extensions=[fn_ext[1:]])
    if options.data_flow_file:
        fn_root, fn_ext = os.path.splitext(options.data_flow_file)
        plot_data_flow(algs, filename=fn_root, extensions=[fn_ext[1:]])

    # keys have to go either to git or to the output manifest file
    if options.output_file and not options.write_decoding_keys_to_git and not (
            options.output_manifest_file
            and options.append_decoding_keys_to_output_manifest):
        log.warning(
            'you have specified an output file, but also blocked any writing '
            'of decoding keys -- as a result, you will likely not be able to '
            'decode the data written')

    if options.output_type == 'ROOT':
        output_iohelper = IOHelper(None, options.output_type or None)
        output_iohelper.setupServices()
        if 'StoreOldFSRs' in options.root_ioalg_opts and options.root_ioalg_opts[
                'StoreOldFSRs']:
            # needed to handle old FSRs. FIXME: to be removed when old FSRs are gone
            from Configurables import Gaudi__RootCnvSvc
            from Gaudi.Configuration import (FileRecordDataSvc, PersistencySvc)
            # Set up the FileRecordDataSvc
            frSvc = FileRecordDataSvc(
                ForceLeaves=True,
                EnableFaultHandler=True,
                RootCLID=1,
                # was the next line missed accidentally?
                PersistencySvc="PersistencySvc/FileRecordPersistencySvc")
            config.add(frSvc)
            appMgr.ExtSvc.append(frSvc)
            fileSvc = Gaudi__RootCnvSvc("FileRecordCnvSvc")
            # The FSRs are trees with only one entry per branch
            fileSvc.MinBufferSize = 512
            fileSvc.ApproxEventsPerBasket = 1
            fileSvc.ShareFiles = "YES"
            config.add(fileSvc)
            appMgr.ExtSvc.append(fileSvc)
            PersistencySvc("FileRecordPersistencySvc").CnvServices += [fileSvc]

    # if we generated _or read_ any keys, flush them to git (if so requested)
    # propagate any keys from the input manifest
    if options.input_manifest_file:
        key_registry.add_keys_from_jsonfile(options.input_manifest_file)
    if options.write_decoding_keys_to_git:
        for kr in key_registries.values():
            kr.flush_to_git()

    # Check that the configuration creates the decoding keys we expect to use.
    if len(options.require_specific_decoding_keys) > 0:
        if not isinstance(options.require_specific_decoding_keys[0], str):
            raise ConfigurationError(
                "Please provide a non-empty list of keys as strings.")
        if set(options.require_specific_decoding_keys) != set(
                key_registry.keys()):
            raise ConfigurationError(
                "Configuration and required keys are inconsistent")

    if options.output_manifest_file:
        if options.append_decoding_keys_to_output_manifest:
            key_registry.append_to_jsonfile(
                options.output_manifest_file, indent=4)

    # Make sure nothing's configuring the DoD behind our back, e.g. LHCbApp
    appendPostConfigAction(assert_empty_dataondemand_service)

    # TODO add configurable_tools and configurable_algs to config
    return config


def _gaudi_datetime_format(dt):
    """Return the datetime object formatted as Gaudi does it.

    As seen in the application "Welcome" banner.
    """
    return dt.strftime('%a %h %d %H:%M:%S %Y')

def plot_data_flow(algs, filename='data_flow', extensions=('gv', )):
    """Save a visualisation of the current data flow.

    Args:
        algs (list): List of Algorithm instances.
        filename (str): Basename of the file to create.
        extensions (list): List of file extensions to create. One file is
            created per extension. Possible values include `'gv'` for saving
            the raw graphviz representation, and `'png'` and `'pdf'` for
            saving graphics.

    Note:
        The `dot` binary must be present on the system for saving files with
        graphical extensions. The raw `.gv` format can be converted by hand
        like::

            dot -Tpdf data_flow.gv > data_flow.pdf
    """
    now = _gaudi_datetime_format(datetime.datetime.now())
    label = 'Data flow generated at {}'.format(now)
    top = pydot.Dot(
        graph_name='Data flow', label=label, strict=True, rankdir='LR')
    top.set_node_defaults(shape='box')
    for alg in algs:
        alg._graph(top)
    for ext in extensions:
        format = 'raw' if ext == 'gv' else ext
        top.write('{}.{}'.format(filename, ext), format=format)

def plot_control_flow(top_node, filename='control_flow', extensions=('gv', )):
    """Save a visualisation of the current control flow.

    Args:
        top_node (CompositeNode): Top node of the control flow.
        filename (str): Basename of the file to create.
        extensions (list): List of file extensions to create. One file is
            created per extension. Possible values include `'gv'` for saving
            the raw graphviz representation, and `'png'` and `'pdf'` for
            saving graphics.

    Note:
        The `dot` binary must be present on the system for saving files with
        graphical extensions. The raw `.gv` format can be converted by hand
        like::

            dot -Tpdf control_flow.gv > control_flow.pdf
    """
    now = _gaudi_datetime_format(datetime.datetime.now())
    label = 'Control flow generated at {}'.format(now)
    graph = pydot.Dot(
        graph_name='control_flow', label=label, strict=True, compound=True)
    top_node._graph(graph)
    for ext in extensions:
        format = 'raw' if ext == 'gv' else ext
        graph.write('{}.{}'.format(filename, ext), format=format)