###############################################################################
# (c) Copyright 2021 CERN for the benefit of the LHCb Collaboration #
# #
# This software is distributed under the terms of the GNU General Public #
# Licence version 3 (GPL Version 3), copied verbatim in the file "COPYING". #
# #
# In applying this licence, CERN does not waive the privileges and immunities #
# granted to it by virtue of its status as an Intergovernmental Organization #
# or submit itself to any jurisdiction. #
###############################################################################
import logging
import Functors as F
from PyConf.Algorithms import (
DeterministicPrescaler,
VoidFilter,
)
from PyConf.application import make_odin
from PyConf.components import Algorithm
from PyConf.reading import get_decreports
from PyConf.control_flow import CompositeNode, NodeLogic
from Moore.selreports import (
UnconvertableAlgorithmError,
convert_output as convert_output_for_selreports,
)
from .monitoring import monitoring, run_default_monitoring
#: Ending common to all line decision names
DECISION_SUFFIX = "Decision"
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
def _producer(datahandle_or_producer):
try:
return datahandle_or_producer.producer
except AttributeError:
return datahandle_or_producer
def _node_basics(node):
    """Collect every non-`CompositeNode` descendant of `node` into a flat list.

    Children are visited in order; a `CompositeNode` child is replaced by its
    own flattened descendants at the position where it appears.
    """
    leaves = []
    for entry in node.children:
        if not isinstance(entry, CompositeNode):
            leaves.append(entry)
            continue
        # Nested control-flow node: splice in its leaves
        leaves.extend(_node_basics(entry))
    return leaves
def _remove_decision_output(algs, decision_node):
    """Return the producers of `algs` that are not leaves of `decision_node`.

    Each entry of `algs` is first resolved with `_producer`; the result keeps
    the input order and drops anything found among the flattened children of
    `decision_node`.
    """
    already_in_decision = set(_node_basics(decision_node))
    remaining = []
    for item in algs:
        candidate = _producer(item)
        if candidate not in already_in_decision:
            remaining.append(candidate)
    return remaining
def _return_filter(name, hlt_filter_code, filter_source_id):
    """Build a `VoidFilter` that cuts on upstream line decisions.

    Args:
        name (str): name given to the `VoidFilter` instance.
        hlt_filter_code (str or list of str): one or more regular expressions
            matching decision names. Anything that is not a list is wrapped
            into a one-element list. Every pattern must contain
            `DECISION_SUFFIX` and `filter_source_id`.
        filter_source_id (str): passed to `get_decreports` to obtain the
            DecReports that are filtered on.

    Returns:
        The `VoidFilter` whose cut is `F.DECREPORTS_RE_FILTER` with the
        patterns joined by ``|``.

    Raises:
        AssertionError: if a pattern does not contain `DECISION_SUFFIX`.
        ValueError: if a pattern does not contain `filter_source_id`.
    """
    patterns = (hlt_filter_code
                if isinstance(hlt_filter_code, list) else [hlt_filter_code])

    # Raised explicitly (rather than with an `assert` statement) so that the
    # check is not stripped when Python runs with -O. The exception type is
    # kept as AssertionError for backward compatibility.
    if not all(DECISION_SUFFIX in pattern for pattern in patterns):
        raise AssertionError(
            "hlt_filter_code must use the line DECISION_SUFFIX")

    if not all(filter_source_id in pattern for pattern in patterns):
        raise ValueError(
            "A {!r} filter can only interpret {!r} line filters.".format(
                filter_source_id, filter_source_id))

    line_regex = "|".join(patterns)
    return VoidFilter(
        name=name,
        allow_duplicate_instances_with_distinct_names=True,
        Cut=F.DECREPORTS_RE_FILTER(
            Regex=line_regex, DecReports=get_decreports(filter_source_id)))
