# Source code for opentelemetry.metrics._internal.instrument

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# pylint: disable=too-many-ancestors


from abc import ABC, abstractmethod
from dataclasses import dataclass
from logging import getLogger
from re import compile as re_compile
from typing import (
    Callable,
    Dict,
    Generator,
    Generic,
    Iterable,
    Optional,
    Sequence,
    TypeVar,
    Union,
)

# pylint: disable=unused-import; needed for typing and sphinx
from opentelemetry import metrics
from opentelemetry.metrics._internal.observation import Observation
from opentelemetry.util.types import Attributes

# Module-level logger named after this module.
_logger = getLogger(__name__)

# Instrument name syntax: one ASCII letter followed by at most 254 characters
# drawn from ASCII letters, digits, "-", "_", "." and "/" (1 to 255 characters
# in total). Applied with ``fullmatch`` in
# ``Instrument._check_name_unit_description``.
_name_regex = re_compile(r"[a-zA-Z][-_./a-zA-Z0-9]{0,254}")
# Instrument unit syntax: at most 63 ASCII characters (code points 0x00-0x7F);
# the empty string is accepted.
_unit_regex = re_compile(r"[\x00-\x7F]{0,63}")


@dataclass(frozen=True)
class CallbackOptions:
    """Immutable options passed to a callback.

    Args:
        timeout_millis: Time budget, in milliseconds, for one execution of the
            callback. A callback that does asynchronous work (e.g. HTTP
            requests) should respect this timeout.
    """

    timeout_millis: float = 10000


# Type variable standing for any ``Instrument`` subclass; used to parametrize
# the generic proxy instrument classes below.
InstrumentT = TypeVar("InstrumentT", bound="Instrument")
# A callback is either a callable that takes ``CallbackOptions`` and returns an
# iterable of ``Observation``, or a generator that yields iterables of
# ``Observation`` and is sent ``CallbackOptions``.
CallbackT = Union[
    Callable[[CallbackOptions], Iterable[Observation]],
    Generator[Iterable[Observation], CallbackOptions, None],
]


class Instrument(ABC):
    """Abstract class that serves as base for all instruments."""

    @abstractmethod
    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Declare the constructor signature shared by all instruments.

        Args:
            name: The instrument name.
            unit: The instrument unit, empty by default.
            description: The instrument description, empty by default.
        """

    @staticmethod
    def _check_name_unit_description(
        name: str, unit: str, description: str
    ) -> Dict[str, Optional[str]]:
        """
        Checks the following instrument name, unit and description for
        compliance with the spec.

        Args:
            name: Candidate instrument name. It is valid when it is a string
                that fully matches ``_name_regex``; any other value,
                including ``None``, is reported as invalid instead of
                raising.
            unit: Candidate unit. ``None`` is treated as the empty string;
                the value is valid when it fully matches ``_unit_regex``.
            description: Candidate description. ``None`` is replaced with the
                empty string; any other value is accepted unchanged.

        Returns:
            A dict with keys "name", "unit" and "description", the
            corresponding values will be the checked strings or `None` if the
            value is invalid. If valid, the checked strings should be used
            instead of the original values.
        """

        # ``fullmatch`` raises ``TypeError`` on non-string input. ``unit`` and
        # ``description`` already tolerate ``None``, so a non-string name is
        # reported as invalid rather than crashing the check.
        name_is_valid = (
            isinstance(name, str) and _name_regex.fullmatch(name) is not None
        )

        if unit is None:
            unit = ""
        unit_is_valid = _unit_regex.fullmatch(unit) is not None

        return {
            "name": name if name_is_valid else None,
            "unit": unit if unit_is_valid else None,
            "description": "" if description is None else description,
        }


class _ProxyInstrument(ABC, Generic[InstrumentT]):
    """Placeholder that keeps an instrument's creation arguments and, once a
    real meter is set, the real instrument created from them."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        # No backing instrument exists until ``on_meter_set`` is called.
        self._real_instrument: Optional[InstrumentT] = None
        self._name, self._unit, self._description = name, unit, description

    def on_meter_set(self, meter: "metrics.Meter") -> None:
        """Called when a real meter is set on the creating _ProxyMeter"""

        # Intentionally lock-free: dropping a few measurements while the real
        # backing instrument is being created is acceptable.
        real_instrument = self._create_real_instrument(meter)
        self._real_instrument = real_instrument

    @abstractmethod
    def _create_real_instrument(self, meter: "metrics.Meter") -> InstrumentT:
        """Return the real instrument built from ``meter``; subclasses must
        implement this."""


