# Source code for opentelemetry.sdk.metrics._internal.export

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import os
from abc import ABC, abstractmethod
from enum import Enum
from logging import getLogger
from os import environ, linesep
from sys import stdout
from threading import Event, Lock, RLock, Thread
from time import time_ns
from typing import IO, Callable, Dict, Iterable, Optional

from typing_extensions import final

# This kind of import is needed to avoid Sphinx errors.
import opentelemetry.sdk.metrics._internal
from opentelemetry.context import (
    _SUPPRESS_INSTRUMENTATION_KEY,
    attach,
    detach,
    set_value,
)
from opentelemetry.sdk.environment_variables import (
    OTEL_METRIC_EXPORT_INTERVAL,
    OTEL_METRIC_EXPORT_TIMEOUT,
)
from opentelemetry.sdk.metrics._internal.aggregation import (
    AggregationTemporality,
    DefaultAggregation,
)
from opentelemetry.sdk.metrics._internal.exceptions import MetricsTimeoutError
from opentelemetry.sdk.metrics._internal.instrument import (
    Counter,
    Gauge,
    Histogram,
    ObservableCounter,
    ObservableGauge,
    ObservableUpDownCounter,
    UpDownCounter,
    _Counter,
    _Gauge,
    _Histogram,
    _ObservableCounter,
    _ObservableGauge,
    _ObservableUpDownCounter,
    _UpDownCounter,
)
from opentelemetry.sdk.metrics._internal.point import MetricsData
from opentelemetry.util._once import Once

# Module-level logger used by the readers/exporters below for warnings
# (invalid configuration, double shutdown, unregistered collect, ...).
_logger = getLogger(__name__)


class MetricExportResult(Enum):
    """Outcome of a metric export attempt.

    Can be either of the following values:
    """

    SUCCESS = 0
    FAILURE = 1