class DecisionLine(object):
    """Object fully qualifying an HLT line.

    The control flow of the line, a `PyConf.control_flow.CompositeNode`, is
    exposed as the `node` property. It will run the given `algs` prepended
    with a `DeterministicPrescaler` (even if the prescale is 1), all combined
    with `PyConf.control_flow.NodeLogic.LAZY_AND` logic. A second
    `DeterministicPrescaler` acting as postscaler is appended only when
    `postscale` is below 1.

    The attributes of this object should be considered immutable.

    Args:
        name (str): name of the line
        algs (iterable of `Algorithm`): control flow of the line
        prescale (float): accept fraction of the prescaler
        postscale (float): accept fraction of the postscaler

    Attributes:
        name (str): name of the line
        node (`PyConf.control_flow.CompositeNode`): full control flow of the
            line; if this CF node is positive, the line is said to have
            'fired'
        output_producer (`Algorithm`): the last algorithm in the line's
            control flow. On a positive CF decision the application may wish
            to use this output, e.g. to fill selection reports or luminosity
            banks

    Raises:
        ValueError: if `name` ends with `DECISION_SUFFIX`.
    """

    def __init__(self, name, algs, prescale=1.0, postscale=1.0):
        # Line names ending with "Decision" would be confusing, so forbid it
        if name.endswith(DECISION_SUFFIX):
            raise ValueError("line name ({}) should not end with {}".format(
                name, DECISION_SUFFIX))
        self.name = name
        self.node = self._decision_node(self.name, algs, prescale, postscale)
        self.output_producer = self._output_producer(self.node)

    @property
    def decision_name(self):
        """Decision name, i.e. the line name followed by `DECISION_SUFFIX`."""
        return self.name + DECISION_SUFFIX

    @property
    def produces_output(self):
        """Return True if this line produces output."""
        return self.output_producer is not None

    def __str__(self):
        return (f"<{type(self).__name__} with name \"{self.name}\"" +
                f" at {id(self):#0x}>")

    @staticmethod
    def _decision_node(name, algs, prescale, postscale):
        """Return a `CompositeNode` that evaluates this line's decision.

        The children are the prescaler, then `algs` in the given order, then
        the postscaler if and only if `postscale < 1.0`.
        """
        prescaler = DeterministicPrescaler(
            name=f"{name}_Prescaler",
            # make sure prescale is not interpreted as 'int' because it
            # changes the hash computation...
            AcceptFraction=float(prescale),
            SeedName=name + "Prescaler",
            ODINLocation=make_odin())
        # NOTE(review): the postscaler is instantiated even when it is not
        # added to the node below; confirm that instantiation has no side
        # effects before making this conditional.
        postscaler = DeterministicPrescaler(
            name=f"{name}_Postscaler",
            AcceptFraction=float(postscale),
            SeedName=name + "Postscaler",
            ODINLocation=make_odin())
        trailing = (postscaler, ) if postscale < 1.0 else tuple()
        return CompositeNode(name, (prescaler, ) + tuple(algs) + trailing)

    @staticmethod
    def _output_producer(node):
        """Return the producer that defines the 'output data' of this line.

        The producer is the last direct child of `node` that is an
        `Algorithm` and whose `typename` does not start with ``Monitor_``,
        so trailing monitoring algorithms are skipped. Children that are
        control flow nodes are skipped as well; that use case will be dealt
        with if it arises.

        How the output data of the producer is used depends on the
        application.

        Returns:
            The selected `Algorithm`, or None if no child qualifies or if the
            selected algorithm has no outputs.
        """
        candidates = (child for child in reversed(node.children)
                      if isinstance(child, Algorithm)
                      and not child.typename.startswith('Monitor_'))
        # A default avoids referencing an unbound name when nothing matches
        last = next(candidates, None)
        # If there is no algorithm, or it produces nothing, there is no
        # 'producer'
        if last is None or not last.outputs:
            return None
        return last