class _ProxyAsynchronousInstrument(_ProxyInstrument[InstrumentT]):
    """Proxy base for asynchronous instruments: besides name, unit and
    description it also keeps the callbacks given at creation time."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        # Stored as given (possibly ``None``) for later use by subclasses.
        self._callbacks = callbacks
        super().__init__(name, unit=unit, description=description)


class Synchronous(Instrument):
    """Base class for all synchronous instruments.

    Adds no behavior of its own; in this module ``Counter``, ``UpDownCounter``
    and ``Histogram`` derive from it.
    """


class Asynchronous(Instrument):
    """Base class for all asynchronous instruments.

    In this module ``ObservableCounter``, ``ObservableUpDownCounter`` and
    ``ObservableGauge`` derive from it.
    """

    @abstractmethod
    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Declare the constructor signature shared by asynchronous instruments.

        Args:
            name: The instrument name.
            callbacks: Optional sequence of callbacks. It is part of the
                signature but is not used by this base implementation.
            unit: The instrument unit, empty by default.
            description: The instrument description, empty by default.
        """
        # Only name, unit and description are forwarded to the parent
        # initializer.
        super().__init__(name, unit=unit, description=description)


class Counter(Synchronous):
    """A Counter is a synchronous `Instrument` which supports non-negative increments."""

    @abstractmethod
    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Add ``amount`` to the counter.

        Args:
            amount: The increment; a Counter supports non-negative increments.
            attributes: Optional attributes passed along with the increment.
        """
class NoOpCounter(Counter):
    """No-op implementation of `Counter`."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward ``name``, ``unit`` and ``description`` to the parent
        initializer."""
        super().__init__(name, unit=unit, description=description)

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Delegate to the parent ``add`` and return its result."""
        return super().add(amount, attributes=attributes)
class _ProxyCounter(_ProxyInstrument[Counter], Counter):
    """Proxy `Counter` that forwards to the real counter once it exists."""

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Forward the increment to the real counter, if one has been created.

        Calls made before a real instrument exists are dropped.
        """
        # Explicit ``None`` check: only the absence of a real instrument
        # should cause the measurement to be dropped.
        if self._real_instrument is not None:
            self._real_instrument.add(amount, attributes)

    def _create_real_instrument(self, meter: "metrics.Meter") -> Counter:
        """Create the real counter on ``meter`` from the stored name, unit
        and description."""
        return meter.create_counter(self._name, self._unit, self._description)


class UpDownCounter(Synchronous):
    """An UpDownCounter is a synchronous `Instrument` which supports increments and decrements."""

    @abstractmethod
    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Add ``amount`` to the counter.

        Args:
            amount: The change to apply; an UpDownCounter supports both
                increments and decrements.
            attributes: Optional attributes passed along with the change.
        """
class NoOpUpDownCounter(UpDownCounter):
    """No-op implementation of `UpDownCounter`."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward ``name``, ``unit`` and ``description`` to the parent
        initializer."""
        super().__init__(name, unit=unit, description=description)

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Delegate to the parent ``add`` and return its result."""
        return super().add(amount, attributes=attributes)
class _ProxyUpDownCounter(_ProxyInstrument[UpDownCounter], UpDownCounter):
    """Proxy `UpDownCounter` that forwards to the real instrument once it
    exists."""

    def add(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Forward the change to the real instrument, if one has been created.

        Calls made before a real instrument exists are dropped.
        """
        # Explicit ``None`` check: only the absence of a real instrument
        # should cause the measurement to be dropped.
        if self._real_instrument is not None:
            self._real_instrument.add(amount, attributes)

    def _create_real_instrument(self, meter: "metrics.Meter") -> UpDownCounter:
        """Create the real up-down counter on ``meter`` from the stored name,
        unit and description."""
        return meter.create_up_down_counter(
            self._name, self._unit, self._description
        )


class ObservableCounter(Asynchronous):
    """An ObservableCounter is an asynchronous `Instrument` which reports
    monotonically increasing value(s) when the instrument is being observed.
    """


