# Source code for Moore.lines

###############################################################################
# (c) Copyright 2021 CERN for the benefit of the LHCb Collaboration           #
#                                                                             #
# This software is distributed under the terms of the GNU General Public      #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING".   #
#                                                                             #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization  #
# or submit itself to any jurisdiction.                                       #
###############################################################################
import logging

import Functors as F

from PyConf.Algorithms import (
    DeterministicPrescaler,
    VoidFilter,
)
from PyConf.application import make_odin
from PyConf.components import Algorithm
from PyConf.reading import get_decreports
from PyConf.control_flow import CompositeNode, NodeLogic

from Moore.selreports import (
    UnconvertableAlgorithmError,
    convert_output as convert_output_for_selreports,
)

from .monitoring import monitoring, run_default_monitoring

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

#: Ending common to all line decision names
DECISION_SUFFIX = "Decision"


def _producer(datahandle_or_producer):
    """Resolve a data handle to the algorithm that produces it.

    If the argument has a ``producer`` attribute, that attribute is returned
    (whatever its value). Otherwise the argument itself is returned
    unchanged; this includes plain algorithms and ``None``.
    """
    missing = object()
    producer = getattr(datahandle_or_producer, "producer", missing)
    return datahandle_or_producer if producer is missing else producer


def _node_basics(node):
    """Flatten `node` into the list of its non-`CompositeNode` descendants.

    Nested `CompositeNode` children are expanded recursively, depth first,
    so the leaves are returned in the order they appear in `children`.
    """
    leaves = []
    for child in node.children:
        leaves.extend(
            _node_basics(child) if isinstance(child, CompositeNode) else
            (child, ))
    return leaves


def _remove_decision_output(algs, decision_node):
    """Return the producers of `algs` that `decision_node` does not already run.

    Each entry of `algs` is first resolved to its producer with `_producer`.
    Producers that are (possibly nested) basic children of `decision_node` are
    dropped. The surviving producers keep their original order.
    """
    already_scheduled = frozenset(_node_basics(decision_node))
    kept = []
    for alg in algs:
        producer = _producer(alg)
        if producer not in already_scheduled:
            kept.append(producer)
    return kept


def _return_filter(name, hlt_filter_code, filter_source_id):
    """Return a `VoidFilter` that passes events where a matching line fired.

    The filter evaluates ``F.DECREPORTS_RE_FILTER`` on the DecReports of
    `filter_source_id`. It uses a regular expression built by OR-ing (``|``)
    all entries of `hlt_filter_code`.

    Args:
        name (str): name of the filter algorithm.
        hlt_filter_code (str or list/tuple of str): decision name(s) or
            regular expression(s) matching decision names, e.g.
            ``"Hlt1.*Decision"``.
        filter_source_id (str): ID of the DecReports source (e.g. ``"Hlt1"``).
            Every entry of `hlt_filter_code` must contain this string.

    Raises:
        ValueError: if `hlt_filter_code` is empty, if an entry does not contain
            `DECISION_SUFFIX`, or if an entry does not contain
            `filter_source_id`.
    """
    # A single string is one pattern; lists and tuples are pattern sequences.
    if isinstance(hlt_filter_code, (list, tuple)):
        codes = list(hlt_filter_code)
    else:
        codes = [hlt_filter_code]
    # Joining zero patterns gives the empty regex, which is almost certainly
    # not what the caller meant, so reject it explicitly.
    if not codes:
        raise ValueError("hlt_filter_code must not be empty")
    # Raise (not assert) so the check survives `python -O`.
    if not all(DECISION_SUFFIX in code for code in codes):
        raise ValueError(
            "hlt_filter_code must use the line DECISION_SUFFIX "
            f"({DECISION_SUFFIX!r}); got {codes!r}")
    if not all(filter_source_id in code for code in codes):
        raise ValueError(
            "A {!r} filter can only interpret {!r} line filters.".format(
                filter_source_id, filter_source_id))
    return VoidFilter(
        name=name,
        allow_duplicate_instances_with_distinct_names=True,
        Cut=F.DECREPORTS_RE_FILTER(
            Regex="|".join(codes),
            DecReports=get_decreports(filter_source_id)))


