Source code for Moore.config

###############################################################################
# (c) Copyright 2019-2023 CERN for the benefit of the LHCb Collaboration      #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import re, logging, inspect
from collections import namedtuple

from Allen.config import allen_detectors, run_allen_reconstruction  # noqa: F401
from PyConf import configurable
from PyConf.Algorithms import (HltDecReportsFilter, HltDecReportsMonitor,
                               DeterministicPrescaler, HltRoutingBitsFilter,
                               LHCb__Tests__RunEventCountAlg)

from PyConf.components import Algorithm
from PyConf.control_flow import (CompositeNode, NodeLogic,
                                 traverse_node_and_children)

from PyConf.application import (ROOT_KEY, make_odin, default_raw_banks,
                                all_nodes_and_algs, configure_input, configure)
from .persistence import persist_line_outputs

from .streams import make_default_streams, Stream
from .stream_writers import stream_writer
from .reports_writers import report_writers_nodes

# These are not really needed here, but most lines import them from config instead of the lines module
from .lines import Hlt2Line, SpruceLine

#: Regular expression (compiled) defining the valid selection line names
# Meaning: line names must start with one of Hlt1, Hlt2, Spruce or Pass;
#          the remaining characters may be a to z, A to Z, 0 to 9 and _;
#          the name must not end with Line (change of convention, https://gitlab.cern.ch/lhcb/Rec/-/issues/375)
SELECTION_LINE_NAME_PATTERN = re.compile(
    r'^(Hlt[12]|Spruce|Pass)[A-Za-z0-9_]+(?<!Line)$')
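
# For illustration (not part of the original module; the example names below are
# made up): the pattern accepts names such as "Hlt2CharmD02KPi",
# "SpruceB2OC_Example" or "PassthroughPhysics", but rejects "Hlt2CharmLine"
# (ends with "Line"), "MyHlt2Selection" (wrong prefix) and "Hlt2" alone
# (no characters after the prefix).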

log = logging.getLogger(__name__)


class Reconstruction(namedtuple('Reconstruction', ['node'])):  # noqa
    """Immutable object fully qualifiying the output of a reconstruction data flow with prefilters.

    Attributes:
        node (CompositeNode): the control flow of the reconstruction.

    """
    __slots__ = ()  # do not add __dict__ (preserve immutability)

    def __new__(cls, name, data_producers, filters=None):
        """Initialize a Reconstruction from name, data_producers and filters.

        Creates a `CompositeNode` that combines the given ``data_producers``
        with `NONLAZY_OR` logic, and a decision `CompositeNode` that runs the
        ``filters`` (combined with `LAZY_AND`) before it.

        Args:
            name (str): name of the reconstruction
            data_producers (list): iterable list of algorithms to produce data
            filters (list): list of filters to apply before running reconstruction
        """
        data_producers_node = CompositeNode(
            name + "_data",
            data_producers,
            combine_logic=NodeLogic.NONLAZY_OR,
            force_order=True)
        if filters is None:
            filters = []
        control_flow = filters + [data_producers_node]
        cf_node = CompositeNode(
            name + "_decision",
            control_flow,
            combine_logic=NodeLogic.LAZY_AND,
            force_order=True)

        return super(Reconstruction, cls).__new__(cls, cf_node)

    @property
    def name(self):
        """Reconstruction name"""
        return self.node.name

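# Illustrative (hypothetical) usage of Reconstruction; the algorithm names are
# placeholders and are not defined in this module:
#
#     reco = Reconstruction(
#         'my_reco',
#         data_producers=[tracking_alg, calo_alg],  # hypothetical producers
#         filters=[gec_filter])                     # optional prefilters
#     reco.name  # -> 'my_reco_decision', the name of the top-level node
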

def moore_control_flow(options, streams, process, analytics=False):
    """Return the Moore application control flow node.

    Combines the lines with `NONLAZY_OR` logic in a global decision
    control flow node. This is `LAZY_AND`-ed with the output control
    flow, which consists of Moore-specific report makers/writers and
    generic persistency.

    Args:
        options (ApplicationOptions): holder of application options
        streams (list of stream objects): control flow nodes of lines
        process (str): "hlt1", "hlt2", "spruce" or "pass".
        analytics (bool, optional): For use only in rate/event size analysis.
            Defaults to False.

    Returns:
        node (CompositeNode): Application control flow.
    """
    options.finalize()

    # Reject HLT2 streaming when using DST output, as writing multiple DST
    # files is not supported for now.
    assert not (len(streams.streams) > 1 and process == "hlt2"
                and options.output_type == "ROOT"
                ), "This configuration is not supported at this time"

    lines = [line.node for line in streams.lines]
    decisions_node = CompositeNode(
        'hlt_decision',
        combine_logic=NodeLogic.NONLAZY_OR,
        children=lines,
        force_order=False)

    # Get dec reports and banks
    rw_nodes, dec_reports, hlt_raw_banks, routing_bits = report_writers_nodes(
        streams, process)

    # Prescale the monitoring to use fewer resources
    prescaler_decreports_monitor = DeterministicPrescaler(
        name=f"{process.upper()}PrescaleDecReportsMonitor",
        AcceptFraction=0.1,
        SeedName=f"{process.upper()}PrescaleDecReportsMonitor",
        ODINLocation=make_odin())
    postcaler_decreports_monitor = DeterministicPrescaler(
        name=f"{process.upper()}PostscaleDecReportsMonitor",
        AcceptFraction=0.0,
        SeedName=f"{process.upper()}PostscaleDecReportsMonitor",
        ODINLocation=make_odin())
    decreports_monitor = HltDecReportsMonitor(
        name=f"{process.upper()}DecReportsMonitor", Input=dec_reports)
    decisions_monitor_node = CompositeNode(
        'monitor_decisions',
        combine_logic=NodeLogic.LAZY_AND,
        children=[
            prescaler_decreports_monitor, decreports_monitor,
            postcaler_decreports_monitor
        ],
        force_order=True)

    # We want to run the monitoring of line decisions on every event to have
    # the proper normalization. Therefore, add the DecReportsMonitor after the
    # control flow node containing all selection lines and use NONLAZY_OR as
    # the type of the node containing both. A prescale is used so that the
    # monitoring does not actually run on every event, and a postscale ensures
    # the decision of the combined node is not changed.
    lines_node = CompositeNode(
        "lines",
        combine_logic=NodeLogic.NONLAZY_OR,
        children=[
            decisions_node,
            decisions_monitor_node,
        ],
        force_order=True)

    extra_locations_to_persist = []
    packed_data = {}
    line_output_cf = {}
    if process in ["hlt2", "spruce"]:
        event_output_prefix = "/Event/Spruce" if process == "spruce" else "/Event/HLT2"
        reco_output_prefix = "/Event/HLT2"
        (line_output_cf, extra_locations_to_persist,
         packed_data) = persist_line_outputs(
             streams=streams,
             dec_reports=dec_reports.DecReportsLocation,
             associate_mc=process == "hlt2" and options.simulation
             and options.input_type == ROOT_KEY
             and options.output_type == ROOT_KEY,
             source_id=process.capitalize(),
             output_manifest_file=options.output_manifest_file,
             output_prefix=event_output_prefix,
             reco_output_prefix=reco_output_prefix,
             clone_mc=options.simulation and options.input_type == 'ROOT')

    stream_writers = []
    for stream in streams.streams:
        post_algs = stream_writer(
            options=options,
            stream=stream,
            process=process,
            propagate_mc=options.simulation and options.input_type == 'ROOT',
            analytics=analytics,
            hlt_raw_banks=hlt_raw_banks,
            dst_data=[packed_data[stream.name].OutputView]
            if stream.name in packed_data.keys() else [],
            routing_bits=routing_bits,
            dec_reports=dec_reports,
            extra_locations=extra_locations_to_persist)

        streamFilter = HltDecReportsFilter(
            Lines=list(line.decision_name for line in stream.lines),
            DecReports=dec_reports.DecReportsLocation)

        stream_node_children = [streamFilter]
        # Only HLT2 and Sprucing save line outputs
        stream_node_children += [
            line_output_cf[stream.name]
        ] if stream.name in line_output_cf.keys() else []
        stream_node_children += post_algs

        stream_node = CompositeNode(
            stream.name + "_" + 'writer',
            combine_logic=NodeLogic.LAZY_AND,
            children=stream_node_children,
            force_order=True)
        stream_writers.append(stream_node)

    if stream_writers:
        stream_writers_nodes = [
            CompositeNode(
                'stream_writers',
                combine_logic=NodeLogic.NONLAZY_OR,
                children=stream_writers,
                force_order=False)
        ]
    else:
        stream_writers_nodes = []

    return CompositeNode(
        'moore',
        combine_logic=NodeLogic.LAZY_AND,
        children=([lines_node] + [rw_nodes] + stream_writers_nodes),
        force_order=True)
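
# For orientation (illustrative sketch, not generated output): for a two-stream
# HLT2 job the node returned by moore_control_flow has roughly this shape:
#
#     moore (LAZY_AND)
#     |-- lines (NONLAZY_OR)
#     |   |-- hlt_decision (NONLAZY_OR)      <- all selection lines
#     |   `-- monitor_decisions (LAZY_AND)   <- prescaled DecReports monitor
#     |-- report writers node
#     `-- stream_writers (NONLAZY_OR)
#         |-- streamA_writer (LAZY_AND)      <- DecReports filter + persistence
#         `-- streamB_writer (LAZY_AND)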


def run_moore(options,
              make_streams=None,
              public_tools=[],
              analytics=False,
              exclude_incompatible=True):
    """Configure Moore's entire control and data flow.

    Convenience function that configures all services, creates the standard
    Moore control flow and builds the data flow (by calling the global lines
    maker). If ``make_streams`` returns a list of lines, a default stream
    definition named ``"default"`` is created from it.

    Args:
        options (ApplicationOptions): holder of application options
        make_streams: function returning a dict of {stream: `DecisionLine` objects}
            OR a list of `DecisionLine` objects
        public_tools (list): list of public `Tool` instances to configure
        analytics (bool, optional): For use only in rate/event size analysis.
            Defaults to False.
        exclude_incompatible (bool, optional): Exclude the lines that are
            incompatible with multithreaded mode. Defaults to True.
    """
    # First call configure_input for its important side effect of
    # changing the default values of default_raw_event's arguments.
    # The latter is the deepest part of the call stack of make_lines.
    config = configure_input(options)

    # Then create the data (and control) flow for all streams.
    streams = (make_streams or options.lines_maker)()

    # Create a default streams definition if make_streams returned a list of
    # lines instead of Streams objects.
    if isinstance(streams, list):
        streams = make_default_streams(streams)

    # Exclude the lines with known issues (non-thread-safe algorithms etc.)
    if exclude_incompatible:
        excluded_lines = []
        for stream in streams.streams:
            filtered_lines = []
            for l in stream.lines:
                reason = check_for_known_issues(l)
                if not reason:
                    filtered_lines += [l]
                else:
                    excluded_lines += [(l.name, reason)]
            if len(filtered_lines) > 0:
                stream.update(filtered_lines)
        streams.update()
        if len(excluded_lines) > 0:
            log.info(
                f"Following {len(excluded_lines)} lines were automatically excluded:"
            )
            log.info(
                "Name of Line ---- list of found algos that are known to be not thread safe"
            )
            for line_name, reason in excluded_lines:
                log.info(f"{line_name} ---- {reason}")

    # Determine whether Hlt1, Hlt2, Spruce or Pass lines are being processed
    hlt1 = all(l.name.startswith('Hlt1') for l in streams.lines)
    hlt2 = all(l.name.startswith('Hlt2') for l in streams.lines)
    spruce = all(l.name.startswith('Spruce') for l in streams.lines)
    passthrough = all(l.name.startswith('Pass') for l in streams.lines)
    assert hlt1 ^ hlt2 ^ spruce ^ passthrough, \
        'Expected exclusively all Hlt1, all Hlt2, all Spruce or all Pass lines'
    if hlt1:
        process = "hlt1"
    elif hlt2:
        process = "hlt2"
    elif spruce:
        process = "spruce"
    elif passthrough:
        process = "pass"

    # Combine all lines and output in a global control flow.
    moore_control_node = moore_control_flow(options, streams, process,
                                            analytics)

    # Filter to return true if physics bit 95 is "on" for this event
    rb_bank = default_raw_banks('HltRoutingBits')
    physFilterRequireMask = (0x0, 0x0, 0x80000000)
    lumiFilterRequireMask = (0x0, 0x0, 0x40000000)
    rb_filter = [
        HltRoutingBitsFilter(
            name="PhysFilter",
            RawBanks=rb_bank,
            RequireMask=physFilterRequireMask,
            PassOnError=False)
    ]
    lumi_filter = [
        HltRoutingBitsFilter(
            name="LumiFilter",
            RawBanks=rb_bank,
            RequireMask=lumiFilterRequireMask,
            PassOnError=False)
    ]

    physics_sprucing_node = CompositeNode(
        'physics_sprucing_node',
        combine_logic=NodeLogic.LAZY_AND,
        children=rb_filter + [moore_control_node],
        force_order=True)

    lumi_sprucing_node = CompositeNode(
        'lumi_sprucing_node',
        combine_logic=NodeLogic.LAZY_AND,
        children=lumi_filter +
        [LHCb__Tests__RunEventCountAlg(name="LumiCounter", ODIN=make_odin())],
        force_order=True)

    spruce_control_node = CompositeNode(
        'spruce_control_node',
        combine_logic=NodeLogic.NONLAZY_OR,
        children=[lumi_sprucing_node] + [physics_sprucing_node],
        force_order=False)

    top_node = spruce_control_node if (
        process == "spruce" or process == "pass") else moore_control_node

    config.update(configure(options, top_node, public_tools=public_tools))
    # TODO pass config to gaudi explicitly when that is supported
    return config
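
# Illustrative (hypothetical) options-file sketch for run_moore; the line
# registry import and the option values below are placeholders, not part of
# this module:
#
#     from Moore import options, run_moore
#     from Hlt2Conf.lines.charm import all_lines  # hypothetical registry
#
#     def make_streams():
#         return [builder() for builder in all_lines.values()]
#
#     options.input_type = 'ROOT'
#     options.output_type = 'ROOT'
#     options.output_file = 'hlt2_example.dst'
#     run_moore(options, make_streams, public_tools=[])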


@configurable
def get_allen_hlt1_decision_ids():
    """Read Allen HLT1 decision IDs from the Allen control node."""
    from RecoConf.hlt1_allen import get_allen_line_names
    from AllenConf.persistency import build_decision_ids
    return build_decision_ids(get_allen_line_names())


@configurable
def allen_control_flow(options, write_all_input_leaves=True):
    from RecoConf.hlt1_allen import (
        allen_gaudi_config, call_allen_raw_reports, call_allen_lumi_summary,
        call_allen_decision_logger, make_allen_decision)
    from Allen.config import setup_allen_non_event_data_service
    from AllenConf.persistency import make_dec_reporter, register_decision_ids

    options.finalize()

    ids = get_allen_hlt1_decision_ids()
    encoding_key = register_decision_ids(ids)
    # TODO: remove when full configuration of Allen from TCK is implemented
    make_dec_reporter.global_bind(TCK=encoding_key)

    # Write DecReports raw banks
    hlt1_config = allen_gaudi_config()
    allen_cf = hlt1_config['control_flow_node']
    detectors = allen_detectors(allen_cf)
    non_event_data_node = setup_allen_non_event_data_service(
        bank_types=detectors)

    algs = []
    srw = call_allen_raw_reports()
    algs.extend([srw])
    new_raw_banks = [
        srw.OutputSelView, srw.OutputDecView, srw.OutputRoutingBitsView
    ]

    # If lumi is enabled for this sequence, add the output banks too
    if 'lumi_reconstruction' in hlt1_config:
        lsm = call_allen_lumi_summary()
        algs.append(lsm['node'])
        new_raw_banks.append(lsm['lumi_summary'])

    report_writers_node = CompositeNode(
        'report_writers_allen',
        combine_logic=NodeLogic.NONLAZY_OR,
        children=algs,
        force_order=True)

    writers = [report_writers_node]
    if options.output_file:
        # Give the stream a name: 'default'
        stream = Stream(name="default", lines=[])
        post_algs = stream_writer(
            options=options,
            stream=stream,
            process="hlt1",
            propagate_mc=options.simulation and options.input_type == 'ROOT',
            analytics=False,
            hlt_raw_banks=new_raw_banks,
            routing_bits={"default": []},
            dst_data=[],
            dec_reports=None,
            write_all_input_leaves=write_all_input_leaves)
        writers += post_algs

    allen_algs = [non_event_data_node, allen_cf]
    # If hlt1_config contains gather selections, add a decision logger
    gather_selections = hlt1_config.get('gather_selections', None)
    if gather_selections is not None:
        allen_algs.append(call_allen_decision_logger(gather_selections))

    allen_node = CompositeNode(
        'allen_algorithms',
        combine_logic=NodeLogic.NONLAZY_AND,
        children=allen_algs,
        force_order=True)
    allen = CompositeNode(
        'MooreAllen',
        combine_logic=NodeLogic.LAZY_AND,
        children=[allen_node, make_allen_decision()] + writers,
        force_order=True)
    return allen


def run_allen(options):
    """Configure Allen within Moore.

    Convenience function that sets up an Allen node and the required services.

    Args:
        options (ApplicationOptions): holder of application options
    """
    config = configure_input(options)
    top_cf_node = allen_control_flow(options)
    config.update(configure(options, top_cf_node))
    # TODO pass config to gaudi explicitly when that is supported
    return config


def run_reconstruction(options, make_reconstruction, public_tools=[]):
    """Configure the reconstruction data flow with a simple control flow.

    Convenience function that configures all services and creates a data flow.
    Args:
        options (ApplicationOptions): holder of application options
        make_reconstruction: function returning a single CompositeNode object
        public_tools (list): list of public `Tool` instances to configure
    """
    config = configure_input(options)
    reconstruction = make_reconstruction()
    config.update(
        configure(options, reconstruction.node, public_tools=public_tools))
    # TODO pass config to gaudi explicitly when that is supported
    return config


# These shouldn't really be here, but they are used in too many places
def is_DVCommonBase_alg(alg):
    # A Gaudi::Property registers its owner and appends it to the doc string,
    # e.g. the doc of ModifyLocations in DVCommonBase is:
    # ' if set to false, does not append /Particles to outputParticles location [DVCommonBase<GaudiAlgorithm>] '
    # So, as a proxy for inheriting from DVCommonBase, we check whether we can
    # find this property.
    return '[DVCommonBase<' in alg._propertyDocDct.get("ModifyLocations", "")


def is_GaudiHistoAlg(alg):
    return '[GaudiHistos<' in alg._propertyDocDct.get(
        "UseSequencialNumericAutoIDs", "")


def check_for_known_issues(line):
    all_algs = all_nodes_and_algs(line.node, True)[1]
    # Filter out lines which will crash in multi-threaded mode.
    # This check is likely incomplete... what else is not thread safe?
    # For now we just look for anything that inherits from DVCommonBase or
    # GaudiHistos.
    return [
        a for a in all_algs
        if is_DVCommonBase_alg(a.type) or is_GaudiHistoAlg(a.type)
    ]


def _get_param_default(function, name):
    """Return the default value of a function parameter.

    Raises TypeError if ``function`` has no default keyword parameter
    called ``name``.
    """
    try:
        try:
            sig = inspect.signature(function)
            p = sig.parameters[name]
            if p.default == p.empty:
                raise ValueError()  # just forward to raise below
            return p.default
        except AttributeError:
            # Python 2 compatibility
            spec = inspect.getargspec(function)
            i = spec.args.index(name)  # ValueError if not found
            return spec.defaults[i - len(
                spec.args)]  # IndexError if not keyword
    except (KeyError, ValueError, IndexError):
        raise TypeError('{!r} has no keyword parameter {!r}'.format(
            function, name))


def add_line_to_registry(registry, name, maker):
    """Add a line maker to a registry, ensuring no name collisions."""
    if name in registry:
        raise ValueError('{} already names an HLT line maker: '
                         '{}'.format(name, registry[name]))
    registry[name] = maker


def valid_name(name):
    """Return True if name follows the selection line (HLT, Sprucing) naming conventions."""
    try:
        return SELECTION_LINE_NAME_PATTERN.match(name) is not None
    except TypeError:
        return False


def register_line_builder(registry):
    """Decorator to register a named HLT line.

    The decorated function must have keyword parameter `name`. Its default
    value is used as the key in `registry`, under which the line builder
    (maker) is registered.

    Usage:

        >>> from PyConf.tonic import configurable
        ...
        >>> all_lines = {}
        >>> @register_line_builder(all_lines)
        ... @configurable
        ... def the_line_definition(name='Hlt2LineName'):
        ...     # ...
        ...     return DecisionLine(name=name, algs=[])  # filled with control flow
        ...
        >>> 'Hlt2LineName' in all_lines
        True
    """

    def wrapper(wrapped):
        name = _get_param_default(wrapped, 'name')
        if not valid_name(name):
            raise ValueError(
                '{!r} is not a valid selection line name!'.format(name))
        add_line_to_registry(registry, name, wrapped)
        # TODO return a wrapped function that checks the return type is DecisionLine
        return wrapped

    return wrapper


def has_global_event_cut(node: CompositeNode) -> bool:
    """Check whether a node contains a GEC.

    Args:
        node (CompositeNode): The node to check.

    Returns:
        bool: True if there is a GEC.
    """
    return any(
        isinstance(n, Algorithm) and n.typename == "PrGECFilter"
        for n in traverse_node_and_children(node))


def dummy_function_for_line_imports():
    """Most lines import Hlt2Line/SpruceLine from config instead of the lines module.

    This dummy function is not used, but it is here to keep the formatting
    happy instead of changing the import everywhere.
    """
    b_line = Hlt2Line(name="UnusedHltLine")
    c_line = SpruceLine(name="UnusedHltLine")
    return (b_line, c_line)


def filter_lines(lines_dict: dict[str, Hlt2Line],
                 pattern_to_remove: str) -> dict[str, Hlt2Line]:
    """Remove lines using a regular expression.

    Args:
        lines_dict (dict[str, Hlt2Line]): Line dictionary to filter.
        pattern_to_remove (str): Name pattern to search for using regex.

    Returns:
        dict[str, Hlt2Line]: Filtered line dictionary.
    """
    filtered = {
        name: line
        for name, line in lines_dict.items()
        if re.search(pattern_to_remove, name) is None
    }
    return filtered
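
# Illustrative (hypothetical) use of filter_lines: drop every line whose name
# matches a regex before building the streams; the registry import below is a
# placeholder, not part of this module:
#
#     from Hlt2Conf.lines import all_lines        # hypothetical registry
#     no_charm_lines = filter_lines(all_lines, r"^Hlt2Charm")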