# FIXME rename to Hlt2LumiNanofyLine
class Hlt2LuminosityLine(DecisionLine):
    """Line that propagates and "nanofies" lumi events.

    Hlt2LuminosityLine is used to propagate and "nanofy" the lumi events
    selected in HLT 1. "Nanofy" means that the line defines the exact
    set of raw banks to be saved independently from what the default for
    a stream is. This special treatment happens in config.py:stream_writer().

    The output of this line is only used to count the number of lumi
    events in physics streams. Therefore, only the minimum necessary
    banks are requested: ODIN and HltRoutingBits.

    The line is always named ``Hlt2Lumi`` and runs with prescale and
    postscale of 1. Only one instance may be created per process.

    Args:
        algs (iterable of `Algorithm`): control flow of the line; defaults to
            no algorithms.
        hlt1_filter_code (str or list of str): if not empty, an HLT1 decision
            filter built from it is prepended to `algs`.

    Raises:
        RuntimeError: if an instance has already been created successfully.
    """

    _HLT1_FILTER_SOURCE_ID = "Hlt1"
    _instantiated = False

    def __init__(
            self,
            algs=None,
            hlt1_filter_code="Hlt1ODINLumiDecision",
    ):
        if Hlt2LuminosityLine._instantiated:
            raise RuntimeError("Can instantiate Hlt2LuminosityLine only once")

        name = "Hlt2Lumi"
        # Work on a private list: no shared mutable default, and any iterable
        # is accepted
        algs = [] if algs is None else list(algs)

        self.hlt1_filter_code = hlt1_filter_code
        if hlt1_filter_code:
            hlt1_filter = _return_filter(f"{name}_Hlt1Filter",
                                         self.hlt1_filter_code,
                                         self._HLT1_FILTER_SOURCE_ID)
            algs = [hlt1_filter] + algs

        self.raw_banks = ["ODIN", "HltRoutingBits"]
        super().__init__(name, algs, prescale=1.0, postscale=1.0)

        # Only mark the singleton as taken once construction has succeeded,
        # so a failed attempt does not block a later, corrected one
        Hlt2LuminosityLine._instantiated = True
class Hlt2LumiCountersLine(DecisionLine):
    """Line that computes HLT2 lumi counters.

    Hlt2LumiCountersLine is used to compute luminosity counters and
    merge them into an HltLumiSummary object. This is then consumed by
    the HltLumiWriter raw bank encoder.

    The line is always named ``Hlt2LumiCounters`` and runs with prescale and
    postscale of 1. Only one instance may be created per process.

    Args:
        algs (iterable of `Algorithm`): control flow of the line
        output: object with `type` and `producer` attributes; `type` must
            contain ``LHCb::RawBank``. Its `producer` replaces the line's
            `output_producer`.
        hlt1_filter_code (str or list of str): if not empty, an HLT1 decision
            filter built from it is prepended to `algs`.
        raw_banks (iterable): raw banks requested by the line; stored
            de-duplicated as a tuple in the `raw_banks` attribute.

    Raises:
        RuntimeError: if an instance has already been created successfully.
        TypeError: if `output.type` does not contain ``LHCb::RawBank``.
    """

    _HLT1_FILTER_SOURCE_ID = "Hlt1"
    _instantiated = False

    def __init__(
            self,
            algs,
            output,
            hlt1_filter_code="Hlt1ODINLumiDecision",
            raw_banks=None,
    ):
        if Hlt2LumiCountersLine._instantiated:
            raise RuntimeError(
                "Can instantiate Hlt2LumiCountersLine only once")

        # Validate the requested output before anything is configured
        if "LHCb::RawBank" not in output.type:
            raise TypeError(
                f"The output producer must produce a RawBank view but got {format(output.type)}"
            )

        name = "Hlt2LumiCounters"

        self.hlt1_filter_code = hlt1_filter_code
        if hlt1_filter_code:
            hlt1_filter = _return_filter(f"{name}_Hlt1Filter",
                                         self.hlt1_filter_code,
                                         self._HLT1_FILTER_SOURCE_ID)
            # Unpacking accepts any iterable, not only lists
            algs = [hlt1_filter, *algs]

        self.raw_banks = tuple(sorted(set(raw_banks or []), key=hash))
        # FIXME remove key=hash everywhere in this file...

        super().__init__(name, algs, prescale=1.0, postscale=1.0)

        # The output of this line is the explicitly given one, not the last
        # algorithm of the control flow
        self.output_producer = output.producer

        # Only mark the singleton as taken once construction has succeeded,
        # so a failed attempt does not block a later, corrected one
        Hlt2LumiCountersLine._instantiated = True