class DecisionLine(object):
    """Object fully qualifying an HLT line.

    The control flow of the line, a `PyConf.control_flow.CompositeNode`, is
    exposed as the `node` property. It will run the given `algs` prepended
    with a `DeterministicPrescaler` (even if the prescale is 1), all combined
    with `PyConf.control_flow.NodeLogic.LAZY_AND` logic. When the postscale is
    below 1, a second `DeterministicPrescaler` (the postscaler) is appended
    after `algs`.

    The attributes of this object should be considered immutable.

    Args:
        name (str): name of the line; must not end with `DECISION_SUFFIX`.
        algs (iterable of `Algorithm`): control flow of the line
        prescale (float): accept fraction of the prescaler
        postscale (float): accept fraction of the postscaler

    Attributes:
        name (str): name of the line
        node (`PyConf.control_flow.CompositeNode`): full control flow of the
            line; if this CF node is positive, the line is said to have 'fired'
        output_producer (`Algorithm` or None): the last algorithm of the
            line's control flow that is neither a monitor nor a scaler, if it
            has outputs. On a positive CF decision the application may wish
            to use this output, e.g. to fill selection reports or luminosity
            banks.

    Raises:
        ValueError: if `name` ends with `DECISION_SUFFIX`.
    """

    def __init__(self, name, algs, prescale=1.0, postscale=1.0):
        # Line names ending with "Decision" would be confusing, so forbid it
        if name.endswith(DECISION_SUFFIX):
            raise ValueError("line name ({}) should not end with {}".format(
                name, DECISION_SUFFIX))
        self.name = name
        self.node = self._decision_node(self.name, algs, prescale, postscale)
        self.output_producer = self._output_producer(self.node)

    @property
    def decision_name(self):
        """Decision name."""
        return self.name + DECISION_SUFFIX

    @property
    def produces_output(self):
        """Return True if this line produces output."""
        return self.output_producer is not None

    def __str__(self):
        return (f"<{type(self).__name__} with name \"{self.name}\"" +
                f" at {id(self):#0x}>")

    @staticmethod
    def _decision_node(name, algs, prescale, postscale):
        """Return a `CompositeNode` that evaluates this line's decision.

        The children are the prescaler, then `algs`, then the postscaler.
        The postscaler is only included when `postscale` < 1.0.
        """
        prescaler = DeterministicPrescaler(
            name=f"{name}_Prescaler",
            # make sure prescale is not interpreted as 'int' because it
            # changes the hash computation...
            AcceptFraction=float(prescale),
            SeedName=name + "Prescaler",
            ODINLocation=make_odin())
        postscaler = DeterministicPrescaler(
            name=f"{name}_Postscaler",
            AcceptFraction=float(postscale),
            SeedName=name + "Postscaler",
            ODINLocation=make_odin())
        node = CompositeNode(name, (prescaler, ) + tuple(algs) + (
            (postscaler, ) if postscale < 1.0 else tuple()))
        return node

    @staticmethod
    def _output_producer(node):
        """Return the producer that defines the 'output data' of this line.

        The candidate is the last child of `node` that is an `Algorithm` and
        is neither a monitoring algorithm (type name starting with
        ``Monitor_``) nor a ``DeterministicPrescaler``. Non-`Algorithm`
        children are skipped. In practice this is the last non-monitoring
        item passed as the `algs` argument to the `DecisionLine` constructor.

        Skipping the scalers matters: when postscale < 1 the postscaler is
        the very last child of the node. Without the skip it would hide the
        line's real output producer.

        How the output data of the producer is used depends on the
        application. If there is no candidate, or the candidate creates no
        output, None is returned.
        """

        def _is_candidate(child):
            return (isinstance(child, Algorithm)
                    and not child.typename.startswith('Monitor_')
                    and child.typename != "DeterministicPrescaler")

        last = next(filter(_is_candidate, reversed(node.children)), None)
        # If there is no candidate, or it produces nothing, there is no
        # 'producer'
        if last is None or not last.outputs:
            return None
        return last
