Source code for opentelemetry.sdk.metrics._internal.aggregation

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-lines

from abc import ABC, abstractmethod
from bisect import bisect_left
from enum import IntEnum
from logging import getLogger
from math import inf
from threading import Lock
from typing import Generic, List, Optional, Sequence, TypeVar

from opentelemetry.metrics import (
    Asynchronous,
    Counter,
    Histogram,
    Instrument,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    Synchronous,
    UpDownCounter,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.buckets import (
    Buckets,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.exponent_mapping import (
    ExponentMapping,
)
from opentelemetry.sdk.metrics._internal.exponential_histogram.mapping.logarithm_mapping import (
    LogarithmMapping,
)
from opentelemetry.sdk.metrics._internal.measurement import Measurement
from opentelemetry.sdk.metrics._internal.point import Buckets as BucketsPoint
from opentelemetry.sdk.metrics._internal.point import (
    ExponentialHistogramDataPoint,
    Gauge,
)
from opentelemetry.sdk.metrics._internal.point import (
    Histogram as HistogramPoint,
)
from opentelemetry.sdk.metrics._internal.point import (
    HistogramDataPoint,
    NumberDataPoint,
    Sum,
)
from opentelemetry.util.types import Attributes

_DataPointVarT = TypeVar("_DataPointVarT", NumberDataPoint, HistogramDataPoint)

_logger = getLogger(__name__)


class AggregationTemporality(IntEnum):
    """
    The temporality to use when aggregating data.

    Can be one of the following values:
    """

    UNSPECIFIED = 0
    DELTA = 1
    CUMULATIVE = 2
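

# Illustrative sketch (not part of the original module): the relationship
# between the two temporalities. A CUMULATIVE series is the running sum of
# the corresponding DELTA values; the helper name below is hypothetical.
def _example_delta_to_cumulative(deltas: Sequence[int]) -> List[int]:
    # Running-sum conversion: e.g. deltas [3, 1, 4] become [3, 4, 8].
    cumulative: List[int] = []
    total = 0
    for delta in deltas:
        total += delta
        cumulative.append(total)
    return cumulative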


class _Aggregation(ABC, Generic[_DataPointVarT]):
    def __init__(self, attributes: Attributes):
        self._lock = Lock()
        self._attributes = attributes
        self._previous_point = None

    @abstractmethod
    def aggregate(self, measurement: Measurement) -> None:
        pass

    @abstractmethod
    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        pass


class _DropAggregation(_Aggregation):
    def aggregate(self, measurement: Measurement) -> None:
        pass

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        pass


class _SumAggregation(_Aggregation[Sum]):
    def __init__(
        self,
        attributes: Attributes,
        instrument_is_monotonic: bool,
        instrument_temporality: AggregationTemporality,
        start_time_unix_nano: int,
    ):
        super().__init__(attributes)

        self._start_time_unix_nano = start_time_unix_nano
        self._instrument_temporality = instrument_temporality
        self._instrument_is_monotonic = instrument_is_monotonic

        if self._instrument_temporality is AggregationTemporality.DELTA:
            self._value = 0
        else:
            self._value = None

    def aggregate(self, measurement: Measurement) -> None:
        with self._lock:
            if self._value is None:
                self._value = 0
            self._value = self._value + measurement.value

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[NumberDataPoint]:
        """
        Atomically return a point for the current value of the metric and
        reset the aggregation value.
        """
        if self._instrument_temporality is AggregationTemporality.DELTA:
            with self._lock:
                value = self._value
                start_time_unix_nano = self._start_time_unix_nano

                self._value = 0
                self._start_time_unix_nano = collection_start_nano

        else:
            with self._lock:
                if self._value is None:
                    return None
                value = self._value
                self._value = None
                start_time_unix_nano = self._start_time_unix_nano

        current_point = NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=collection_start_nano,
            value=value,
        )

        if self._previous_point is None or (
            self._instrument_temporality is aggregation_temporality
        ):
            # Output DELTA for a synchronous instrument
            # Output CUMULATIVE for an asynchronous instrument
            self._previous_point = current_point
            return current_point

        if aggregation_temporality is AggregationTemporality.DELTA:
            # Output temporality DELTA for an asynchronous instrument
            value = current_point.value - self._previous_point.value
            output_start_time_unix_nano = self._previous_point.time_unix_nano

        else:
            # Output CUMULATIVE for a synchronous instrument
            value = current_point.value + self._previous_point.value
            output_start_time_unix_nano = (
                self._previous_point.start_time_unix_nano
            )

        current_point = NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=output_start_time_unix_nano,
            time_unix_nano=current_point.time_unix_nano,
            value=value,
        )

        self._previous_point = current_point
        return current_point


class _LastValueAggregation(_Aggregation[Gauge]):
    def __init__(self, attributes: Attributes):
        super().__init__(attributes)
        self._value = None

    def aggregate(self, measurement: Measurement):
        with self._lock:
            self._value = measurement.value

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Atomically return a point for the current value of the metric.
        """
        with self._lock:
            if self._value is None:
                return None
            value = self._value
            self._value = None

        return NumberDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=0,
            time_unix_nano=collection_start_nano,
            value=value,
        )


class _ExplicitBucketHistogramAggregation(_Aggregation[HistogramPoint]):
    def __init__(
        self,
        attributes: Attributes,
        start_time_unix_nano: int,
        boundaries: Sequence[float] = (
            0.0,
            5.0,
            10.0,
            25.0,
            50.0,
            75.0,
            100.0,
            250.0,
            500.0,
            750.0,
            1000.0,
            2500.0,
            5000.0,
            7500.0,
            10000.0,
        ),
        record_min_max: bool = True,
    ):
        super().__init__(attributes)
        self._boundaries = tuple(boundaries)
        self._bucket_counts = self._get_empty_bucket_counts()
        self._min = inf
        self._max = -inf
        self._sum = 0
        self._record_min_max = record_min_max
        self._start_time_unix_nano = start_time_unix_nano
        # It is assumed that the "natural" aggregation temporality for a
        # Histogram instrument is DELTA, like the "natural" aggregation
        # temporality for a Counter is DELTA and the "natural" aggregation
        # temporality for an ObservableCounter is CUMULATIVE.
        self._instrument_temporality = AggregationTemporality.DELTA

    def _get_empty_bucket_counts(self) -> List[int]:
        return [0] * (len(self._boundaries) + 1)

    def aggregate(self, measurement: Measurement) -> None:
        value = measurement.value

        if self._record_min_max:
            self._min = min(self._min, value)
            self._max = max(self._max, value)

        self._sum += value

        self._bucket_counts[bisect_left(self._boundaries, value)] += 1

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Atomically return a point for the current value of the metric.
        """
        with self._lock:
            if not any(self._bucket_counts):
                return None

            bucket_counts = self._bucket_counts
            start_time_unix_nano = self._start_time_unix_nano
            sum_ = self._sum
            max_ = self._max
            min_ = self._min

            self._bucket_counts = self._get_empty_bucket_counts()
            self._start_time_unix_nano = collection_start_nano
            self._sum = 0
            self._min = inf
            self._max = -inf

        current_point = HistogramDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=collection_start_nano,
            count=sum(bucket_counts),
            sum=sum_,
            bucket_counts=tuple(bucket_counts),
            explicit_bounds=self._boundaries,
            min=min_,
            max=max_,
        )

        if self._previous_point is None or (
            self._instrument_temporality is aggregation_temporality
        ):
            self._previous_point = current_point
            return current_point

        max_ = current_point.max
        min_ = current_point.min

        if aggregation_temporality is AggregationTemporality.CUMULATIVE:
            start_time_unix_nano = self._previous_point.start_time_unix_nano
            sum_ = current_point.sum + self._previous_point.sum
            # Only update min/max on delta -> cumulative
            max_ = max(current_point.max, self._previous_point.max)
            min_ = min(current_point.min, self._previous_point.min)

            bucket_counts = [
                curr_count + prev_count
                for curr_count, prev_count in zip(
                    current_point.bucket_counts,
                    self._previous_point.bucket_counts,
                )
            ]
        else:
            start_time_unix_nano = self._previous_point.time_unix_nano
            sum_ = current_point.sum - self._previous_point.sum

            bucket_counts = [
                curr_count - prev_count
                for curr_count, prev_count in zip(
                    current_point.bucket_counts,
                    self._previous_point.bucket_counts,
                )
            ]

        current_point = HistogramDataPoint(
            attributes=self._attributes,
            start_time_unix_nano=start_time_unix_nano,
            time_unix_nano=current_point.time_unix_nano,
            count=sum(bucket_counts),
            sum=sum_,
            bucket_counts=tuple(bucket_counts),
            explicit_bounds=current_point.explicit_bounds,
            min=min_,
            max=max_,
        )

        self._previous_point = current_point
        return current_point


# pylint: disable=protected-access
class _ExponentialBucketHistogramAggregation(_Aggregation[HistogramPoint]):
    # _min_max_size and _max_max_size are the smallest and largest values
    # the max_size parameter may have, respectively.

    # _min_max_size is the smallest reasonable value which is small enough to
    # contain the entire normal floating point range at the minimum scale.
    _min_max_size = 2

    # _max_max_size is an arbitrary cap meant to prevent accidental creation
    # of giant exponential bucket histograms.
    _max_max_size = 16384

    def __init__(
        self,
        attributes: Attributes,
        start_time_unix_nano: int,
        # This is the default maximum number of buckets per positive or
        # negative number range. The value 160 is specified by OpenTelemetry.
        # See the derivation here:
        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exponential-bucket-histogram-aggregation
        max_size: int = 160,
    ):
        super().__init__(attributes)

        # max_size is the maximum capacity of the positive and negative
        # buckets.
        if max_size < self._min_max_size:
            raise ValueError(
                f"Buckets max size {max_size} is smaller than "
                f"minimum max size {self._min_max_size}"
            )

        if max_size > self._max_max_size:
            raise ValueError(
                f"Buckets max size {max_size} is larger than "
                f"maximum max size {self._max_max_size}"
            )

        self._max_size = max_size

        # _sum is the sum of all the values aggregated by this aggregator.
        self._sum = 0

        # _count is the count of all calls to aggregate.
        self._count = 0

        # _zero_count is the count of all the calls to aggregate when the
        # value to be aggregated is exactly 0.
        self._zero_count = 0

        # _min is the smallest value aggregated by this aggregator.
        self._min = inf

        # _max is the largest value aggregated by this aggregator.
        self._max = -inf

        # _positive holds the positive values.
        self._positive = Buckets()

        # _negative holds the negative values by their absolute value.
        self._negative = Buckets()

        # _mapping corresponds to the current scale and is shared by both the
        # positive and negative buckets.
        self._mapping = LogarithmMapping(LogarithmMapping._max_scale)

        self._instrument_temporality = AggregationTemporality.DELTA
        self._start_time_unix_nano = start_time_unix_nano

        self._previous_scale = None
        self._previous_start_time_unix_nano = None
        self._previous_sum = None
        self._previous_max = None
        self._previous_min = None
        self._previous_positive = None
        self._previous_negative = None

    def aggregate(self, measurement: Measurement) -> None:
        # pylint: disable=too-many-branches,too-many-statements,too-many-locals

        with self._lock:
            value = measurement.value

            # 0. Set the following attributes:
            #    _min
            #    _max
            #    _count
            #    _zero_count
            #    _sum
            if value < self._min:
                self._min = value

            if value > self._max:
                self._max = value

            self._count += 1

            if value == 0:
                self._zero_count += 1
                # No need to do anything else if value is zero, just
                # increment the zero count.
                return

            self._sum += value

            # 1. Use the positive buckets for positive values and the
            #    negative buckets for negative values.
            if value > 0:
                buckets = self._positive

            else:
                # Both exponential and logarithm mappings use only positive
                # values, so the absolute value is used here.
                value = -value
                buckets = self._negative

            # 2. Compute the index for the value at the current scale.
            index = self._mapping.map_to_index(value)

            # IncrementIndexBy starts here

            # 3. Determine if a change of scale is needed.
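            # Illustration (hedged, not in the original source): with
            # max_size = 4 and buckets covering indexes 0..3, a value mapping
            # to index 5 would require 6 buckets, so the mapping must be
            # downscaled until the span 5 - 0 fits within max_size before the
            # value can be counted.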
            is_rescaling_needed = False

            if len(buckets) == 0:
                buckets.index_start = index
                buckets.index_end = index
                buckets.index_base = index

            elif (
                index < buckets.index_start
                and (buckets.index_end - index) >= self._max_size
            ):
                is_rescaling_needed = True
                low = index
                high = buckets.index_end

            elif (
                index > buckets.index_end
                and (index - buckets.index_start) >= self._max_size
            ):
                is_rescaling_needed = True
                low = buckets.index_start
                high = index

            # 4. Rescale the mapping if needed.
            if is_rescaling_needed:
                self._downscale(
                    self._get_scale_change(low, high),
                    self._positive,
                    self._negative,
                )

                index = self._mapping.map_to_index(value)

            # 5. If the index is outside
            #    [buckets.index_start, buckets.index_end] readjust the
            #    buckets boundaries or add more buckets.
            if index < buckets.index_start:
                span = buckets.index_end - index

                if span >= len(buckets.counts):
                    buckets.grow(span + 1, self._max_size)

                buckets.index_start = index

            elif index > buckets.index_end:
                span = index - buckets.index_start

                if span >= len(buckets.counts):
                    buckets.grow(span + 1, self._max_size)

                buckets.index_end = index

            # 6. Compute the index of the bucket to be incremented.
            bucket_index = index - buckets.index_base

            if bucket_index < 0:
                bucket_index += len(buckets.counts)

            # 7. Increment the bucket.
            buckets.increment_bucket(bucket_index)

    def collect(
        self,
        aggregation_temporality: AggregationTemporality,
        collection_start_nano: int,
    ) -> Optional[_DataPointVarT]:
        """
        Atomically return a point for the current value of the metric.
        """
        # pylint: disable=too-many-statements, too-many-locals
        with self._lock:
            if self._count == 0:
                return None

            current_negative = self._negative
            current_positive = self._positive
            current_zero_count = self._zero_count
            current_count = self._count
            current_start_time_unix_nano = self._start_time_unix_nano
            current_sum = self._sum
            current_max = self._max
            if current_max == -inf:
                current_max = None
            current_min = self._min
            if current_min == inf:
                current_min = None

            if self._count == self._zero_count:
                current_scale = 0

            else:
                current_scale = self._mapping.scale

            self._negative = Buckets()
            self._positive = Buckets()
            self._start_time_unix_nano = collection_start_nano
            self._sum = 0
            self._count = 0
            self._zero_count = 0
            self._min = inf
            self._max = -inf

            current_point = ExponentialHistogramDataPoint(
                attributes=self._attributes,
                start_time_unix_nano=current_start_time_unix_nano,
                time_unix_nano=collection_start_nano,
                count=current_count,
                sum=current_sum,
                scale=current_scale,
                zero_count=current_zero_count,
                positive=BucketsPoint(
                    offset=current_positive.offset,
                    bucket_counts=current_positive.counts,
                ),
                negative=BucketsPoint(
                    offset=current_negative.offset,
                    bucket_counts=current_negative.counts,
                ),
                # FIXME: Find the right value for flags
                flags=0,
                min=current_min,
                max=current_max,
            )

            if self._previous_scale is None or (
                self._instrument_temporality is aggregation_temporality
            ):
                self._previous_scale = current_scale
                self._previous_start_time_unix_nano = (
                    current_start_time_unix_nano
                )
                self._previous_max = current_max
                self._previous_min = current_min
                self._previous_sum = current_sum
                self._previous_positive = current_positive
                self._previous_negative = current_negative

                return current_point

            min_scale = min(self._previous_scale, current_scale)

            low_positive, high_positive = (
                self._get_low_high_previous_current(
                    self._previous_positive, current_positive, min_scale
                )
            )
            low_negative, high_negative = (
                self._get_low_high_previous_current(
                    self._previous_negative, current_negative, min_scale
                )
            )

            min_scale = min(
                min_scale
                - self._get_scale_change(low_positive, high_positive),
                min_scale
                - self._get_scale_change(low_negative, high_negative),
            )
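            # Hedged note (not in the original source): min_scale is lowered
            # further so that, once both the previous and the current buckets
            # are expressed at the common scale, each combined index range
            # (positive and negative) still fits within _max_size buckets.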
            # FIXME Go implementation checks if the histogram (not the
            # mapping but the histogram) has a count larger than zero, if
            # not, scale (the histogram scale) would be zero. See
            # exponential.go 191
            self._downscale(
                self._mapping.scale - min_scale,
                self._previous_positive,
                self._previous_negative,
            )

            if aggregation_temporality is AggregationTemporality.CUMULATIVE:
                start_time_unix_nano = self._previous_start_time_unix_nano
                sum_ = current_sum + self._previous_sum
                # Only update min/max on delta -> cumulative
                max_ = max(current_max, self._previous_max)
                min_ = min(current_min, self._previous_min)

                self._merge(
                    self._previous_positive,
                    current_positive,
                    current_scale,
                    min_scale,
                    aggregation_temporality,
                )
                self._merge(
                    self._previous_negative,
                    current_negative,
                    current_scale,
                    min_scale,
                    aggregation_temporality,
                )

            else:
                start_time_unix_nano = self._previous_start_time_unix_nano
                sum_ = current_sum - self._previous_sum
                max_ = current_max
                min_ = current_min

                self._merge(
                    self._previous_positive,
                    current_positive,
                    current_scale,
                    min_scale,
                    aggregation_temporality,
                )
                self._merge(
                    self._previous_negative,
                    current_negative,
                    current_scale,
                    min_scale,
                    aggregation_temporality,
                )

            current_point = ExponentialHistogramDataPoint(
                attributes=self._attributes,
                start_time_unix_nano=start_time_unix_nano,
                time_unix_nano=collection_start_nano,
                count=current_count,
                sum=sum_,
                scale=current_scale,
                zero_count=current_zero_count,
                positive=BucketsPoint(
                    offset=current_positive.offset,
                    bucket_counts=current_positive.counts,
                ),
                negative=BucketsPoint(
                    offset=current_negative.offset,
                    bucket_counts=current_negative.counts,
                ),
                # FIXME: Find the right value for flags
                flags=0,
                min=min_,
                max=max_,
            )

            self._previous_scale = current_scale
            self._previous_positive = current_positive
            self._previous_negative = current_negative
            self._previous_start_time_unix_nano = (
                current_start_time_unix_nano
            )
            self._previous_sum = current_sum

            return current_point

    def _get_low_high_previous_current(
        self, previous_point_buckets, current_point_buckets, min_scale
    ):
        (previous_point_low, previous_point_high) = self._get_low_high(
            previous_point_buckets, min_scale
        )
        (current_point_low, current_point_high) = self._get_low_high(
            current_point_buckets, min_scale
        )

        if current_point_low > current_point_high:
            low = previous_point_low
            high = previous_point_high

        elif previous_point_low > previous_point_high:
            low = current_point_low
            high = current_point_high

        else:
            low = min(previous_point_low, current_point_low)
            high = max(previous_point_high, current_point_high)

        return low, high

    def _get_low_high(self, buckets, min_scale):
        if buckets.counts == [0]:
            return 0, -1

        shift = self._mapping._scale - min_scale

        return buckets.index_start >> shift, buckets.index_end >> shift

    def _get_scale_change(self, low, high):
        change = 0

        while high - low >= self._max_size:
            high = high >> 1
            low = low >> 1

            change += 1

        return change

    def _downscale(self, change: int, positive, negative):
        if change == 0:
            return

        if change < 0:
            raise Exception("Invalid change of scale")

        new_scale = self._mapping.scale - change

        positive.downscale(change)
        negative.downscale(change)

        if new_scale <= 0:
            mapping = ExponentMapping(new_scale)
        else:
            mapping = LogarithmMapping(new_scale)

        self._mapping = mapping
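
    # Worked example (illustrative, not in the original source): with
    # _max_size = 4, _get_scale_change(low=0, high=9) halves both ends until
    # the span fits: (0, 9) -> (0, 4) -> (0, 2), returning a change of 2.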

    def _merge(
        self,
        previous_buckets,
        current_buckets,
        current_scale,
        min_scale,
        aggregation_temporality,
    ):
        current_change = current_scale - min_scale

        for current_bucket_index, current_bucket in enumerate(
            current_buckets.counts
        ):
            if current_bucket == 0:
                continue

            # Not considering the case where len(previous_buckets) == 0. This
            # would not happen because self._previous_point is only assigned
            # to an ExponentialHistogramDataPoint object if self._count != 0.

            index = (
                current_buckets.offset + current_bucket_index
            ) >> current_change

            if index < previous_buckets.index_start:
                span = previous_buckets.index_end - index

                if span >= self._max_size:
                    raise Exception("Incorrect merge scale")

                if span >= len(previous_buckets.counts):
                    previous_buckets.grow(span + 1, self._max_size)

                previous_buckets.index_start = index

            if index > previous_buckets.index_end:
                span = index - previous_buckets.index_start

                if span >= self._max_size:
                    raise Exception("Incorrect merge scale")

                if span >= len(previous_buckets.counts):
                    previous_buckets.grow(span + 1, self._max_size)

                previous_buckets.index_end = index

            bucket_index = index - previous_buckets.index_base

            if bucket_index < 0:
                bucket_index += len(previous_buckets.counts)

            if aggregation_temporality is AggregationTemporality.DELTA:
                current_bucket = -current_bucket

            previous_buckets.increment_bucket(
                bucket_index, increment=current_bucket
            )
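

# Illustrative sketch (not part of the original module): downscaling by
# `change` merges buckets pairwise `change` times, which is why _merge and
# _get_low_high shift indexes right by the scale difference. The helper name
# is hypothetical.
def _example_downscaled_index(index: int, change: int) -> int:
    # At a scale lower by `change`, each bucket covers 2**change former
    # buckets, so former indexes 4..7 all map to index 1 when change == 2.
    return index >> change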


class Aggregation(ABC):
    """
    Base class for all aggregation types.
    """

    @abstractmethod
    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        """Creates an aggregation"""


class DefaultAggregation(Aggregation):
    """
    The default aggregation to be used in a `View`.

    This aggregation will create an actual aggregation depending on the
    instrument type, as specified in the following table:

    ==================================================== ====================================
    Instrument                                           Aggregation
    ==================================================== ====================================
    `opentelemetry.sdk.metrics.Counter`                  `SumAggregation`
    `opentelemetry.sdk.metrics.UpDownCounter`            `SumAggregation`
    `opentelemetry.sdk.metrics.ObservableCounter`        `SumAggregation`
    `opentelemetry.sdk.metrics.ObservableUpDownCounter`  `SumAggregation`
    `opentelemetry.sdk.metrics.Histogram`                `ExplicitBucketHistogramAggregation`
    `opentelemetry.sdk.metrics.ObservableGauge`          `LastValueAggregation`
    ==================================================== ====================================
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        # pylint: disable=too-many-return-statements
        if isinstance(instrument, Counter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=True,
                instrument_temporality=AggregationTemporality.DELTA,
                start_time_unix_nano=start_time_unix_nano,
            )
        if isinstance(instrument, UpDownCounter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=False,
                instrument_temporality=AggregationTemporality.DELTA,
                start_time_unix_nano=start_time_unix_nano,
            )

        if isinstance(instrument, ObservableCounter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=True,
                instrument_temporality=AggregationTemporality.CUMULATIVE,
                start_time_unix_nano=start_time_unix_nano,
            )

        if isinstance(instrument, ObservableUpDownCounter):
            return _SumAggregation(
                attributes,
                instrument_is_monotonic=False,
                instrument_temporality=AggregationTemporality.CUMULATIVE,
                start_time_unix_nano=start_time_unix_nano,
            )

        if isinstance(instrument, Histogram):
            return _ExplicitBucketHistogramAggregation(
                attributes, start_time_unix_nano
            )

        if isinstance(instrument, ObservableGauge):
            return _LastValueAggregation(attributes)

        raise Exception(f"Invalid instrument type {type(instrument)} found")
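

# Usage sketch (illustrative, not part of the original module; assumes the
# SDK's View API from opentelemetry.sdk.metrics.view):
#
#     from opentelemetry.sdk.metrics.view import View
#
#     # Explicitly requesting the instrument-dependent default from the
#     # table above:
#     view = View(instrument_name="*", aggregation=DefaultAggregation())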


class ExponentialBucketHistogramAggregation(Aggregation):
    """
    This aggregation informs the SDK to collect an exponential bucket
    histogram, where bucket boundaries are derived from a scale that is
    adjusted automatically so that at most ``max_size`` buckets are used for
    each of the positive and negative value ranges.
    """

    def __init__(
        self,
        max_size: int = 160,
    ):
        self._max_size = max_size

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        return _ExponentialBucketHistogramAggregation(
            attributes,
            start_time_unix_nano,
            max_size=self._max_size,
        )
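

# Usage sketch (illustrative, not part of the original module; assumes the
# SDK's View API):
#
#     from opentelemetry.sdk.metrics.view import View
#
#     view = View(
#         instrument_name="http.server.duration",
#         aggregation=ExponentialBucketHistogramAggregation(max_size=160),
#     )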


class ExplicitBucketHistogramAggregation(Aggregation):
    """This aggregation informs the SDK to collect:

    - Count of Measurement values falling within explicit bucket boundaries.
    - Arithmetic sum of Measurement values in population. This SHOULD NOT be
      collected when used with instruments that record negative measurements,
      e.g. UpDownCounter or ObservableGauge.
    - Min (optional) Measurement value in population.
    - Max (optional) Measurement value in population.

    Args:
        boundaries: Array of increasing values representing explicit bucket
            boundary values.
        record_min_max: Whether to record min and max.
    """

    def __init__(
        self,
        boundaries: Sequence[float] = (
            0.0,
            5.0,
            10.0,
            25.0,
            50.0,
            75.0,
            100.0,
            250.0,
            500.0,
            750.0,
            1000.0,
            2500.0,
            5000.0,
            7500.0,
            10000.0,
        ),
        record_min_max: bool = True,
    ) -> None:
        self._boundaries = boundaries
        self._record_min_max = record_min_max

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        return _ExplicitBucketHistogramAggregation(
            attributes,
            start_time_unix_nano,
            self._boundaries,
            self._record_min_max,
        )
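

# Usage sketch (illustrative, not part of the original module; assumes the
# SDK's View API). Custom boundaries replace the defaults listed above:
#
#     from opentelemetry.sdk.metrics.view import View
#
#     view = View(
#         instrument_name="request.latency",
#         aggregation=ExplicitBucketHistogramAggregation(
#             boundaries=(0.0, 10.0, 100.0, 1000.0)
#         ),
#     )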


class SumAggregation(Aggregation):
    """This aggregation informs the SDK to collect:

    - The arithmetic sum of Measurement values.
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        temporality = AggregationTemporality.UNSPECIFIED
        if isinstance(instrument, Synchronous):
            temporality = AggregationTemporality.DELTA
        elif isinstance(instrument, Asynchronous):
            temporality = AggregationTemporality.CUMULATIVE

        return _SumAggregation(
            attributes,
            isinstance(instrument, (Counter, ObservableCounter)),
            temporality,
            start_time_unix_nano,
        )
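

# Usage sketch (illustrative, not part of the original module; assumes the
# SDK's View API). Temporality is inferred from the instrument: a synchronous
# Counter gets DELTA, an asynchronous ObservableCounter gets CUMULATIVE:
#
#     from opentelemetry.sdk.metrics.view import View
#
#     view = View(instrument_name="orders.count", aggregation=SumAggregation())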


class LastValueAggregation(Aggregation):
    """
    This aggregation informs the SDK to collect:

    - The last Measurement.
    - The timestamp of the last Measurement.
    """

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        return _LastValueAggregation(attributes)


class DropAggregation(Aggregation):
    """Using this aggregation will make all measurements be ignored."""

    def _create_aggregation(
        self,
        instrument: Instrument,
        attributes: Attributes,
        start_time_unix_nano: int,
    ) -> _Aggregation:
        return _DropAggregation(attributes)
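

# Usage sketch (illustrative, not part of the original module; assumes the
# SDK's View API). Dropping a noisy instrument entirely:
#
#     from opentelemetry.sdk.metrics.view import View
#
#     view = View(instrument_name="noisy.metric", aggregation=DropAggregation())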