class Hlt1Line(DecisionLine):
    """Object fully qualifying an HLT1 line.

    Extends `DecisionLine` with control flow that ensures additional objects
    are created for later persistence on a positive decision.

    Args:
        name (str): name of the line; must begin with `Hlt1`.
        algs (iterable of `Algorithm`): control flow of the line
        prescale (float): accept fraction of the prescaler
        postscale (float): accept fraction of the postscaler
        raw_banks (iterable): raw banks requested by the line; stored
            de-duplicated as a tuple in the `raw_banks` attribute.

    Attributes:
        objects_to_persist (list of DataHandle): Objects which this line
            requests to be persisted on a positive decision. The control flow
            of the line will guarantee that these objects will exist in that
            case.
        raw_banks (tuple): de-duplicated `raw_banks` argument.

    Raises:
        AssertionError: if `name` does not begin with `Hlt1`.
    """

    def __init__(self, name, algs, prescale=1., postscale=1.0,
                 raw_banks=None):
        super().__init__(name, algs, prescale, postscale)

        # Raised explicitly (rather than with an `assert` statement) so that
        # the check is not stripped when Python runs with -O. The exception
        # type is kept as AssertionError for backward compatibility.
        if not self.name.startswith("Hlt1"):
            raise AssertionError(
                "name {!r} does not start with 'Hlt1'".format(self.name))

        # The line guarantees that these objects will be present in the TES if
        # this line made a positive decision
        self.objects_to_persist = []
        self.raw_banks = tuple(sorted(set(raw_banks or []), key=hash))

        if self.produces_output:
            output_node, self.objects_to_persist = self._output_node(
                self.node, self.output_producer)
            # Wrap the decision and output nodes
            if output_node is not None:
                self.node = CompositeNode(self.name + "DecisionWithOutput",
                                          (self.node, output_node))

    def to_json(self):
        """Return the `(name, node)` pair describing this line."""
        return (self.name, self.node)

    @staticmethod
    def _output_node(decision_node, output_producer):
        """Return the output CF node and the objects to persist.

        The output of `output_producer` is converted for SelReports. If the
        conversion is not possible a warning is logged and nothing is
        persisted.

        Returns:
            tuple: `(output_node, objects_to_persist)`, where `output_node`
            is None when no algorithm needs to run beyond those already in
            `decision_node`.
        """
        try:
            algs = [convert_output_for_selreports(output_producer)]
            objects_to_persist = list(algs[-1].outputs.values())
        except UnconvertableAlgorithmError as e:
            log.warning(
                "Cannot convert output objects for line {}; SelReports will be unavailable: {}"
                .format(decision_node.name, e))
            algs = []
            objects_to_persist = []

        # Algorithms already in the decision CF don't need to be in the
        # output CF. Although one can imagine a scheduler that allows this
        # type of duplication, it is currently not permitted under an
        # ordered control flow node (see LHCb#108)
        algs = _remove_decision_output(algs, decision_node)

        if algs:
            output_node = CompositeNode(
                decision_node.name + "Output",
                algs,
                combine_logic=NodeLogic.NONLAZY_OR,
                force_order=False)
        else:
            output_node = None
        return output_node, objects_to_persist