# FIXME rename to Hlt2LumiNanofyLine
class Hlt2LuminosityLine(DecisionLine):
    """Line that propagates and "nanofies" lumi events.

    Hlt2LuminosityLine is used to propagate and "nanofy" the lumi events
    selected in HLT1. "Nanofy" means that the line defines the exact set of
    raw banks to be saved, independently of the default for a stream. This
    special treatment happens in config.py:stream_writer().

    The output of this line is only used to count the number of lumi events
    in physics streams. Therefore, only the minimum necessary banks are
    requested: ODIN and HltRoutingBits.

    Only one instance can be constructed.

    Args:
        algs (iterable of `Algorithm`, optional): control flow run after the
            HLT1 filter. Defaults to no algorithms.
        hlt1_filter_code (str or list(str)): HLT1 decision(s) required by the
            line; if empty, no HLT1 filter is added.

    Raises:
        RuntimeError: if an instance has already been constructed.
    """
    _HLT1_FILTER_SOURCE_ID = "Hlt1"
    _instantiated = False

    def __init__(
            self,
            algs=None,
            hlt1_filter_code="Hlt1ODINLumiDecision",
    ):
        if Hlt2LuminosityLine._instantiated:
            raise RuntimeError("Can instantiate Hlt2LuminosityLine only once")
        name = "Hlt2Lumi"
        # Default to no algorithms (avoids a shared mutable default) and
        # accept any iterable, not only lists.
        algs = [] if algs is None else list(algs)
        self.hlt1_filter_code = hlt1_filter_code
        if hlt1_filter_code:
            hlt1_filter = _return_filter(f"{name}_Hlt1Filter",
                                         self.hlt1_filter_code,
                                         self._HLT1_FILTER_SOURCE_ID)
            algs = [hlt1_filter] + algs
        self.raw_banks = ["ODIN", "HltRoutingBits"]
        super(Hlt2LuminosityLine, self).__init__(
            name, algs, prescale=1.0, postscale=1.0)
        # Only claim the singleton once construction has succeeded, so a
        # failed attempt does not block a later valid one.
        Hlt2LuminosityLine._instantiated = True


class Hlt2LumiCountersLine(DecisionLine):
    """Line that computes HLT2 lumi counters.

    Hlt2LumiCountersLine is used to compute luminosity counters and merge
    them into an HltLumiSummary object. This is then consumed by the
    HltLumiWriter raw bank encoder.

    Only one instance can be constructed.

    Args:
        algs (iterable of `Algorithm`): control flow run after the HLT1 filter.
        output (DataHandle): the line's output. Its `type` must contain
            ``LHCb::RawBank``; its producer becomes `output_producer`.
        hlt1_filter_code (str or list(str)): HLT1 decision(s) required by the
            line; if empty, no HLT1 filter is added.
        raw_banks (iterable, optional): raw banks requested by the line.

    Raises:
        RuntimeError: if an instance has already been constructed.
        TypeError: if `output` is not a RawBank view.
    """
    _HLT1_FILTER_SOURCE_ID = "Hlt1"
    _instantiated = False

    def __init__(
            self,
            algs,
            output,
            hlt1_filter_code="Hlt1ODINLumiDecision",
            raw_banks=None,
    ):
        if Hlt2LumiCountersLine._instantiated:
            raise RuntimeError(
                "Can instantiate Hlt2LumiCountersLine only once")
        # Validate the output before configuring any algorithm.
        if "LHCb::RawBank" not in output.type:
            raise TypeError(
                f"The output producer must produce a RawBank view but got {output.type}"
            )
        name = "Hlt2LumiCounters"
        algs = list(algs)
        self.hlt1_filter_code = hlt1_filter_code
        if hlt1_filter_code:
            hlt1_filter = _return_filter(f"{name}_Hlt1Filter",
                                         self.hlt1_filter_code,
                                         self._HLT1_FILTER_SOURCE_ID)
            algs = [hlt1_filter] + algs
        # FIXME remove key=hash everywhere in this file...
        # NOTE(review): hash() of str is salted per process (PYTHONHASHSEED),
        # so if the banks are strings this order is not reproducible across
        # runs unless the seed is fixed.
        self.raw_banks = tuple(sorted(set(raw_banks or []), key=hash))
        super(Hlt2LumiCountersLine, self).__init__(
            name, algs, prescale=1.0, postscale=1.0)
        # The RawBank-view producer, not the last CF algorithm, is the output.
        self.output_producer = output.producer
        Hlt2LumiCountersLine._instantiated = True