class MetricExporter(ABC):
    """Interface for exporting metrics.

    Interface to be implemented by services that want to export metrics
    received in their own format.

    Args:
        preferred_temporality: Used by
            `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader`
            to configure exporter level preferred temporality. See
            `opentelemetry.sdk.metrics.export.MetricReader` for more details
            on what preferred temporality is.
        preferred_aggregation: Used by
            `opentelemetry.sdk.metrics.export.PeriodicExportingMetricReader`
            to configure exporter level preferred aggregation. See
            `opentelemetry.sdk.metrics.export.MetricReader` for more details
            on what preferred aggregation is.
    """

    def __init__(
        self,
        preferred_temporality: Optional[
            Dict[type, AggregationTemporality]
        ] = None,
        preferred_aggregation: Optional[
            Dict[type, "opentelemetry.sdk.metrics.view.Aggregation"]
        ] = None,
    ) -> None:
        # Stored so PeriodicExportingMetricReader can defer its reader-level
        # configuration to the exporter (see its __init__).
        self._preferred_temporality = preferred_temporality
        self._preferred_aggregation = preferred_aggregation

    @abstractmethod
    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        """Exports a batch of telemetry data.

        Args:
            metrics_data: The `opentelemetry.sdk.metrics.export.MetricsData`
                batch to be exported

        Returns:
            The result of the export
        """

    @abstractmethod
    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """
        Ensure that export of any metrics currently received by the exporter
        are completed as soon as possible.
        """

    @abstractmethod
    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.
        """
class ConsoleMetricExporter(MetricExporter):
    """Implementation of :class:`MetricExporter` that prints metrics to the
    console.

    This class can be used for diagnostic purposes. It prints the exported
    metrics to the console STDOUT.
    """

    def __init__(
        self,
        out: IO = stdout,
        formatter: Callable[
            ["opentelemetry.sdk.metrics.export.MetricsData"], str
        ] = lambda metrics_data: metrics_data.to_json() + linesep,
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ):
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        self.out = out
        self.formatter = formatter

    def export(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> MetricExportResult:
        # Render the whole batch and push it to the stream eagerly so output
        # is visible immediately, then report unconditional success.
        stream = self.out
        stream.write(self.formatter(metrics_data))
        stream.flush()
        return MetricExportResult.SUCCESS

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        # Nothing to release: the output stream is owned by the caller.
        pass

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        # export() flushes after every batch, so there is never buffered
        # data left to flush here.
        return True
class MetricReader(ABC):
    # pylint: disable=too-many-branches
    """
    Base class for all metric readers

    Args:
        preferred_temporality: A mapping between instrument classes and
            aggregation temporality. By default uses CUMULATIVE for all
            instrument classes. This mapping will be used to define the
            default aggregation temporality of every instrument class. If the
            user wants to make a change in the default aggregation temporality
            of an instrument class, it is enough to pass here a dictionary
            whose keys are the instrument classes and the values are the
            corresponding desired aggregation temporalities of the classes
            that the user wants to change, not all of them. The classes not
            included in the passed dictionary will retain their association to
            their default aggregation temporalities.
        preferred_aggregation: A mapping between instrument classes and
            aggregation instances. By default maps all instrument classes to
            an instance of `DefaultAggregation`. This mapping will be used to
            define the default aggregation of every instrument class. If the
            user wants to make a change in the default aggregation of an
            instrument class, it is enough to pass here a dictionary whose
            keys are the instrument classes and the values are the
            corresponding desired aggregation for the instrument classes that
            the user wants to change, not necessarily all of them. The classes
            not included in the passed dictionary will retain their
            association to their default aggregations. The aggregation defined
            here will be overridden by an aggregation defined by a view that
            is not `DefaultAggregation`.

    .. document protected _receive_metrics which is a intended to be overridden by subclass
    .. automethod:: _receive_metrics
    """

    def __init__(
        self,
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ) -> None:
        # Set by the MeterProvider via _set_collect_callback() when this
        # reader is registered; collect() refuses to run until then.
        self._collect: Callable[
            [
                "opentelemetry.sdk.metrics.export.MetricReader",
                AggregationTemporality,
            ],
            Iterable["opentelemetry.sdk.metrics.export.Metric"],
        ] = None

        # Single table mapping each public API instrument class to the
        # internal (underscore-prefixed) SDK class used for bookkeeping.
        # Replaces two long duplicated if/elif chains; identity-keyed dict
        # lookups are equivalent to the original ``typ is Counter`` tests
        # because classes hash/compare by identity.
        api_to_internal = {
            Counter: _Counter,
            UpDownCounter: _UpDownCounter,
            Histogram: _Histogram,
            Gauge: _Gauge,
            ObservableCounter: _ObservableCounter,
            ObservableUpDownCounter: _ObservableUpDownCounter,
            ObservableGauge: _ObservableGauge,
        }

        # Default temporality is CUMULATIVE for every instrument class.
        self._instrument_class_temporality = {
            internal: AggregationTemporality.CUMULATIVE
            for internal in api_to_internal.values()
        }

        if preferred_temporality is not None:
            # Validate every value before applying any override, so an
            # invalid temporality cannot leave a partially-applied config.
            for temporality in preferred_temporality.values():
                if temporality not in (
                    AggregationTemporality.CUMULATIVE,
                    AggregationTemporality.DELTA,
                ):
                    raise Exception(
                        f"Invalid temporality value found {temporality}"
                    )

            for typ, temporality in preferred_temporality.items():
                internal = api_to_internal.get(typ)
                if internal is None:
                    raise Exception(f"Invalid instrument class found {typ}")
                self._instrument_class_temporality[internal] = temporality

        self._preferred_temporality = preferred_temporality

        # Default aggregation: a fresh DefaultAggregation instance per
        # instrument class, as in the original literal.
        self._instrument_class_aggregation = {
            internal: DefaultAggregation()
            for internal in api_to_internal.values()
        }

        if preferred_aggregation is not None:
            for typ, aggregation in preferred_aggregation.items():
                internal = api_to_internal.get(typ)
                if internal is None:
                    raise Exception(f"Invalid instrument class found {typ}")
                self._instrument_class_aggregation[internal] = aggregation

    @final
    def collect(self, timeout_millis: float = 10_000) -> None:
        """Collects the metrics from the internal SDK state and
        invokes the `_receive_metrics` with the collection.

        Args:
            timeout_millis: Amount of time in milliseconds before this
                function raises a timeout error.

        If any of the underlying ``collect`` methods called by this method
        fails by any reason (including timeout) an exception will be raised
        detailing the individual errors that caused this function to fail.
        """
        if self._collect is None:
            _logger.warning(
                "Cannot call collect on a MetricReader until it is registered on a MeterProvider"
            )
            return

        metrics = self._collect(self, timeout_millis=timeout_millis)

        if metrics is not None:
            self._receive_metrics(
                metrics,
                timeout_millis=timeout_millis,
            )

    @final
    def _set_collect_callback(
        self,
        func: Callable[
            [
                "opentelemetry.sdk.metrics.export.MetricReader",
                AggregationTemporality,
            ],
            Iterable["opentelemetry.sdk.metrics.export.Metric"],
        ],
    ) -> None:
        """This function is internal to the SDK. It should not be called or overridden by users"""
        self._collect = func

    @abstractmethod
    def _receive_metrics(
        self,
        metrics_data: "opentelemetry.sdk.metrics.export.MetricsData",
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        """Called by `MetricReader.collect` when it receives a batch of metrics"""

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        # Flushing a reader is simply one synchronous collection pass.
        self.collect(timeout_millis=timeout_millis)
        return True

    @abstractmethod
    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Shuts down the MetricReader. This method provides a way
        for the MetricReader to do any cleanup required. A metric reader can
        only be shutdown once, any subsequent calls are ignored and return
        failure status.

        When a `MetricReader` is registered on a
        :class:`~opentelemetry.sdk.metrics.MeterProvider`,
        :meth:`~opentelemetry.sdk.metrics.MeterProvider.shutdown` will invoke this
        automatically.
        """