class Hlt2Line(DecisionLine):
    """Object fully qualifying an HLT2 line.

    Extends `DecisionLine` with control flow that ensures additional objects
    are created for later persistence on a positive decision. This supports
    extra outputs (TurboSP) and reconstruction persistence
    (Turbo++/PersistReco).

    Args:
        name (str): name of the line; must begin with `Hlt2`.
        algs (iterable of `Algorithm`): control flow of the line
        prescale (float): accept fraction of the prescaler
        postscale (float): accept fraction of the postscaler
        extra_outputs (iterable of 2-tuple): List of `(name, DataHandle)` pairs.
        persistreco (bool): If True, request HLT2 reconstruction persistence.
        tagging_particles (bool): If True (and `persistreco` is False),
            request the `LongProtos` and `UpstreamProtos` reconstruction
            objects along with the other outputs.
        calo_digits, calo_clusters, track_ancestors: stored as attributes of
            the same name; not otherwise used by this class.
        pv_tracks: stored as attribute of the same name; forced to True when
            `persistreco` is true.
        raw_banks (iterable): raw banks requested by the line; stored
            de-duplicated as a tuple in the `raw_banks` attribute.
        hlt1_filter_code (list(str)): string used to define a HLT1 filter.
        monitoring_variables (tuple(str)): Variables to create default
            monitoring plots for.

    Attributes:
        objects_to_persist (list of DataHandle): Objects which this line
            requests to be persisted on a positive decision. The control flow
            of the line will guarantee that these objects will exist in that
            case.
        extra_outputs (iterable of 2-tuple): List of `(name, DataHandle)`
            pairs which this line requests are persisted on a positive
            decision. The name is just an identifier, which can be used by
            the application to construct a TES path when persisting each
            extra output
        persistreco (bool): If True, this line requests HLT2 reconstruction
            persistence on a positive decision. The CF of the line (the
            `node` attribute) will ensure that the reconstruction algorithms
            that produce the reconstruction objects will run when the line
            fires
        locations_to_move (dict of strings): Dictionary of TES paths, which
            the persistence code will need to move (keys) and the paths they
            should be moved to (values). `_line_outputs` currently always
            returns it empty.
        hlt1_filter_code (list(str)): If not empty, the string is used to define a
            HLT1 filter that is prepended to the control flow defined by `algs`.
            If empty, a default filter according to 'get_default_hlt1_filter_code_for_hlt2' is applied.
            ThOr DECREPORTS_RE_FILTER is used. Note that this parameter should look
            like "['Hlt1ADecision', 'Hlt1.*Decision']".
        monitoring_variables (tuple(str)): Variables to create default monitoring plots for.
            These variables need to be implemented in .monitoring.py

    Raises:
        ValueError: if `name` does not start with `_CLASS_NAME_PREFIX`.
    """

    _CLASS_NAME_PREFIX = "Hlt2"
    _HLT1_FILTER_SOURCE_ID = "Hlt1"

    def __init__(
            self,
            name,
            algs,
            prescale=1.,
            postscale=1.0,
            extra_outputs=None,
            persistreco=False,
            tagging_particles=False,
            calo_digits=False,
            calo_clusters=False,
            pv_tracks=False,
            track_ancestors=False,
            raw_banks=None,
            hlt1_filter_code="",
            monitoring_variables=("pt", "eta", "m", "vchi2", "ipchi2",
                                  "n_candidates"),
    ):
        # import needs to be here to pass tests
        from Hlt2Conf.settings.defaults import get_default_hlt1_filter_code_for_hlt2

        self.hlt1_filter_code = hlt1_filter_code
        if hlt1_filter_code:
            hlt1_filter_seq = [
                _return_filter(f"{name}_Hlt1Filter", self.hlt1_filter_code,
                               self._HLT1_FILTER_SOURCE_ID)
            ]
        else:
            # No explicit filter requested: fall back to the globally
            # configured default, if there is one
            default_code = get_default_hlt1_filter_code_for_hlt2()
            if default_code != "":
                self.hlt1_filter_code = default_code
                hlt1_filter_seq = [
                    _return_filter("Default_Hlt1Filter",
                                   self.hlt1_filter_code,
                                   self._HLT1_FILTER_SOURCE_ID)
                ]
            else:
                hlt1_filter_seq = []

        # Always a fresh list, so the caller's `algs` is never modified and
        # any iterable is accepted
        algs = hlt1_filter_seq + list(algs)

        self.monitoring_variables = monitoring_variables
        # the first is a local switch for every instance of Hlt2Line, the
        # second a global one
        # (`with run_default_monitoring.bind(run=False)`).
        if self.monitoring_variables and run_default_monitoring():
            monitoring_algs = monitoring(algs, name,
                                         self.monitoring_variables)
            if len(monitoring_algs) == 0:
                log.debug(f"No default monitoring for {name}")
            else:
                algs += monitoring_algs

        super().__init__(name, algs, prescale, postscale)
        if not self.name.startswith(self._CLASS_NAME_PREFIX):
            raise ValueError("name {!r} does not start with {!r}".format(
                name, self._CLASS_NAME_PREFIX))

        self.extra_outputs = tuple(sorted(set(extra_outputs or []), key=hash))
        self.raw_banks = tuple(sorted(set(raw_banks or []), key=hash))
        self.persistreco = persistreco
        self.tagging_particles = tagging_particles
        self.calo_digits = calo_digits
        self.calo_clusters = calo_clusters
        self.pv_tracks = True if persistreco else pv_tracks
        self.track_ancestors = track_ancestors

        # The line guarantees that these objects will be present in the TES if
        # this line made a positive decision
        self.objects_to_persist = []
        # Defined up front so the attribute exists on every instance
        self.locations_to_move = {}
        if self.produces_output:
            output_node, self.objects_to_persist, self.locations_to_move = self._output_node(
                self.node, self.output_producer, self.extra_outputs,
                self.persistreco, self.tagging_particles)
            # Wrap the decision and output nodes
            if output_node is not None:
                self.node = CompositeNode(self.name + "DecisionWithOutput",
                                          (self.node, output_node))

    def to_json(self):
        """Return the tuple describing this line.

        The tuple is `(name, node, extra_outputs, persistreco,
        tagging_particles)`.
        """
        return (self.name, self.node, self.extra_outputs, self.persistreco,
                self.tagging_particles)

    ## Overwrite `produces_output` so that pass through lines with
    ## `persistreco==True` or `extra_outputs` count as `physics_lines` in
    ## `config.py`
    @property
    def produces_output(self):
        """Return True if this line produces output OR requests reconstruction OR requests extra_outputs."""
        # NOTE(review): `__init__` always sets `extra_outputs` to a tuple, so
        # the last term is true for every constructed line and this property
        # never evaluates to a false value. Confirm whether
        # `bool(self.extra_outputs)` was intended before changing it, since
        # pass-through lines may rely on the current behaviour.
        return self.output_producer is not None or self.persistreco or self.tagging_particles or self.extra_outputs is not None

    @staticmethod
    def _output_node(decision_node, output_producer, extra_outputs,
                     persistreco, tagging_particles):
        """Return the output CF node and the persistence bookkeeping.

        Returns:
            tuple: `(output_node, objects_to_persist, locations_to_move)`,
            where `output_node` is None when no algorithm needs to run beyond
            those already in `decision_node`.
        """
        # Build 2-tuple of (output producer, output path components) for the
        # main candidate of the HLT2 line
        main_output = (output_producer, (decision_node.name, ))
        # Build list of 2-tuples of (output producer, output path components)
        # for the extra_outputs
        additional_outputs = []
        for prefix, output in extra_outputs:
            additional_outputs.append((output, (decision_node.name, prefix)))

        algs, objects_to_persist, locations_to_move = Hlt2Line._line_outputs(
            main_output, additional_outputs, persistreco, tagging_particles)

        # Algorithms already in the decision CF don't need to be in the
        # output CF. Although one can imagine a scheduler that allows this
        # type of duplication, it is currently not permitted under an
        # ordered control flow node (see LHCb#108)
        algs = _remove_decision_output(algs, decision_node)

        if algs:
            # Run all output producers in a separate CF node, and record all of
            # their outputs so Moore can persist them
            output_node = CompositeNode(
                decision_node.name + "Output",
                algs,
                combine_logic=NodeLogic.NONLAZY_OR,
                force_order=False)
        else:
            output_node = None
        return output_node, objects_to_persist, locations_to_move

    @staticmethod
    def _line_outputs(main_output_location, additional_output_locations,
                      persistreco, tagging_particles):
        """Return output-producing algorithms and their outputs.

        Args:
            main_output_location (2-tuple): 2-tuple of a DataHandle and location
                components tuple for the main candidate of the HLT2 line. The semantics are
                that this line wishes to have the object represented by the
                DataHandle to be persisted at the TES location represented by
                the location tuple (an N-tuple of str). The first element may
                be None, in which case there is no main output.
            additional_output_locations (list of 2-tuple): Each entry is a 2-tuple of
                a DataHandle and location components tuple. The semantics are
                that this line wishes to have the object represented by the
                DataHandle to be persisted at the TES location represented by
                the location tuple (an N-tuple of str).
            persistreco (bool): If True, this line requests the full HLT2
                reconstruction be persisted along with its other outputs.
            tagging_particles (bool): If True, this line requests the proto
                particles for tagging be persisted along with its other outputs.

        Returns:
            algs (list of Algorithm): Output data producer algorithms which
                this line's CF must run on a positive decision, in order to
                produce the output objects it has requested.
            objects_to_persist (list of DataHandle): Output objects to be
                persisted on a positive decision.
            locations_to_move (dict): currently always empty.

        Note:
            There is an implicit understanding between the logic here and
            that implemented in `Moore.persistence`. The latter can only
            support persisting certain types of objects, and the logic here
            performs some basic configuration-time checking to ensure lines
            do not request objects which Moore does not know how to persist.

            As of this writing, line outputs and extra outputs can only be
            LHCb::Particle producers and LHCb::FlavourTags producers. If
            other objects are to be supported,
            this method must be updated alongside the cloner algorithms
            configured in `Moore.persistence.cloning`.
        """
        from RecoConf.reconstruction_objects import reconstruction
        from .persistence.particle_moving import (
            CopyParticles, CopyFlavourTags, is_particle_producer,
            particle_output, is_flavourtag_producer, flavourtag_output)
        from .persistence.persistreco import persistreco_line_outputs

        reco = reconstruction()
        pvs = reco["PVs"]
        rec_summary = reco["RecSummary"]

        algs = []
        objects_to_persist = []
        locations_to_move = {}

        main_output, main_location_components = main_output_location
        main_producer = _producer(main_output)

        if main_output is not None:
            # Branch on the C++ type of the outputs of the producer
            # For legacy LHCb::Particle outputs, we must navigate back to the
            # producer to determine whether we should run the LHCb::Particle
            # moving algorithm
            # Copy the main output of the HLT2 line and add the moved
            # container to the list of objects to be persisted.
            if is_particle_producer(main_producer):
                main_particles = particle_output(main_producer)
                main_mover = CopyParticles(main_particles,
                                           main_location_components)
                algs.append(main_mover)
                objects_to_persist += main_mover.outputs.values()
            else:
                log.warning(
                    "Unsupported type {} for line output {} (for location {}); this will not be persisted"
                    .format(main_output.type, main_output,
                            main_location_components))

        # Loop over the extra outputs and copy them, then add the moved
        # containers to the list of objects to be persisted.
        for output, location_components in additional_output_locations:
            producer = _producer(output)
            if is_particle_producer(producer):
                particles = particle_output(producer)
                mover = CopyParticles(particles, location_components)
                algs.append(mover)
                objects_to_persist += mover.outputs.values()
            elif is_flavourtag_producer(producer):
                if not is_particle_producer(main_producer):
                    log.warning(
                        "Trying to persist FlavourTag but the last algorithm in the line is not a particle producer. Therefor cannot persist FlavourTags objects."
                    )
                else:
                    flavourtags = flavourtag_output(producer)
                    mover = CopyFlavourTags(flavourtags, location_components)
                    algs.append(mover)
                    objects_to_persist += mover.outputs.values()
            else:
                log.warning(
                    "Unsupported type {} for line output {} (for location {}); this will not be persisted"
                    .format(output.type, output, location_components))

        if persistreco:
            pr_objs = list(persistreco_line_outputs(reco).values())
            algs += pr_objs
            objects_to_persist += pr_objs

        if tagging_particles and not persistreco:
            ft_objs = [reco["LongProtos"], reco["UpstreamProtos"]]
            algs += ft_objs
            objects_to_persist += ft_objs

        # All Turbo lines (lines producing output) should always get PVs and the RecSummary
        if objects_to_persist:
            if pvs not in algs:
                algs.append(pvs)
                objects_to_persist.append(pvs)
            if rec_summary not in algs:
                algs.append(rec_summary)
            # NOTE(review): unlike the PVs, the RecSummary is appended to
            # `objects_to_persist` even when it was already in `algs`;
            # confirm this asymmetry is intended.
            objects_to_persist.append(rec_summary)

        return algs, objects_to_persist, locations_to_move