class NoOpObservableCounter(ObservableCounter):
    """No-op implementation of `ObservableCounter`."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward all arguments to the parent initializer."""
        super().__init__(name, callbacks, unit=unit, description=description)


class _ProxyObservableCounter(
    _ProxyAsynchronousInstrument[ObservableCounter], ObservableCounter
):
    """Proxy `ObservableCounter` that can create the real instrument once a
    meter is available."""

    def _create_real_instrument(
        self, meter: "metrics.Meter"
    ) -> ObservableCounter:
        """Create the real observable counter on ``meter`` from the stored
        name, callbacks, unit and description."""
        return meter.create_observable_counter(
            self._name, self._callbacks, self._unit, self._description
        )


class ObservableUpDownCounter(Asynchronous):
    """An ObservableUpDownCounter is an asynchronous `Instrument` which reports
    additive value(s) (e.g. the process heap size - it makes sense to report
    the heap size from multiple processes and sum them up, so we get the total
    heap usage) when the instrument is being observed.
    """


class NoOpObservableUpDownCounter(ObservableUpDownCounter):
    """No-op implementation of `ObservableUpDownCounter`."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward all arguments to the parent initializer."""
        super().__init__(name, callbacks, unit=unit, description=description)


class _ProxyObservableUpDownCounter(
    _ProxyAsynchronousInstrument[ObservableUpDownCounter],
    ObservableUpDownCounter,
):
    """Proxy `ObservableUpDownCounter` that can create the real instrument
    once a meter is available."""

    def _create_real_instrument(
        self, meter: "metrics.Meter"
    ) -> ObservableUpDownCounter:
        """Create the real observable up-down counter on ``meter`` from the
        stored name, callbacks, unit and description."""
        return meter.create_observable_up_down_counter(
            self._name, self._callbacks, self._unit, self._description
        )


class Histogram(Synchronous):
    """Histogram is a synchronous `Instrument` which can be used to report
    arbitrary values that are likely to be statistically meaningful. It is
    intended for statistics such as histograms, summaries, and percentile.
    """

    @abstractmethod
    def record(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Record the value ``amount``.

        Args:
            amount: The value to record.
            attributes: Optional attributes passed along with the value.
        """
class NoOpHistogram(Histogram):
    """No-op implementation of `Histogram`."""

    def __init__(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward ``name``, ``unit`` and ``description`` to the parent
        initializer."""
        super().__init__(name, unit=unit, description=description)

    def record(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Delegate to the parent ``record`` and return its result."""
        return super().record(amount, attributes=attributes)
class _ProxyHistogram(_ProxyInstrument[Histogram], Histogram):
    """Proxy `Histogram` that forwards to the real histogram once it exists."""

    def record(
        self,
        amount: Union[int, float],
        attributes: Optional[Attributes] = None,
    ) -> None:
        """Forward the value to the real histogram, if one has been created.

        Calls made before a real instrument exists are dropped.
        """
        # Explicit ``None`` check: only the absence of a real instrument
        # should cause the measurement to be dropped.
        if self._real_instrument is not None:
            self._real_instrument.record(amount, attributes)

    def _create_real_instrument(self, meter: "metrics.Meter") -> Histogram:
        """Create the real histogram on ``meter`` from the stored name, unit
        and description."""
        return meter.create_histogram(
            self._name, self._unit, self._description
        )


class ObservableGauge(Asynchronous):
    """Asynchronous Gauge is an asynchronous `Instrument` which reports
    non-additive value(s) (e.g. the room temperature - it makes no sense to
    report the temperature value from multiple rooms and sum them up) when
    the instrument is being observed.
    """


class NoOpObservableGauge(ObservableGauge):
    """No-op implementation of `ObservableGauge`."""

    def __init__(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> None:
        """Forward all arguments to the parent initializer."""
        super().__init__(name, callbacks, unit=unit, description=description)


class _ProxyObservableGauge(
    _ProxyAsynchronousInstrument[ObservableGauge],
    ObservableGauge,
):
    """Proxy `ObservableGauge` that can create the real instrument once a
    meter is available."""

    def _create_real_instrument(
        self, meter: "metrics.Meter"
    ) -> ObservableGauge:
        """Create the real observable gauge on ``meter`` from the stored
        name, callbacks, unit and description."""
        return meter.create_observable_gauge(
            self._name, self._callbacks, self._unit, self._description
        )