class Hlt1Line(DecisionLine):
    """Object fully qualifying an HLT1 line.

    Extends `DecisionLine` with control flow that ensures additional objects
    are created for later persistence on a positive decision.

    Args:
        name (str): name of the line; must begin with `Hlt1`.
        algs (iterable of `Algorithm`): control flow of the line
        prescale (float): accept fraction of the prescaler
        postscale (float): accept fraction of the postscaler
        raw_banks (iterable, optional): raw banks requested by the line;
            stored deduplicated as a tuple.

    Attributes:
        objects_to_persist (list of DataHandle): Objects which this line
            requests to be persisted on a positive decision. The control flow
            of the line will guarantee that these objects will exist in that
            case.

    Raises:
        ValueError: if `name` does not start with `Hlt1`.
    """

    def __init__(self, name, algs, prescale=1., postscale=1.0,
                 raw_banks=None):
        # Raise (not assert) so the check survives `python -O`, and do it
        # before configuring any algorithm.
        if not name.startswith("Hlt1"):
            raise ValueError("name {!r} does not start with {!r}".format(
                name, "Hlt1"))
        super(Hlt1Line, self).__init__(name, algs, prescale, postscale)
        # The line guarantees that these objects will be present in the TES if
        # this line made a positive decision
        self.objects_to_persist = []
        # NOTE(review): key=hash gives a process-dependent order for str items.
        self.raw_banks = tuple(sorted(set(raw_banks or []), key=hash))
        if self.produces_output:
            output_node, self.objects_to_persist = self._output_node(
                self.node, self.output_producer)
            # Wrap the decision and output nodes
            if output_node is not None:
                self.node = CompositeNode(self.name + "DecisionWithOutput",
                                          (self.node, output_node))

    def to_json(self):
        """Return the ``(name, node)`` pair describing this line."""
        return (self.name, self.node)

    @staticmethod
    def _output_node(decision_node, output_producer):
        """Return ``(output_node, objects_to_persist)`` for SelReports.

        The output producer is converted with
        `Moore.selreports.convert_output`. If that is not possible, a warning
        is logged and no objects are persisted. `output_node` is None when no
        extra algorithm needs to run.
        """
        try:
            algs = [convert_output_for_selreports(output_producer)]
            objects_to_persist = list(algs[-1].outputs.values())
        except UnconvertableAlgorithmError as e:
            log.warning(
                "Cannot convert output objects for line {}; SelReports will be unavailable: {}"
                .format(decision_node.name, e))
            algs = []
            objects_to_persist = []

        # Algorithms already in the decision CF don't need to be in the
        # output CF. Although one can imagine a scheduler that allows this
        # type of duplication, it is currently not permitted under an
        # ordered control flow node (see LHCb#108)
        algs = _remove_decision_output(algs, decision_node)
        if algs:
            output_node = CompositeNode(
                decision_node.name + "Output",
                algs,
                combine_logic=NodeLogic.NONLAZY_OR,
                force_order=False)
        else:
            output_node = None
        return output_node, objects_to_persist
