# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-ancestors
"""
The OpenTelemetry metrics API describes the classes used to generate
metrics.
The :class:`.MeterProvider` provides users access to the :class:`.Meter` which in
turn is used to create :class:`.Instrument` objects. The :class:`.Instrument` objects are
used to record measurements.
This module provides abstract (i.e. unimplemented) classes required for
metrics, and a concrete no-op implementation :class:`.NoOpMeter` that allows applications
to use the API package alone without a supporting implementation.
To get a meter, you need to provide the package name from which you are
calling the meter APIs to OpenTelemetry by calling `MeterProvider.get_meter`
with the calling instrumentation name and the version of your package.
The following code shows how to obtain a meter using the global :class:`.MeterProvider`::
from opentelemetry.metrics import get_meter
meter = get_meter("example-meter")
counter = meter.create_counter("example-counter")
.. versionadded:: 1.10.0
"""
import warnings
from abc import ABC, abstractmethod
from logging import getLogger
from os import environ
from threading import Lock
from typing import List, Optional, Sequence, Set, Tuple, Union, cast
from opentelemetry.environment_variables import OTEL_PYTHON_METER_PROVIDER
from opentelemetry.metrics._internal.instrument import (
CallbackT,
Counter,
Gauge,
Histogram,
NoOpCounter,
NoOpGauge,
NoOpHistogram,
NoOpObservableCounter,
NoOpObservableGauge,
NoOpObservableUpDownCounter,
NoOpUpDownCounter,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
_ProxyCounter,
_ProxyGauge,
_ProxyHistogram,
_ProxyObservableCounter,
_ProxyObservableGauge,
_ProxyObservableUpDownCounter,
_ProxyUpDownCounter,
)
from opentelemetry.util._once import Once
from opentelemetry.util._providers import _load_provider
_logger = getLogger(__name__)
# pylint: disable=invalid-name
_ProxyInstrumentT = Union[
_ProxyCounter,
_ProxyHistogram,
_ProxyGauge,
_ProxyObservableCounter,
_ProxyObservableGauge,
_ProxyObservableUpDownCounter,
_ProxyUpDownCounter,
]
class MeterProvider(ABC):
"""
MeterProvider is the entry point of the API. It provides access to `Meter` instances.
"""
[docs] @abstractmethod
def get_meter(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> "Meter":
"""Returns a `Meter` for use by the given instrumentation library.
For any two calls it is undefined whether the same or different
`Meter` instances are returned, even for different library names.
This function may return different `Meter` types (e.g. a no-op meter
vs. a functional meter).
Args:
name: The name of the instrumenting module.
``__name__`` may not be used as this can result in
different meter names if the meters are in different files.
It is better to use a fixed string that can be imported where
needed and used consistently as the name of the meter.
This should *not* be the name of the module that is
instrumented but the name of the module doing the instrumentation.
E.g., instead of ``"requests"``, use
``"opentelemetry.instrumentation.requests"``.
version: Optional. The version string of the
instrumenting library. Usually this should be the same as
``importlib.metadata.version(instrumenting_library_name)``.
schema_url: Optional. Specifies the Schema URL of the emitted telemetry.
"""
class NoOpMeterProvider(MeterProvider):
"""The default MeterProvider used when no MeterProvider implementation is available."""
[docs] def get_meter(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> "Meter":
"""Returns a NoOpMeter."""
return NoOpMeter(name, version=version, schema_url=schema_url)
class _ProxyMeterProvider(MeterProvider):
def __init__(self) -> None:
self._lock = Lock()
self._meters: List[_ProxyMeter] = []
self._real_meter_provider: Optional[MeterProvider] = None
def get_meter(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> "Meter":
with self._lock:
if self._real_meter_provider is not None:
return self._real_meter_provider.get_meter(
name, version, schema_url
)
meter = _ProxyMeter(name, version=version, schema_url=schema_url)
self._meters.append(meter)
return meter
def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
with self._lock:
self._real_meter_provider = meter_provider
for meter in self._meters:
meter.on_set_meter_provider(meter_provider)
class Meter(ABC):
"""Handles instrument creation.
This class provides methods for creating instruments which are then
used to produce measurements.
"""
def __init__(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> None:
super().__init__()
self._name = name
self._version = version
self._schema_url = schema_url
self._instrument_ids: Set[str] = set()
self._instrument_ids_lock = Lock()
@property
def name(self) -> str:
"""
The name of the instrumenting module.
"""
return self._name
@property
def version(self) -> Optional[str]:
"""
The version string of the instrumenting library.
"""
return self._version
@property
def schema_url(self) -> Optional[str]:
"""
Specifies the Schema URL of the emitted telemetry
"""
return self._schema_url
def _is_instrument_registered(
self, name: str, type_: type, unit: str, description: str
) -> Tuple[bool, str]:
"""
Check if an instrument with the same name, type, unit and description
has been registered already.
Returns a tuple. The first value is `True` if the instrument has been
registered already, `False` otherwise. The second value is the
instrument id.
"""
instrument_id = ",".join(
[name.strip().lower(), type_.__name__, unit, description]
)
result = False
with self._instrument_ids_lock:
if instrument_id in self._instrument_ids:
result = True
else:
self._instrument_ids.add(instrument_id)
return (result, instrument_id)
[docs] @abstractmethod
def create_counter(
self,
name: str,
unit: str = "",
description: str = "",
) -> Counter:
"""Creates a `Counter` instrument
Args:
name: The name of the instrument to be created
unit: The unit for observations this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""
[docs] @abstractmethod
def create_up_down_counter(
self,
name: str,
unit: str = "",
description: str = "",
) -> UpDownCounter:
"""Creates an `UpDownCounter` instrument
Args:
name: The name of the instrument to be created
unit: The unit for observations this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""
[docs] @abstractmethod
def create_observable_counter(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableCounter:
"""Creates an `ObservableCounter` instrument
An observable counter observes a monotonically increasing count by calling provided
callbacks which accept a :class:`~opentelemetry.metrics.CallbackOptions` and return
multiple :class:`~opentelemetry.metrics.Observation`.
For example, an observable counter could be used to report system CPU
time periodically. Here is a basic implementation::
def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
observations = []
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
if not line.startswith("cpu"): break
cpu, *states = line.split()
observations.append(Observation(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
observations.append(Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
observations.append(Observation(int(states[2]) // 100, {"cpu": cpu, "state": "system"}))
# ... other states
return observations
meter.create_observable_counter(
"system.cpu.time",
callbacks=[cpu_time_callback],
unit="s",
description="CPU time"
)
To reduce memory usage, you can use generator callbacks instead of
building the full list::
def cpu_time_callback(options: CallbackOptions) -> Iterable[Observation]:
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
if not line.startswith("cpu"): break
cpu, *states = line.split()
yield Observation(int(states[0]) // 100, {"cpu": cpu, "state": "user"})
yield Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"})
# ... other states
Alternatively, you can pass a sequence of generators directly instead of a sequence of
callbacks, which each should return iterables of :class:`~opentelemetry.metrics.Observation`::
def cpu_time_callback(states_to_include: set[str]) -> Iterable[Iterable[Observation]]:
# accept options sent in from OpenTelemetry
options = yield
while True:
observations = []
with open("/proc/stat") as procstat:
procstat.readline() # skip the first line
for line in procstat:
if not line.startswith("cpu"): break
cpu, *states = line.split()
if "user" in states_to_include:
observations.append(Observation(int(states[0]) // 100, {"cpu": cpu, "state": "user"}))
if "nice" in states_to_include:
observations.append(Observation(int(states[1]) // 100, {"cpu": cpu, "state": "nice"}))
# ... other states
# yield the observations and receive the options for next iteration
options = yield observations
meter.create_observable_counter(
"system.cpu.time",
callbacks=[cpu_time_callback({"user", "system"})],
unit="s",
description="CPU time"
)
The :class:`~opentelemetry.metrics.CallbackOptions` contain a timeout which the
callback should respect. For example if the callback does asynchronous work, like
making HTTP requests, it should respect the timeout::
def scrape_http_callback(options: CallbackOptions) -> Iterable[Observation]:
r = requests.get('http://scrapethis.com', timeout=options.timeout_millis / 10**3)
for value in r.json():
yield Observation(value)
Args:
name: The name of the instrument to be created
callbacks: A sequence of callbacks that return an iterable of
:class:`~opentelemetry.metrics.Observation`. Alternatively, can be a sequence of generators that each
yields iterables of :class:`~opentelemetry.metrics.Observation`.
unit: The unit for observations this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""
[docs] @abstractmethod
def create_histogram(
self,
name: str,
unit: str = "",
description: str = "",
) -> Histogram:
"""Creates a :class:`~opentelemetry.metrics.Histogram` instrument
Args:
name: The name of the instrument to be created
unit: The unit for observations this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""
[docs] def create_gauge( # type: ignore # pylint: disable=no-self-use
self,
name: str,
unit: str = "",
description: str = "",
) -> Gauge:
"""Creates a ``Gauge`` instrument
Args:
name: The name of the instrument to be created
unit: The unit for observations this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""
warnings.warn("create_gauge() is not implemented and will be a no-op")
[docs] @abstractmethod
def create_observable_gauge(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableGauge:
"""Creates an `ObservableGauge` instrument
Args:
name: The name of the instrument to be created
callbacks: A sequence of callbacks that return an iterable of
:class:`~opentelemetry.metrics.Observation`. Alternatively, can be a generator that yields iterables
of :class:`~opentelemetry.metrics.Observation`.
unit: The unit for observations this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""
[docs] @abstractmethod
def create_observable_up_down_counter(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableUpDownCounter:
"""Creates an `ObservableUpDownCounter` instrument
Args:
name: The name of the instrument to be created
callbacks: A sequence of callbacks that return an iterable of
:class:`~opentelemetry.metrics.Observation`. Alternatively, can be a generator that yields iterables
of :class:`~opentelemetry.metrics.Observation`.
unit: The unit for observations this instrument reports. For
example, ``By`` for bytes. UCUM units are recommended.
description: A description for this instrument and what it measures.
"""
class _ProxyMeter(Meter):
def __init__(
self,
name: str,
version: Optional[str] = None,
schema_url: Optional[str] = None,
) -> None:
super().__init__(name, version=version, schema_url=schema_url)
self._lock = Lock()
self._instruments: List[_ProxyInstrumentT] = []
self._real_meter: Optional[Meter] = None
def on_set_meter_provider(self, meter_provider: MeterProvider) -> None:
"""Called when a real meter provider is set on the creating _ProxyMeterProvider
Creates a real backing meter for this instance and notifies all created
instruments so they can create real backing instruments.
"""
real_meter = meter_provider.get_meter(
self._name, self._version, self._schema_url
)
with self._lock:
self._real_meter = real_meter
# notify all proxy instruments of the new meter so they can create
# real instruments to back themselves
for instrument in self._instruments:
instrument.on_meter_set(real_meter)
def create_counter(
self,
name: str,
unit: str = "",
description: str = "",
) -> Counter:
with self._lock:
if self._real_meter:
return self._real_meter.create_counter(name, unit, description)
proxy = _ProxyCounter(name, unit, description)
self._instruments.append(proxy)
return proxy
def create_up_down_counter(
self,
name: str,
unit: str = "",
description: str = "",
) -> UpDownCounter:
with self._lock:
if self._real_meter:
return self._real_meter.create_up_down_counter(
name, unit, description
)
proxy = _ProxyUpDownCounter(name, unit, description)
self._instruments.append(proxy)
return proxy
def create_observable_counter(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableCounter:
with self._lock:
if self._real_meter:
return self._real_meter.create_observable_counter(
name, callbacks, unit, description
)
proxy = _ProxyObservableCounter(
name, callbacks, unit=unit, description=description
)
self._instruments.append(proxy)
return proxy
def create_histogram(
self,
name: str,
unit: str = "",
description: str = "",
) -> Histogram:
with self._lock:
if self._real_meter:
return self._real_meter.create_histogram(
name, unit, description
)
proxy = _ProxyHistogram(name, unit, description)
self._instruments.append(proxy)
return proxy
def create_gauge(
self,
name: str,
unit: str = "",
description: str = "",
) -> Gauge:
with self._lock:
if self._real_meter:
return self._real_meter.create_gauge(name, unit, description)
proxy = _ProxyGauge(name, unit, description)
self._instruments.append(proxy)
return proxy
def create_observable_gauge(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableGauge:
with self._lock:
if self._real_meter:
return self._real_meter.create_observable_gauge(
name, callbacks, unit, description
)
proxy = _ProxyObservableGauge(
name, callbacks, unit=unit, description=description
)
self._instruments.append(proxy)
return proxy
def create_observable_up_down_counter(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableUpDownCounter:
with self._lock:
if self._real_meter:
return self._real_meter.create_observable_up_down_counter(
name,
callbacks,
unit,
description,
)
proxy = _ProxyObservableUpDownCounter(
name, callbacks, unit=unit, description=description
)
self._instruments.append(proxy)
return proxy
class NoOpMeter(Meter):
"""The default Meter used when no Meter implementation is available.
All operations are no-op.
"""
[docs] def create_counter(
self,
name: str,
unit: str = "",
description: str = "",
) -> Counter:
"""Returns a no-op Counter."""
if self._is_instrument_registered(
name, NoOpCounter, unit, description
)[0]:
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
Counter.__name__,
unit,
description,
)
return NoOpCounter(name, unit=unit, description=description)
[docs] def create_gauge(
self,
name: str,
unit: str = "",
description: str = "",
) -> Gauge:
"""Returns a no-op Gauge."""
if self._is_instrument_registered(name, NoOpGauge, unit, description)[
0
]:
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
Gauge.__name__,
unit,
description,
)
return NoOpGauge(name, unit=unit, description=description)
[docs] def create_up_down_counter(
self,
name: str,
unit: str = "",
description: str = "",
) -> UpDownCounter:
"""Returns a no-op UpDownCounter."""
if self._is_instrument_registered(
name, NoOpUpDownCounter, unit, description
)[0]:
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
UpDownCounter.__name__,
unit,
description,
)
return NoOpUpDownCounter(name, unit=unit, description=description)
[docs] def create_observable_counter(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableCounter:
"""Returns a no-op ObservableCounter."""
if self._is_instrument_registered(
name, NoOpObservableCounter, unit, description
)[0]:
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
ObservableCounter.__name__,
unit,
description,
)
return NoOpObservableCounter(
name,
callbacks,
unit=unit,
description=description,
)
[docs] def create_histogram(
self,
name: str,
unit: str = "",
description: str = "",
) -> Histogram:
"""Returns a no-op Histogram."""
if self._is_instrument_registered(
name, NoOpHistogram, unit, description
)[0]:
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
Histogram.__name__,
unit,
description,
)
return NoOpHistogram(name, unit=unit, description=description)
[docs] def create_observable_gauge(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableGauge:
"""Returns a no-op ObservableGauge."""
if self._is_instrument_registered(
name, NoOpObservableGauge, unit, description
)[0]:
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
ObservableGauge.__name__,
unit,
description,
)
return NoOpObservableGauge(
name,
callbacks,
unit=unit,
description=description,
)
[docs] def create_observable_up_down_counter(
self,
name: str,
callbacks: Optional[Sequence[CallbackT]] = None,
unit: str = "",
description: str = "",
) -> ObservableUpDownCounter:
"""Returns a no-op ObservableUpDownCounter."""
if self._is_instrument_registered(
name, NoOpObservableUpDownCounter, unit, description
)[0]:
_logger.warning(
"An instrument with name %s, type %s, unit %s and "
"description %s has been created already.",
name,
ObservableUpDownCounter.__name__,
unit,
description,
)
return NoOpObservableUpDownCounter(
name,
callbacks,
unit=unit,
description=description,
)
_METER_PROVIDER_SET_ONCE = Once()
_METER_PROVIDER: Optional[MeterProvider] = None
_PROXY_METER_PROVIDER = _ProxyMeterProvider()
def get_meter(
name: str,
version: str = "",
meter_provider: Optional[MeterProvider] = None,
schema_url: Optional[str] = None,
) -> "Meter":
"""Returns a `Meter` for use by the given instrumentation library.
This function is a convenience wrapper for
`opentelemetry.metrics.MeterProvider.get_meter`.
If meter_provider is omitted the current configured one is used.
"""
if meter_provider is None:
meter_provider = get_meter_provider()
return meter_provider.get_meter(name, version, schema_url)
def _set_meter_provider(meter_provider: MeterProvider, log: bool) -> None:
def set_mp() -> None:
global _METER_PROVIDER # pylint: disable=global-statement
_METER_PROVIDER = meter_provider
# gives all proxies real instruments off the newly set meter provider
_PROXY_METER_PROVIDER.on_set_meter_provider(meter_provider)
did_set = _METER_PROVIDER_SET_ONCE.do_once(set_mp)
if log and not did_set:
_logger.warning("Overriding of current MeterProvider is not allowed")
def set_meter_provider(meter_provider: MeterProvider) -> None:
"""Sets the current global :class:`~.MeterProvider` object.
This can only be done once, a warning will be logged if any further attempt
is made.
"""
_set_meter_provider(meter_provider, log=True)
def get_meter_provider() -> MeterProvider:
"""Gets the current global :class:`~.MeterProvider` object."""
if _METER_PROVIDER is None:
if OTEL_PYTHON_METER_PROVIDER not in environ:
return _PROXY_METER_PROVIDER
meter_provider: MeterProvider = _load_provider( # type: ignore
OTEL_PYTHON_METER_PROVIDER, "meter_provider"
)
_set_meter_provider(meter_provider, log=False)
# _METER_PROVIDER will have been set by one thread
return cast("MeterProvider", _METER_PROVIDER)