class InMemoryMetricReader(MetricReader):
    """Implementation of `MetricReader` that returns its metrics from :func:`get_metrics_data`.

    This is useful for e.g. unit tests.
    """

    def __init__(
        self,
        preferred_temporality: Dict[type, AggregationTemporality] = None,
        preferred_aggregation: Dict[
            type, "opentelemetry.sdk.metrics.view.Aggregation"
        ] = None,
    ) -> None:
        super().__init__(
            preferred_temporality=preferred_temporality,
            preferred_aggregation=preferred_aggregation,
        )
        # Reentrant lock: get_metrics_data() holds it while collect()
        # re-enters _receive_metrics() on the same thread.
        self._lock = RLock()
        self._metrics_data: (
            "opentelemetry.sdk.metrics.export.MetricsData"
        ) = None

    def get_metrics_data(
        self,
    ) -> ("opentelemetry.sdk.metrics.export.MetricsData"):
        """Reads and returns current metrics from the SDK"""
        with self._lock:
            self.collect()
            # Hand the stored batch to the caller and clear it atomically.
            metrics_data, self._metrics_data = self._metrics_data, None
            return metrics_data

    def _receive_metrics(
        self,
        metrics_data: "opentelemetry.sdk.metrics.export.MetricsData",
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        # Store the latest batch for a later get_metrics_data() call.
        with self._lock:
            self._metrics_data = metrics_data

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        # Nothing to clean up; any stored batch stays readable.
        pass
class PeriodicExportingMetricReader(MetricReader):
    """`PeriodicExportingMetricReader` is an implementation of `MetricReader`
    that collects metrics based on a user-configurable time interval, and passes the
    metrics to the configured exporter. If the time interval is set to `math.inf`, the
    reader will not invoke periodic collection.

    The configured exporter's :py:meth:`~MetricExporter.export` method will not be called
    concurrently.
    """

    def __init__(
        self,
        exporter: MetricExporter,
        export_interval_millis: Optional[float] = None,
        export_timeout_millis: Optional[float] = None,
    ) -> None:
        # PeriodicExportingMetricReader defers to exporter for configuration
        super().__init__(
            preferred_temporality=exporter._preferred_temporality,
            preferred_aggregation=exporter._preferred_aggregation,
        )

        # This lock is held whenever calling self._exporter.export() to
        # prevent concurrent execution of MetricExporter.export()
        # https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/metrics/sdk.md#exportbatch
        self._export_lock = Lock()

        self._exporter = exporter
        if export_interval_millis is None:
            try:
                export_interval_millis = float(
                    environ.get(OTEL_METRIC_EXPORT_INTERVAL, 60000)
                )
            except ValueError:
                _logger.warning(
                    "Found invalid value for export interval, using default"
                )
                export_interval_millis = 60000
        if export_timeout_millis is None:
            try:
                export_timeout_millis = float(
                    environ.get(OTEL_METRIC_EXPORT_TIMEOUT, 30000)
                )
            except ValueError:
                _logger.warning(
                    "Found invalid value for export timeout, using default"
                )
                export_timeout_millis = 30000
        self._export_interval_millis = export_interval_millis
        self._export_timeout_millis = export_timeout_millis
        self._shutdown = False
        self._shutdown_event = Event()
        self._shutdown_once = Once()
        self._daemon_thread = None
        if (
            self._export_interval_millis > 0
            and self._export_interval_millis < math.inf
        ):
            self._daemon_thread = Thread(
                name="OtelPeriodicExportingMetricReader",
                target=self._ticker,
                daemon=True,
            )
            self._daemon_thread.start()
            if hasattr(os, "register_at_fork"):
                os.register_at_fork(
                    after_in_child=self._at_fork_reinit
                )  # pylint: disable=protected-access
        elif self._export_interval_millis <= 0:
            # Fixed: the original message embedded a backslash line
            # continuation inside the string literal, producing a run of
            # spaces in the runtime message.
            raise ValueError(
                f"interval value {self._export_interval_millis} is invalid "
                "and needs to be larger than zero."
            )

    def _at_fork_reinit(self):
        # Threads do not survive fork(); restart the ticker in the child.
        self._daemon_thread = Thread(
            name="OtelPeriodicExportingMetricReader",
            target=self._ticker,
            daemon=True,
        )
        self._daemon_thread.start()

    def _ticker(self) -> None:
        interval_secs = self._export_interval_millis / 1e3
        while not self._shutdown_event.wait(interval_secs):
            try:
                self.collect(timeout_millis=self._export_timeout_millis)
            except MetricsTimeoutError:
                _logger.warning(
                    "Metric collection timed out. Will try again after %s seconds",
                    interval_secs,
                    exc_info=True,
                )
        # one last collection below before shutting down completely
        # Fixed: use the export timeout (not the export interval, as the
        # original did) to bound this final collection.
        self.collect(timeout_millis=self._export_timeout_millis)

    def _receive_metrics(
        self,
        metrics_data: MetricsData,
        timeout_millis: float = 10_000,
        **kwargs,
    ) -> None:
        # Suppress instrumentation while exporting so the exporter's own
        # activity (e.g. network calls) does not feed back into the SDK.
        token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True))
        try:
            with self._export_lock:
                self._exporter.export(
                    metrics_data, timeout_millis=timeout_millis
                )
        except Exception as e:  # pylint: disable=broad-except,invalid-name
            _logger.exception("Exception while exporting metrics %s", str(e))
        finally:
            # Fixed: detach in ``finally`` so the suppression context is
            # restored even if something other than Exception (e.g.
            # KeyboardInterrupt) escapes the handler.
            detach(token)

    def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
        """Stop the ticker thread and shut the exporter down, bounded by a
        single deadline shared across both steps. Subsequent calls are
        ignored with a warning."""
        deadline_ns = time_ns() + timeout_millis * 10**6

        def _shutdown():
            self._shutdown = True

        did_set = self._shutdown_once.do_once(_shutdown)
        if not did_set:
            _logger.warning("Can't shutdown multiple times")
            return

        self._shutdown_event.set()
        if self._daemon_thread:
            self._daemon_thread.join(
                timeout=(deadline_ns - time_ns()) / 10**9
            )
        # NOTE(review): the exporter API parameter is ``timeout_millis``;
        # passing ``timeout=`` lands in **kwargs, so exporters that only
        # read ``timeout_millis`` use their default here. Left as-is since
        # third-party exporters may read kwargs["timeout"] — confirm before
        # renaming.
        self._exporter.shutdown(timeout=(deadline_ns - time_ns()) / 10**6)

    def force_flush(self, timeout_millis: float = 10_000) -> bool:
        """Collect once and then flush the exporter."""
        super().force_flush(timeout_millis=timeout_millis)
        self._exporter.force_flush(timeout_millis=timeout_millis)
        return True