class Hlt2Line(DecisionLine):
    """Object fully qualifying an HLT2 line.

    Extends `DecisionLine` with control flow that ensures additional objects
    are created for later persistence on a positive decision. This supports
    extra outputs (TurboSP) and reconstruction persistence
    (Turbo++/PersistReco).

    Args:
        name (str): name of the line; must begin with `Hlt2` (the class's
            `_CLASS_NAME_PREFIX`).
        algs (iterable of `Algorithm`): control flow of the line
        prescale (float): accept fraction of the prescaler
        postscale (float): accept fraction of the postscaler
        extra_outputs (iterable of 2-tuple): List of `(name, DataHandle)`
            pairs.
        persistreco (bool): If True, request HLT2 reconstruction persistence.
        tagging_particles (bool): If True (and `persistreco` is False),
            request persistence of the long and upstream ProtoParticles.
        calo_digits, calo_clusters, track_ancestors (bool): stored as
            attributes of the same name.
        pv_tracks (bool): stored as an attribute; forced to True when
            `persistreco` is True.
        raw_banks (iterable, optional): raw banks requested by the line;
            stored deduplicated as a tuple.
        hlt1_filter_code (str or list(str)): string(s) used to define an HLT1
            filter.
        monitoring_variables (tuple(str)): Variables to create default
            monitoring plots for. These variables need to be implemented in
            .monitoring.py

    Attributes:
        objects_to_persist (list of DataHandle): Objects which this line
            requests to be persisted on a positive decision. The control flow
            of the line will guarantee that these objects will exist in that
            case.
        extra_outputs (iterable of 2-tuple): List of `(name, DataHandle)` pairs
            which this line requests are persisted on a positive decision. The
            name is just an identifier, which can be used by the application
            to construct a TES path when persisting each extra output
        persistreco (bool): If True, this line requests HLT2 reconstruction
            persistence on a positive decision. The CF of the line (the `node`
            attribute) will ensure that the reconstruction algorithms that
            produce the reconstruction objects will run when the line fires
        locations_to_move (dict of strings): Dictionary of TES paths, which
            the persistence code will need to move (keys) and the paths they
            should be moved to (values). `_line_outputs` in this class
            currently never populates it, so it is always empty.
        hlt1_filter_code (list(str)): If not empty, the string is used to
            define an HLT1 filter that is prepended to the control flow
            defined by `algs`. If empty, a default filter according to
            'get_default_hlt1_filter_code_for_hlt2' is applied (if that
            default is non-empty). ThOr DECREPORTS_RE_FILTER is used. Note
            that this parameter should look like
            "['Hlt1ADecision', 'Hlt1.*Decision']".
        monitoring_variables (tuple(str)): Variables to create default
            monitoring plots for. These variables need to be implemented in
            .monitoring.py

    Raises:
        ValueError: if `name` does not start with `_CLASS_NAME_PREFIX`.
    """
    _CLASS_NAME_PREFIX = "Hlt2"
    _HLT1_FILTER_SOURCE_ID = "Hlt1"

    def __init__(
            self,
            name,
            algs,
            prescale=1.,
            postscale=1.0,
            extra_outputs=None,
            persistreco=False,
            tagging_particles=False,
            calo_digits=False,
            calo_clusters=False,
            pv_tracks=False,
            track_ancestors=False,
            raw_banks=None,
            hlt1_filter_code="",
            monitoring_variables=("pt", "eta", "m", "vchi2", "ipchi2",
                                  "n_candidates"),
    ):
        # import needs to be here to pass tests
        from Hlt2Conf.settings.defaults import get_default_hlt1_filter_code_for_hlt2

        # Fail fast on a badly named line, before any algorithm is configured.
        if not name.startswith(self._CLASS_NAME_PREFIX):
            raise ValueError("name {!r} does not start with {!r}".format(
                name, self._CLASS_NAME_PREFIX))

        self.hlt1_filter_code = hlt1_filter_code
        if hlt1_filter_code:
            hlt1_filter_seq = [
                _return_filter(f"{name}_Hlt1Filter", self.hlt1_filter_code,
                               self._HLT1_FILTER_SOURCE_ID)
            ]
        else:
            default_filter_code = get_default_hlt1_filter_code_for_hlt2()
            if default_filter_code != "":
                self.hlt1_filter_code = default_filter_code
                hlt1_filter_seq = [
                    _return_filter("Default_Hlt1Filter",
                                   self.hlt1_filter_code,
                                   self._HLT1_FILTER_SOURCE_ID)
                ]
            else:
                hlt1_filter_seq = []
        # list() accepts any iterable of algorithms, as documented.
        algs = hlt1_filter_seq + list(algs)

        self.monitoring_variables = monitoring_variables
        # the first is a local switch for every instance of Hlt2Line, the
        # second a global one (`with run_default_monitoring.bind(run=False)`).
        if self.monitoring_variables and run_default_monitoring():
            monitoring_algs = monitoring(algs, name,
                                         self.monitoring_variables)
            if not monitoring_algs:
                log.debug(f"No default monitoring for {name}")
            else:
                algs += monitoring_algs

        super(Hlt2Line, self).__init__(name, algs, prescale, postscale)

        # FIXME remove key=hash; hash() of str is salted per process
        # (PYTHONHASHSEED), so this order may differ between runs.
        self.extra_outputs = tuple(
            sorted(set(extra_outputs or []), key=hash))
        self.raw_banks = tuple(sorted(set(raw_banks or []), key=hash))
        self.persistreco = persistreco
        self.tagging_particles = tagging_particles
        self.calo_digits = calo_digits
        self.calo_clusters = calo_clusters
        self.pv_tracks = True if persistreco else pv_tracks
        self.track_ancestors = track_ancestors
        # The line guarantees that these objects will be present in the TES if
        # this line made a positive decision
        self.objects_to_persist = []
        # Always defined, even if a subclass reports no output.
        self.locations_to_move = {}
        if self.produces_output:
            output_node, self.objects_to_persist, self.locations_to_move = self._output_node(
                self.node, self.output_producer, self.extra_outputs,
                self.persistreco, self.tagging_particles)
            # Wrap the decision and output nodes
            if output_node is not None:
                self.node = CompositeNode(self.name + "DecisionWithOutput",
                                          (self.node, output_node))

    def to_json(self):
        """Return name, node, extra outputs, persistreco and tagging flags."""
        return (self.name, self.node, self.extra_outputs, self.persistreco,
                self.tagging_particles)

    # Overwrite `produces_output` so that pass through lines with
    # `persistreco==True` or `extra_outputs` count as `physics_lines` in
    # `config.py`
    @property
    def produces_output(self):
        """Return True if this line produces output OR requests reconstruction OR requests extra_outputs."""
        # NOTE(review): `extra_outputs` is always a tuple once __init__ has
        # run, so the last term is always True and every Hlt2Line reports
        # output. `bool(self.extra_outputs)` may have been intended; kept
        # unchanged because callers (e.g. config.py) may rely on it.
        return (self.output_producer is not None or self.persistreco
                or self.tagging_particles
                or self.extra_outputs is not None)

    @staticmethod
    def _output_node(decision_node, output_producer, extra_outputs,
                     persistreco, tagging_particles):
        """Return ``(output_node, objects_to_persist, locations_to_move)``.

        `output_node` is a NONLAZY_OR `CompositeNode` of the output-producing
        algorithms that are not already part of `decision_node`. It is None
        when there are no such algorithms.
        """
        # 2-tuple of (output producer, output path components) for the main
        # candidate of the HLT2 line
        main_output = (output_producer, (decision_node.name, ))
        # Same 2-tuples for each of the extra_outputs
        additional_outputs = [(output, (decision_node.name, prefix))
                              for prefix, output in extra_outputs]
        algs, objects_to_persist, locations_to_move = Hlt2Line._line_outputs(
            main_output, additional_outputs, persistreco, tagging_particles)

        # Algorithms already in the decision CF don't need to be in the
        # output CF. Although one can imagine a scheduler that allows this
        # type of duplication, it is currently not permitted under an
        # ordered control flow node (see LHCb#108)
        algs = _remove_decision_output(algs, decision_node)
        if algs:
            # Run all output producers in a separate CF node, and record all of
            # their outputs so Moore can persist them
            output_node = CompositeNode(
                decision_node.name + "Output",
                algs,
                combine_logic=NodeLogic.NONLAZY_OR,
                force_order=False)
        else:
            output_node = None
        return output_node, objects_to_persist, locations_to_move

    @staticmethod
    def _line_outputs(main_output_location, additional_output_locations,
                      persistreco, tagging_particles):
        """Return output-producing algorithms and their outputs.

        Args:
            main_output_location (2-tuple): a DataHandle (or None when the line
                has no output producer) and a location components tuple for
                the main candidate of the HLT2 line. The semantics are that
                this line wishes to have the object represented by the
                DataHandle persisted at the TES location represented by the
                location tuple (an N-tuple of str).
            additional_output_locations (list of 2-tuple): Each entry is a
                2-tuple of a DataHandle and location components tuple, with the
                same semantics as `main_output_location`.
            persistreco (bool): If True, this line requests the full HLT2
                reconstruction be persisted along with its other outputs.
            tagging_particles (bool): If True, this line requests the proto
                particles for tagging be persisted along with its other
                outputs.

        Returns:
            algs (list of Algorithm): Output data producer algorithms which
                this line's CF must run on a positive decision, in order to
                produce the output objects it has requested.
            objects_to_persist (list of DataHandle): Output objects to be
                persisted on a positive decision.
            locations_to_move (dict): TES paths to move; currently always
                empty.

        Note:
            There is an implicit understanding between the logic here and
            that implemented in `Moore.persistence`. The latter can only
            support persisting certain types of objects, and the logic here
            performs some basic configuration-time checking to ensure lines
            do not request objects which Moore does not know how to persist.
            As of this writing, line outputs and extra outputs can only be
            LHCb::Particle producers and LHCb::FlavourTags producers. If other
            objects are to be supported, this method must be updated
            alongside the cloner algorithms configured in
            `Moore.persistence.cloning`.
        """
        from RecoConf.reconstruction_objects import reconstruction
        from .persistence.particle_moving import (
            CopyParticles, CopyFlavourTags, is_particle_producer,
            particle_output, is_flavourtag_producer, flavourtag_output)
        from .persistence.persistreco import persistreco_line_outputs

        reco = reconstruction()
        pvs = reco["PVs"]
        rec_summary = reco["RecSummary"]

        algs = []
        objects_to_persist = []
        locations_to_move = {}

        main_output, main_location_components = main_output_location
        main_producer = _producer(main_output)
        # Evaluated once; a line without a main output (None) never counts as
        # a particle producer, so is_particle_producer is not asked about None.
        main_is_particles = (main_output is not None
                             and is_particle_producer(main_producer))

        if main_output is not None:
            # Branch on the C++ type of the outputs of the producer.
            # For legacy LHCb::Particle outputs, we must navigate back to the
            # producer to determine whether we should run the LHCb::Particle
            # moving algorithm.
            # Copy the main output of the HLT2 line and add the moved
            # container to the list of objects to be persisted.
            if main_is_particles:
                main_particles = particle_output(main_producer)
                main_mover = CopyParticles(main_particles,
                                           main_location_components)
                algs.append(main_mover)
                objects_to_persist += main_mover.outputs.values()
            else:
                log.warning(
                    "Unsupported type {} for line output {} (for location {}); this will not be persisted"
                    .format(main_output.type, main_output,
                            main_location_components))

        # Loop over the extra outputs and copy them, then add the moved
        # containers to the list of objects to be persisted.
        for output, location_components in additional_output_locations:
            producer = _producer(output)
            if is_particle_producer(producer):
                particles = particle_output(producer)
                mover = CopyParticles(particles, location_components)
                algs.append(mover)
                objects_to_persist += mover.outputs.values()
            elif is_flavourtag_producer(producer):
                if not main_is_particles:
                    log.warning(
                        "Trying to persist FlavourTag but the last algorithm in the line is not a particle producer. Therefor cannot persist FlavourTags objects."
                    )
                else:
                    flavourtags = flavourtag_output(producer)
                    mover = CopyFlavourTags(flavourtags, location_components)
                    algs.append(mover)
                    objects_to_persist += mover.outputs.values()
            else:
                log.warning(
                    "Unsupported type {} for line output {} (for location {}); this will not be persisted"
                    .format(output.type, output, location_components))

        if persistreco:
            pr_objs = list(persistreco_line_outputs(reco).values())
            algs += pr_objs
            objects_to_persist += pr_objs

        if tagging_particles and not persistreco:
            ft_objs = [reco["LongProtos"], reco["UpstreamProtos"]]
            algs += ft_objs
            objects_to_persist += ft_objs

        # All Turbo lines (lines producing output) should always get PVs and
        # the RecSummary
        if objects_to_persist:
            for always_persisted in (pvs, rec_summary):
                if always_persisted not in algs:
                    algs.append(always_persisted)
                    objects_to_persist.append(always_persisted)

        return algs, objects_to_persist, locations_to_move
