# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-ancestors
"""
The OpenTelemetry metrics API describes the classes used to generate
metrics.

The :class:`.MeterProvider` provides users access to the :class:`.Meter` which in
turn is used to create :class:`.Instrument` objects. The :class:`.Instrument` objects are
used to record measurements.

This module provides abstract (i.e. unimplemented) classes required for
metrics, and a concrete no-op implementation :class:`.NoOpMeter` that allows applications
to use the API package alone without a supporting implementation.

To get a meter, you need to provide the package name from which you are
calling the meter APIs to OpenTelemetry by calling `MeterProvider.get_meter`
with the calling instrumentation name and the version of your package.

The following code shows how to obtain a meter using the global :class:`.MeterProvider`::

    from opentelemetry.metrics import get_meter

    meter = get_meter("example-meter")
    counter = meter.create_counter("example-counter")

.. versionadded:: 1.10.0
"""
from abc import ABC, abstractmethod
from logging import getLogger
from os import environ
from threading import Lock
from typing import List, Optional, Sequence, Set, Tuple, Union, cast
from opentelemetry.environment_variables import OTEL_PYTHON_METER_PROVIDER
from opentelemetry.metrics._internal.instrument import (
CallbackT,
Counter,
Histogram,
NoOpCounter,
NoOpHistogram,
NoOpObservableCounter,
NoOpObservableGauge,
NoOpObservableUpDownCounter,
NoOpUpDownCounter,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
_ProxyCounter,
_ProxyHistogram,
_ProxyObservableCounter,
_ProxyObservableGauge,
_ProxyObservableUpDownCounter,
_ProxyUpDownCounter,
)
from opentelemetry.util._once import Once
from opentelemetry.util._providers import _load_provider
# Module-level logger, named after this module; used for the warnings
# emitted by NoOpMeter and _set_meter_provider.
_logger = getLogger(__name__)
# Union of the proxy instrument classes imported above. It annotates the
# list of proxy instruments that _ProxyMeter keeps track of.
_ProxyInstrumentT = Union[
_ProxyCounter,
_ProxyHistogram,
_ProxyObservableCounter,
_ProxyObservableGauge,
_ProxyObservableUpDownCounter,
_ProxyUpDownCounter,
]
class MeterProvider(ABC):
    """
    MeterProvider is the entry point of the API. It provides access to `Meter` instances.
    """

    @abstractmethod
    def get_meter(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> "Meter":
        """Returns a `Meter` for use by the given instrumentation library.

        For any two calls it is undefined whether the same or different
        `Meter` instances are returned, even for different library names.

        This function may return different `Meter` types (e.g. a no-op meter
        vs. a functional meter).

        Args:
            name: The name of the instrumenting module.
                ``__name__`` may not be used as this can result in
                different meter names if the meters are in different files.
                It is better to use a fixed string that can be imported where
                needed and used consistently as the name of the meter.

                This should *not* be the name of the module that is
                instrumented but the name of the module doing the instrumentation.
                E.g., instead of ``"requests"``, use
                ``"opentelemetry.instrumentation.requests"``.

            version: Optional. The version string of the
                instrumenting library.  Usually this should be the same as
                ``importlib.metadata.version(instrumenting_library_name)``.

            schema_url: Optional. Specifies the Schema URL of the emitted telemetry.

        Returns:
            A `Meter`. The abstract base implementation has no body and
            returns ``None``; concrete providers must override it.
        """
class NoOpMeterProvider(MeterProvider):
    """The default MeterProvider used when no MeterProvider implementation is available."""

    def get_meter(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> "Meter":
        """Returns a NoOpMeter.

        Args:
            name: The name of the instrumenting module.
            version: Optional. The version string of the instrumenting library.
            schema_url: Optional. Specifies the Schema URL of the emitted telemetry.

        Returns:
            A new `NoOpMeter` constructed from the given arguments.
        """
        # The base method is still invoked before the no-op meter is built;
        # its return value is not used.
        super().get_meter(name, version=version, schema_url=schema_url)
        return NoOpMeter(name, version=version, schema_url=schema_url)
class _ProxyMeterProvider(MeterProvider):
    """Provider that hands out `_ProxyMeter` objects until a real provider is set.

    Every proxy meter created here is remembered so that it can be notified
    once `on_set_meter_provider` is called with the real provider.
    """

    def __init__(self) -> None:
        self._lock = Lock()
        self._meters: List[_ProxyMeter] = []
        self._real_meter_provider: Optional[MeterProvider] = None

    def get_meter(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> "Meter":
        """Return a meter from the real provider if one is set, else a new proxy meter."""
        with self._lock:
            real_provider = self._real_meter_provider
            if real_provider is None:
                # No real provider yet: create a proxy and keep track of it
                # so it can be notified later.
                proxy_meter = _ProxyMeter(
                    name, version=version, schema_url=schema_url
                )
                self._meters.append(proxy_meter)
                return proxy_meter

            return real_provider.get_meter(name, version, schema_url)

    def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
        """Record the real provider and notify every proxy meter created so far."""
        with self._lock:
            self._real_meter_provider = meter_provider
            for proxy_meter in self._meters:
                proxy_meter.on_set_meter_provider(meter_provider)
class Meter(ABC):
    """Handles instrument creation.

    This class provides methods for creating instruments which are then
    used to produce measurements.
    """

    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> None:
        super().__init__()
        self._name = name
        self._version = version
        self._schema_url = schema_url
        # Ids of the instruments created through this meter; access is
        # guarded by the lock below.
        self._instrument_ids: Set[str] = set()
        self._instrument_ids_lock = Lock()

    @property
    def name(self) -> str:
        """
        The name of the instrumenting module.
        """
        return self._name

    @property
    def version(self) -> Optional[str]:
        """
        The version string of the instrumenting library.
        """
        return self._version

    @property
    def schema_url(self) -> Optional[str]:
        """
        Specifies the Schema URL of the emitted telemetry
        """
        return self._schema_url

    def _is_instrument_registered(
        self, name: str, type_: type, unit: str, description: str
    ) -> Tuple[bool, str]:
        """
        Check if an instrument with the same name, type, unit and description
        has been registered already.

        The instrument id is the comma-joined, stripped and lower-cased name,
        the type's ``__name__``, the unit and the description. An id that has
        not been seen before is recorded as a side effect of this call.

        Returns a tuple. The first value is `True` if the instrument has been
        registered already, `False` otherwise. The second value is the
        instrument id.
        """

        instrument_id = ",".join(
            [name.strip().lower(), type_.__name__, unit, description]
        )

        result = False

        with self._instrument_ids_lock:
            if instrument_id in self._instrument_ids:
                result = True
            else:
                self._instrument_ids.add(instrument_id)

        return (result, instrument_id)

    @abstractmethod
    def create_counter(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> Counter:
        """Creates a `Counter` instrument

        Args:
            name: The name of the instrument to be created
            unit: The unit for observations this instrument reports. For
                example, ``By`` for bytes. UCUM units are recommended.
            description: A description for this instrument and what it measures.
        """

    @abstractmethod
    def create_up_down_counter(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> UpDownCounter:
        """Creates an `UpDownCounter` instrument

        Args:
            name: The name of the instrument to be created
            unit: The unit for observations this instrument reports. For
                example, ``By`` for bytes. UCUM units are recommended.
            description: A description for this instrument and what it measures.
        """

    @abstractmethod
    def create_observable_counter(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableCounter:
        """Creates an `ObservableCounter` instrument

        An observable counter observes a monotonically increasing count by calling provided
        callbacks which accept a :class:`~opentelemetry.metrics.CallbackOptions` and return
        multiple :class:`~opentelemetry.metrics.Observation`.

        For example, an observable counter could be used to report system CPU
        time periodically. Here is a basic implementation::

            def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
                observations = []
                with open("/proc/stat") as procstat:
                    procstat.readline()  # skip the first line
                    for line in procstat:
                        if not line.startswith("cpu"): break
                        cpu, *states = line.split()
                        observations.append(Observation(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
                        observations.append(Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
                        observations.append(Observation(int(states[2]) // 100, {"cpu": cpu, "state": "system"}))
                        # ... other states
                return observations

            meter.create_observable_counter(
                "system.cpu.time",
                callbacks=[cpu_time_callback],
                unit="s",
                description="CPU time"
            )

        To reduce memory usage, you can use generator callbacks instead of
        building the full list::

            def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
                with open("/proc/stat") as procstat:
                    procstat.readline()  # skip the first line
                    for line in procstat:
                        if not line.startswith("cpu"): break
                        cpu, *states = line.split()
                        yield Observation(int(states[0]) // 100, {"cpu": cpu, "state": "user"})
                        yield Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})
                        # ... other states

        Alternatively, you can pass a sequence of generators directly instead of a sequence of
        callbacks, which each should return iterables of :class:`~opentelemetry.metrics.Observation`::

            def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observation]]:
                # accept options sent in from OpenTelemetry
                options = yield
                while True:
                    observations = []
                    with open("/proc/stat") as procstat:
                        procstat.readline()  # skip the first line
                        for line in procstat:
                            if not line.startswith("cpu"): break
                            cpu, *states = line.split()
                            if "user" in states_to_include:
                                observations.append(Observation(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
                            if "nice" in states_to_include:
                                observations.append(Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
                            # ... other states
                    # yield the observations and receive the options for next iteration
                    options = yield observations

            meter.create_observable_counter(
                "system.cpu.time",
                callbacks=[cpu_time_callback({"user", "system"})],
                unit="s",
                description="CPU time"
            )

        The :class:`~opentelemetry.metrics.CallbackOptions` contain a timeout which the
        callback should respect. For example if the callback does asynchronous work, like
        making HTTP requests, it should respect the timeout::

            def scrape_http_callback(options: CallbackOptions) -> Iterable[Observation]:
                r = requests.get('http://scrapethis.com', timeout=options.timeout_millis / 10**3)
                for value in r.json():
                    yield Observation(value)

        Args:
            name: The name of the instrument to be created
            callbacks: A sequence of callbacks that return an iterable of
                :class:`~opentelemetry.metrics.Observation`. Alternatively, can be a sequence of generators that each
                yields iterables of :class:`~opentelemetry.metrics.Observation`.
            unit: The unit for observations this instrument reports. For
                example, ``By`` for bytes. UCUM units are recommended.
            description: A description for this instrument and what it measures.
        """

    @abstractmethod
    def create_histogram(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> Histogram:
        """Creates a :class:`~opentelemetry.metrics.Histogram` instrument

        Args:
            name: The name of the instrument to be created
            unit: The unit for observations this instrument reports. For
                example, ``By`` for bytes. UCUM units are recommended.
            description: A description for this instrument and what it measures.
        """

    @abstractmethod
    def create_observable_gauge(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableGauge:
        """Creates an `ObservableGauge` instrument

        Args:
            name: The name of the instrument to be created
            callbacks: A sequence of callbacks that return an iterable of
                :class:`~opentelemetry.metrics.Observation`. Alternatively, can be a sequence of generators that each
                yields iterables of :class:`~opentelemetry.metrics.Observation`.
            unit: The unit for observations this instrument reports. For
                example, ``By`` for bytes. UCUM units are recommended.
            description: A description for this instrument and what it measures.
        """

    @abstractmethod
    def create_observable_up_down_counter(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableUpDownCounter:
        """Creates an `ObservableUpDownCounter` instrument

        Args:
            name: The name of the instrument to be created
            callbacks: A sequence of callbacks that return an iterable of
                :class:`~opentelemetry.metrics.Observation`. Alternatively, can be a sequence of generators that each
                yields iterables of :class:`~opentelemetry.metrics.Observation`.
            unit: The unit for observations this instrument reports. For
                example, ``By`` for bytes. UCUM units are recommended.
            description: A description for this instrument and what it measures.
        """
class _ProxyMeter(Meter):
    """Meter that returns proxy instruments until a real meter is available.

    Each ``create_*`` method delegates to the real meter once one has been
    set; before that it creates a proxy instrument and remembers it so the
    instrument can be notified when the real meter arrives.
    """

    def __init__(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> None:
        super().__init__(name, version=version, schema_url=schema_url)
        self._lock = Lock()
        self._instruments: List[_ProxyInstrumentT] = []
        self._real_meter: Optional[Meter] = None

    def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
        """Called when a real meter provider is set on the creating _ProxyMeterProvider

        Creates a real backing meter for this instance and notifies all created
        instruments so they can create real backing instruments.
        """
        real_meter = meter_provider.get_meter(
            self._name, self._version, self._schema_url
        )

        with self._lock:
            self._real_meter = real_meter
            # notify all proxy instruments of the new meter so they can create
            # real instruments to back themselves
            for instrument in self._instruments:
                instrument.on_meter_set(real_meter)

    def create_counter(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> Counter:
        """Return a counter from the real meter if set, else a tracked proxy counter."""
        with self._lock:
            # Compare against None explicitly: a real meter that happens to
            # be falsy must still be used.
            if self._real_meter is not None:
                return self._real_meter.create_counter(name, unit, description)
            proxy = _ProxyCounter(name, unit, description)
            self._instruments.append(proxy)
            return proxy

    def create_up_down_counter(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> UpDownCounter:
        """Return an up-down counter from the real meter if set, else a tracked proxy."""
        with self._lock:
            if self._real_meter is not None:
                return self._real_meter.create_up_down_counter(
                    name, unit, description
                )
            proxy = _ProxyUpDownCounter(name, unit, description)
            self._instruments.append(proxy)
            return proxy

    def create_observable_counter(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableCounter:
        """Return an observable counter from the real meter if set, else a tracked proxy."""
        with self._lock:
            if self._real_meter is not None:
                return self._real_meter.create_observable_counter(
                    name, callbacks, unit, description
                )
            proxy = _ProxyObservableCounter(
                name, callbacks, unit=unit, description=description
            )
            self._instruments.append(proxy)
            return proxy

    def create_histogram(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> Histogram:
        """Return a histogram from the real meter if set, else a tracked proxy histogram."""
        with self._lock:
            if self._real_meter is not None:
                return self._real_meter.create_histogram(
                    name, unit, description
                )
            proxy = _ProxyHistogram(name, unit, description)
            self._instruments.append(proxy)
            return proxy

    def create_observable_gauge(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableGauge:
        """Return an observable gauge from the real meter if set, else a tracked proxy."""
        with self._lock:
            if self._real_meter is not None:
                return self._real_meter.create_observable_gauge(
                    name, callbacks, unit, description
                )
            proxy = _ProxyObservableGauge(
                name, callbacks, unit=unit, description=description
            )
            self._instruments.append(proxy)
            return proxy

    def create_observable_up_down_counter(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableUpDownCounter:
        """Return an observable up-down counter from the real meter if set, else a tracked proxy."""
        with self._lock:
            if self._real_meter is not None:
                return self._real_meter.create_observable_up_down_counter(
                    name,
                    callbacks,
                    unit,
                    description,
                )
            proxy = _ProxyObservableUpDownCounter(
                name, callbacks, unit=unit, description=description
            )
            self._instruments.append(proxy)
            return proxy
class NoOpMeter(Meter):
    """The default Meter used when no Meter implementation is available.

    All operations are no-op.
    """

    def _warn_if_registered(
        self,
        name: str,
        noop_type: type,
        api_type: type,
        unit: str,
        description: str,
    ) -> None:
        """Log a warning if an identical instrument was already created.

        Args:
            name: The name of the instrument being created.
            noop_type: The no-op instrument class used to build the
                instrument id that is checked and recorded.
            api_type: The API instrument class whose ``__name__`` is
                reported in the warning message.
            unit: The unit of the instrument.
            description: The description of the instrument.
        """
        if self._is_instrument_registered(
            name, noop_type, unit, description
        )[0]:
            _logger.warning(
                "An instrument with name %s, type %s, unit %s and "
                "description %s has been created already.",
                name,
                api_type.__name__,
                unit,
                description,
            )

    def create_counter(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> Counter:
        """Returns a no-op Counter."""
        super().create_counter(name, unit=unit, description=description)
        self._warn_if_registered(
            name, NoOpCounter, Counter, unit, description
        )
        return NoOpCounter(name, unit=unit, description=description)

    def create_up_down_counter(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> UpDownCounter:
        """Returns a no-op UpDownCounter."""
        super().create_up_down_counter(
            name, unit=unit, description=description
        )
        self._warn_if_registered(
            name, NoOpUpDownCounter, UpDownCounter, unit, description
        )
        return NoOpUpDownCounter(name, unit=unit, description=description)

    def create_observable_counter(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableCounter:
        """Returns a no-op ObservableCounter."""
        super().create_observable_counter(
            name, callbacks, unit=unit, description=description
        )
        self._warn_if_registered(
            name, NoOpObservableCounter, ObservableCounter, unit, description
        )
        return NoOpObservableCounter(
            name,
            callbacks,
            unit=unit,
            description=description,
        )

    def create_histogram(
        self,
        name: str,
        unit: str = "",
        description: str = "",
    ) -> Histogram:
        """Returns a no-op Histogram."""
        super().create_histogram(name, unit=unit, description=description)
        self._warn_if_registered(
            name, NoOpHistogram, Histogram, unit, description
        )
        return NoOpHistogram(name, unit=unit, description=description)

    def create_observable_gauge(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableGauge:
        """Returns a no-op ObservableGauge."""
        super().create_observable_gauge(
            name, callbacks, unit=unit, description=description
        )
        self._warn_if_registered(
            name, NoOpObservableGauge, ObservableGauge, unit, description
        )
        return NoOpObservableGauge(
            name,
            callbacks,
            unit=unit,
            description=description,
        )

    def create_observable_up_down_counter(
        self,
        name: str,
        callbacks: Optional[Sequence[CallbackT]] = None,
        unit: str = "",
        description: str = "",
    ) -> ObservableUpDownCounter:
        """Returns a no-op ObservableUpDownCounter."""
        super().create_observable_up_down_counter(
            name, callbacks, unit=unit, description=description
        )
        self._warn_if_registered(
            name,
            NoOpObservableUpDownCounter,
            ObservableUpDownCounter,
            unit,
            description,
        )
        return NoOpObservableUpDownCounter(
            name,
            callbacks,
            unit=unit,
            description=description,
        )
# Passed the setter in _set_meter_provider via do_once(); presumably ensures
# the global provider is assigned at most once (see opentelemetry.util._once).
_METER_PROVIDER_SET_ONCE = Once()
# The global meter provider; stays None until _set_meter_provider assigns it.
_METER_PROVIDER: Optional[MeterProvider] = None
# Returned by get_meter_provider() while no provider is set and the
# OTEL_PYTHON_METER_PROVIDER environment variable is absent; it is notified
# when a real provider is eventually set.
_PROXY_METER_PROVIDER = _ProxyMeterProvider()
def get_meter(
    name: str,
    version: str = "",
    meter_provider: Optional["MeterProvider"] = None,
    schema_url: Optional[str] = None,
) -> "Meter":
    """Returns a `Meter` for use by the given instrumentation library.

    This function is a convenience wrapper for
    `opentelemetry.metrics.MeterProvider.get_meter`.

    If meter_provider is omitted the current configured one is used.

    Args:
        name: The name of the instrumenting module.
        version: The version string of the instrumenting library.
        meter_provider: Optional. The provider to obtain the meter from;
            defaults to the one returned by `get_meter_provider`.
        schema_url: Optional. Specifies the Schema URL of the emitted
            telemetry; forwarded to the provider's ``get_meter``.

    Returns:
        Whatever the provider's ``get_meter`` returns for these arguments.
    """
    if meter_provider is None:
        meter_provider = get_meter_provider()
    return meter_provider.get_meter(name, version, schema_url)
def _set_meter_provider(meter_provider: MeterProvider, log: bool) -> None:
    """Install ``meter_provider`` as the global provider through the set-once guard.

    Args:
        meter_provider: The provider to install.
        log: When true, a warning is logged if the guard reports that this
            call did not perform the assignment.
    """

    def _install() -> None:
        global _METER_PROVIDER  # pylint: disable=global-statement
        _METER_PROVIDER = meter_provider

        # gives all proxies real instruments off the newly set meter provider
        _PROXY_METER_PROVIDER.on_set_meter_provider(meter_provider)

    newly_set = _METER_PROVIDER_SET_ONCE.do_once(_install)

    if newly_set or not log:
        return

    _logger.warning("Overriding of current MeterProvider is not allowed")
def set_meter_provider(meter_provider: MeterProvider) -> None:
    """Sets the current global :class:`~.MeterProvider` object.

    Only the first call takes effect; every later attempt leaves the
    provider unchanged and logs a warning instead.
    """
    _set_meter_provider(
        meter_provider,
        log=True,
    )
def get_meter_provider() -> MeterProvider:
    """Gets the current global :class:`~.MeterProvider` object.

    When no provider has been set, the proxy provider is returned unless the
    ``OTEL_PYTHON_METER_PROVIDER`` environment variable is present, in which
    case a provider is loaded from it and installed (without logging).
    """
    if _METER_PROVIDER is not None:
        return cast("MeterProvider", _METER_PROVIDER)

    if OTEL_PYTHON_METER_PROVIDER not in environ:
        return _PROXY_METER_PROVIDER

    loaded_provider: MeterProvider = _load_provider(  # type: ignore
        OTEL_PYTHON_METER_PROVIDER, "meter_provider"
    )
    _set_meter_provider(loaded_provider, log=False)

    # _METER_PROVIDER will have been set by one thread
    return cast("MeterProvider", _METER_PROVIDER)