class SpruceLine(Hlt2Line):
    """Object fully qualifying a Sprucing line.

    A variant of `Hlt2Line` for Sprucing selections. The line name must
    start with ``Spruce``.

    Args:
        All arguments of `Hlt2Line` and the following extra:
        hlt2_filter_code (list(str)): If not empty, the list of strings is used to define a
            HLT2 filter that is prepended to the control flow defined by `algs`.
            ThOr DECREPORTS_RE_FILTER is used. Note that this parameter should look
            like "['Hlt2ADecision', 'Hlt2.*Decision']".
    """

    _CLASS_NAME_PREFIX = "Spruce"
    _HLT2_FILTER_SOURCE_ID = 'Hlt2'

    def __init__(
            self,
            name,
            algs,
            prescale=1.,
            postscale=1.0,
            extra_outputs=None,
            raw_banks=None,
            persistreco=False,
            tagging_particles=False,
            calo_digits=False,
            calo_clusters=False,
            pv_tracks=False,
            track_ancestors=False,
            hlt1_filter_code="",
            hlt2_filter_code="",
            monitoring_variables=("pt", "eta", "m", "vchi2", "ipchi2",
                                  "n_candidates"),
    ):
        self.hlt2_filter_code = hlt2_filter_code
        if hlt2_filter_code:
            # The HLT2 decision filter runs ahead of the user algorithms
            algs = [
                _return_filter(f"{name}_Hlt2Filter", self.hlt2_filter_code,
                               self._HLT2_FILTER_SOURCE_ID)
            ] + algs

        # Everything else, including the HLT1 filter, is handled by Hlt2Line
        forwarded = dict(
            name=name,
            algs=algs,
            prescale=prescale,
            postscale=postscale,
            extra_outputs=extra_outputs,
            raw_banks=raw_banks,
            persistreco=persistreco,
            tagging_particles=tagging_particles,
            calo_digits=calo_digits,
            calo_clusters=calo_clusters,
            pv_tracks=pv_tracks,
            track_ancestors=track_ancestors,
            hlt1_filter_code=hlt1_filter_code,
            monitoring_variables=monitoring_variables,
        )
        super().__init__(**forwarded)

        expected_prefix = self._CLASS_NAME_PREFIX
        if self.name.startswith(expected_prefix):
            return
        raise ValueError("name {!r} does not start with {!r}".format(
            name, expected_prefix))
class PassLine(SpruceLine):
    """Object fully qualifying a Pass through line.

    A variant of `SpruceLine` that is given no selection algorithms. The line
    name must start with ``Pass``.

    Args:
        name (str): name of the line
        prescale (float): accept fraction of the prescaler
        hlt2_filter_code (string): If not empty, the string is used to define a
            HLT2 filter that is prepended to the (empty) control flow.
            ThOr DECREPORTS_RE_FILTER is used. Note that this parameter should look
            like "['Hlt2ADecision', 'Hlt2.*Decision']".
    """

    _CLASS_NAME_PREFIX = "Pass"
    _HLT2_FILTER_SOURCE_ID = 'Hlt2'
    _HLT2_EXP_FILTER_PREFIX = "Hlt2"

    def __init__(
            self,
            name,
            prescale=1.,
            hlt2_filter_code="",
    ):
        # A pass-through line has no algorithms of its own; all remaining
        # options keep the defaults of SpruceLine
        no_selection = []
        super().__init__(
            name=name,
            algs=no_selection,
            prescale=prescale,
            hlt2_filter_code=hlt2_filter_code)