class SpruceLine(Hlt2Line):
    """Object fully qualifying a Sprucing line.

    A variant of `Hlt2Line` for Sprucing selections.

    Args:
        All arguments of `Hlt2Line` and the following extra:
        hlt2_filter_code (str or list(str)): If not empty, used to define an
            HLT2 filter that is prepended to the control flow defined by
            `algs`. ThOr DECREPORTS_RE_FILTER is used. Note that this
            parameter should look like "['Hlt2ADecision', 'Hlt2.*Decision']".

    Raises:
        ValueError: if `name` does not start with `Spruce` (the class's
            `_CLASS_NAME_PREFIX`).
    """
    _CLASS_NAME_PREFIX = "Spruce"
    _HLT2_FILTER_SOURCE_ID = 'Hlt2'

    def __init__(
            self,
            name,
            algs,
            prescale=1.,
            postscale=1.0,
            extra_outputs=None,
            raw_banks=None,
            persistreco=False,
            tagging_particles=False,
            calo_digits=False,
            calo_clusters=False,
            pv_tracks=False,
            track_ancestors=False,
            hlt1_filter_code="",
            hlt2_filter_code="",
            monitoring_variables=("pt", "eta", "m", "vchi2", "ipchi2",
                                  "n_candidates"),
    ):
        # Accept any iterable of algorithms, as documented for `Hlt2Line`.
        algs = list(algs)
        self.hlt2_filter_code = hlt2_filter_code
        if hlt2_filter_code:
            hlt2_filter = _return_filter(f"{name}_Hlt2Filter",
                                         self.hlt2_filter_code,
                                         self._HLT2_FILTER_SOURCE_ID)
            algs = [hlt2_filter] + algs
        # Now pass as Hlt2Line with the HLT1 filter; Hlt2Line prepends that
        # filter, so it ends up before the HLT2 filter in the control flow.
        super(SpruceLine, self).__init__(
            name=name,
            algs=algs,
            prescale=prescale,
            postscale=postscale,
            extra_outputs=extra_outputs,
            raw_banks=raw_banks,
            persistreco=persistreco,
            tagging_particles=tagging_particles,
            calo_digits=calo_digits,
            calo_clusters=calo_clusters,
            pv_tracks=pv_tracks,
            track_ancestors=track_ancestors,
            hlt1_filter_code=hlt1_filter_code,
            monitoring_variables=monitoring_variables)
        if not self.name.startswith(self._CLASS_NAME_PREFIX):
            raise ValueError("name {!r} does not start with {!r}".format(
                name, self._CLASS_NAME_PREFIX))


class PassLine(SpruceLine):
    """Object fully qualifying a Pass through line.

    A variant of `SpruceLine` that allows no physics selection: it runs no
    `algs` of its own.

    Args:
        name (str): name of the line; must begin with `Pass`.
        prescale (float): accept fraction of the prescaler.
        hlt2_filter_code (str or list(str)): If not empty, used to define an
            HLT2 filter that is prepended to the (otherwise empty) control
            flow. ThOr DECREPORTS_RE_FILTER is used. Note that this parameter
            should look like "['Hlt2ADecision', 'Hlt2.*Decision']".
    """
    _CLASS_NAME_PREFIX = "Pass"
    _HLT2_FILTER_SOURCE_ID = 'Hlt2'
    _HLT2_EXP_FILTER_PREFIX = "Hlt2"

    def __init__(
            self,
            name,
            prescale=1.,
            hlt2_filter_code="",
    ):
        # Now pass as SpruceLine with no selection algorithms
        super(PassLine, self).__init__(
            name,
            algs=[],
            prescale=prescale,
            hlt2_filter_code=hlt2_filter_code)