# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
"""
This library allows export of metrics data to `Prometheus <https://prometheus.io/>`_.
Usage
-----
The **OpenTelemetry Prometheus Exporter** allows export of `OpenTelemetry`_
metrics to `Prometheus`_.
.. _Prometheus: https://prometheus.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. code:: python
from prometheus_client import start_http_server
from opentelemetry.exporter.prometheus import PrometheusMetricReader
from opentelemetry.metrics import get_meter_provider, set_meter_provider
from opentelemetry.sdk.metrics import MeterProvider
# Start Prometheus client
start_http_server(port=8000, addr="localhost")
# Exporter to export metrics to Prometheus
prefix = "MyAppPrefix"
reader = PrometheusMetricReader(prefix=prefix)
# Meter is responsible for creating and recording metrics
set_meter_provider(MeterProvider(metric_readers=[reader]))
meter = get_meter_provider().get_meter("myapp", "0.1.2")
counter = meter.create_counter(
"requests",
"requests",
"number of requests",
)
# Labels are used to identify key-values that are associated with a specific
# metric that you want to record. These are useful for pre-aggregation and can
# be used to store custom dimensions pertaining to a metric
labels = {"environment": "staging"}
counter.add(25, labels)
input("Press any key to exit...")
API
---
"""
from collections import deque
from collections.abc import Callable, Iterable, Sequence
from itertools import chain
from json import dumps
from logging import getLogger
from os import environ
from typing import (
Any,
TypeVar,
)
from prometheus_client import CollectorRegistry, start_http_server
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
GaugeMetricFamily,
HistogramMetricFamily,
InfoMetricFamily,
)
from prometheus_client.core import Metric as PrometheusMetric
from opentelemetry.exporter.prometheus._mapping import (
map_unit,
sanitize_attribute,
sanitize_full_name,
)
from opentelemetry.sdk.environment_variables import (
OTEL_EXPORTER_PROMETHEUS_HOST,
OTEL_EXPORTER_PROMETHEUS_PORT,
)
from opentelemetry.sdk.metrics import (
Counter,
ObservableCounter,
ObservableGauge,
ObservableUpDownCounter,
UpDownCounter,
)
from opentelemetry.sdk.metrics import Histogram as HistogramInstrument
from opentelemetry.sdk.metrics.export import (
AggregationTemporality,
DataT,
Gauge,
Histogram,
HistogramDataPoint,
Metric,
MetricReader,
MetricsData,
Sum,
)
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
OtelComponentTypeValues,
)
from opentelemetry.util.types import Attributes, AttributeValue
_logger = getLogger(__name__)
_TARGET_INFO_NAME = "target"
_TARGET_INFO_DESCRIPTION = "Target metadata"
_OTEL_SCOPE_NAME_LABEL = "otel_scope_name"
_OTEL_SCOPE_VERSION_LABEL = "otel_scope_version"
_OTEL_SCOPE_SCHEMA_URL_LABEL = "otel_scope_schema_url"
_OTEL_SCOPE_ATTR_PREFIX = "otel_scope_"
def _convert_buckets(
bucket_counts: Sequence[int], explicit_bounds: Sequence[float]
) -> Sequence[tuple[str, int]]:
buckets = []
total_count = 0
for upper_bound, count in zip(
chain(explicit_bounds, ["+Inf"]),
bucket_counts,
):
total_count += count
buckets.append((f"{upper_bound}", total_count))
return buckets
def _should_convert_sum_to_gauge(metric: Metric) -> bool:
# The Prometheus compatibility spec requires cumulative non-monotonic Sums
# to be exported as Gauges.
if not isinstance(metric.data, Sum):
return False
return (
not metric.data.is_monotonic
and metric.data.aggregation_temporality
== AggregationTemporality.CUMULATIVE
)
_FamilyT = TypeVar("_FamilyT", bound=PrometheusMetric)
def _get_or_create_family(
registry: dict[str, PrometheusMetric],
family_id: str,
factory: Callable[..., _FamilyT],
*,
name: str,
documentation: str,
labels: Sequence[str],
unit: str,
) -> _FamilyT:
if family_id not in registry:
registry[family_id] = factory(
name=name,
documentation=documentation,
labels=labels,
unit=unit,
)
return registry[family_id]
def _populate_counter_family(
registry: dict[str, PrometheusMetric],
per_metric_family_id: str,
metric_name: str,
description: str,
unit: str,
label_keys: Sequence[str],
label_rows: Sequence[Sequence[str]],
values: Sequence[float],
) -> None:
family_id = "|".join([per_metric_family_id, CounterMetricFamily.__name__])
family = _get_or_create_family(
registry,
family_id,
CounterMetricFamily,
name=metric_name,
documentation=description,
labels=label_keys,
unit=unit,
)
for label_values, value in zip(label_rows, values):
family.add_metric(labels=label_values, value=value)
def _populate_gauge_family(
registry: dict[str, PrometheusMetric],
per_metric_family_id: str,
metric_name: str,
description: str,
unit: str,
label_keys: Sequence[str],
label_rows: Sequence[Sequence[str]],
values: Sequence[float],
) -> None:
family_id = "|".join([per_metric_family_id, GaugeMetricFamily.__name__])
family = _get_or_create_family(
registry,
family_id,
GaugeMetricFamily,
name=metric_name,
documentation=description,
labels=label_keys,
unit=unit,
)
for label_values, value in zip(label_rows, values):
family.add_metric(labels=label_values, value=value)
def _populate_histogram_family(
registry: dict[str, PrometheusMetric],
per_metric_family_id: str,
metric_name: str,
description: str,
unit: str,
label_keys: Sequence[str],
label_rows: Sequence[Sequence[str]],
values: Sequence[dict[str, Any]],
) -> None:
family_id = "|".join(
[per_metric_family_id, HistogramMetricFamily.__name__]
)
family = _get_or_create_family(
registry,
family_id,
HistogramMetricFamily,
name=metric_name,
documentation=description,
labels=label_keys,
unit=unit,
)
for label_values, value in zip(label_rows, values):
family.add_metric(
labels=label_values,
buckets=_convert_buckets(
value["bucket_counts"], value["explicit_bounds"]
),
sum_value=value["sum"],
)
[docs]
class PrometheusMetricReader(MetricReader):
"""Prometheus metric exporter for OpenTelemetry.
Args:
disable_target_info: Whether to disable the ``target_info`` metric.
scope_info_enabled: Whether to include instrumentation scope labels on
exported metrics. Scope labels are exported by default.
prefix: Prefix added to exported Prometheus metric names.
"""
def __init__(
self,
disable_target_info: bool = False,
prefix: str = "",
scope_info_enabled: bool = True,
*,
registry: CollectorRegistry = REGISTRY,
) -> None:
super().__init__(
preferred_temporality={
Counter: AggregationTemporality.CUMULATIVE,
UpDownCounter: AggregationTemporality.CUMULATIVE,
HistogramInstrument: AggregationTemporality.CUMULATIVE,
ObservableCounter: AggregationTemporality.CUMULATIVE,
ObservableUpDownCounter: AggregationTemporality.CUMULATIVE,
ObservableGauge: AggregationTemporality.CUMULATIVE,
},
otel_component_type=OtelComponentTypeValues.PROMETHEUS_HTTP_TEXT_METRIC_EXPORTER,
)
self._collector = _CustomCollector(
disable_target_info=disable_target_info,
prefix=prefix,
scope_info_enabled=scope_info_enabled,
)
self._registry = registry
self._registry.register(self._collector)
self._collector._callback = self.collect
self._prefix = prefix
def _receive_metrics(
self,
metrics_data: MetricsData,
timeout_millis: float = 10_000,
**kwargs,
) -> None:
if metrics_data is None:
return
self._collector.add_metrics_data(metrics_data)
[docs]
def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None:
self._registry.unregister(self._collector)
class _CustomCollector:
"""_CustomCollector represents the Prometheus Collector object
See more:
https://github.com/prometheus/client_python#custom-collectors
"""
def __init__(
self,
disable_target_info: bool = False,
prefix: str = "",
scope_info_enabled: bool = True,
):
self._callback = None
self._metrics_datas: deque[MetricsData] = deque()
self._disable_target_info = disable_target_info
self._scope_info_enabled = scope_info_enabled
self._target_info = None
self._prefix = prefix
def add_metrics_data(self, metrics_data: MetricsData) -> None:
"""Add metrics to Prometheus data"""
self._metrics_datas.append(metrics_data)
def collect(self) -> Iterable[PrometheusMetric]:
"""Collect fetches the metrics from OpenTelemetry
and delivers them as Prometheus Metrics.
Collect is invoked every time a ``prometheus.Gatherer`` is run
for example when the HTTP endpoint is invoked by Prometheus.
"""
if self._callback is not None:
self._callback()
metric_family_id_metric_family = {}
if len(self._metrics_datas):
if not self._disable_target_info:
if self._target_info is None:
attributes: Attributes = {}
for res in self._metrics_datas[0].resource_metrics:
attributes = {**attributes, **res.resource.attributes}
self._target_info = self._create_info_metric(
_TARGET_INFO_NAME, _TARGET_INFO_DESCRIPTION, attributes
)
metric_family_id_metric_family[_TARGET_INFO_NAME] = (
self._target_info
)
while self._metrics_datas:
self._translate_to_prometheus(
self._metrics_datas.popleft(), metric_family_id_metric_family
)
if metric_family_id_metric_family:
yield from metric_family_id_metric_family.values()
def _translate_to_prometheus(
self,
metrics_data: MetricsData,
metric_family_id_metric_family: dict[str, PrometheusMetric],
):
for rm in metrics_data.resource_metrics:
for sm in rm.scope_metrics:
scope_attrs = self._build_scope_attrs(sm.scope)
for metric in sm.metrics:
self._translate_metric(
metric,
scope_attrs,
metric_family_id_metric_family,
)
def _translate_metric(
self,
metric: Metric,
scope_attrs: dict[str, Any],
metric_family_id_metric_family: dict[str, PrometheusMetric],
) -> None:
metric_name = self._resolve_metric_name(metric.name)
description = metric.description or ""
unit = map_unit(metric.unit or "")
label_keys, label_rows, values = self._collect_data_points(
metric.data, scope_attrs
)
per_metric_family_id = "|".join((metric_name, description, unit))
convert_sum_to_gauge = _should_convert_sum_to_gauge(metric)
if isinstance(metric.data, Sum) and not convert_sum_to_gauge:
_populate_counter_family(
registry=metric_family_id_metric_family,
per_metric_family_id=per_metric_family_id,
metric_name=metric_name,
description=description,
unit=unit,
label_keys=label_keys,
label_rows=label_rows,
values=values,
)
elif isinstance(metric.data, Gauge) or convert_sum_to_gauge:
_populate_gauge_family(
registry=metric_family_id_metric_family,
per_metric_family_id=per_metric_family_id,
metric_name=metric_name,
description=description,
unit=unit,
label_keys=label_keys,
label_rows=label_rows,
values=values,
)
elif isinstance(metric.data, Histogram):
_populate_histogram_family(
registry=metric_family_id_metric_family,
per_metric_family_id=per_metric_family_id,
metric_name=metric_name,
description=description,
unit=unit,
label_keys=label_keys,
label_rows=label_rows,
values=values,
)
else:
_logger.warning("Unsupported metric data. %s", type(metric.data))
def _build_scope_attrs(
self, scope: InstrumentationScope
) -> dict[str, AttributeValue]:
if not self._scope_info_enabled:
return {}
attrs: dict[str, AttributeValue] = {}
if scope.attributes:
for key, value in scope.attributes.items():
attrs[_OTEL_SCOPE_ATTR_PREFIX + key] = value
attrs[_OTEL_SCOPE_NAME_LABEL] = scope.name or ""
attrs[_OTEL_SCOPE_VERSION_LABEL] = scope.version or ""
attrs[_OTEL_SCOPE_SCHEMA_URL_LABEL] = scope.schema_url or ""
return attrs
def _resolve_metric_name(self, name: str) -> str:
if self._prefix:
name = self._prefix + "_" + name
return sanitize_full_name(name)
def _collect_data_points(
self,
metric_data: DataT,
scope_attrs: dict[str, AttributeValue],
) -> tuple[list[str], list[list[str]], list[float | dict[str, Any]]]:
keys: set[str] = set()
rows: list[dict[str, str]] = []
values: list[float | dict[str, Any]] = []
for point in metric_data.data_points:
labels: dict[str, str] = {}
for key, value in chain(
scope_attrs.items(),
point.attributes.items(),
):
label = sanitize_attribute(key)
keys.add(label)
labels[label] = self._check_value(value)
rows.append(labels)
if isinstance(point, HistogramDataPoint):
values.append(
{
"bucket_counts": point.bucket_counts,
"explicit_bounds": point.explicit_bounds,
"sum": point.sum,
}
)
else:
values.append(point.value)
label_keys = sorted(keys)
# Backfill missing labels with "" so every data point exposes the
# full label set expected by the Prometheus family.
label_rows = [
[labels.get(k, "") for k in label_keys] for labels in rows
]
return label_keys, label_rows, values
# pylint: disable=no-self-use
def _check_value(self, value: int | float | str | Sequence) -> str:
"""Check the label value and return is appropriate representation"""
if not isinstance(value, str):
return dumps(value, default=str)
return str(value)
def _create_info_metric(
self, name: str, description: str, attributes: dict[str, str]
) -> InfoMetricFamily:
"""Create an Info Metric Family with list of attributes"""
# sanitize the attribute names according to Prometheus rule
attributes = {
sanitize_attribute(key): self._check_value(value)
for key, value in attributes.items()
}
info = InfoMetricFamily(name, description, labels=attributes)
info.add_metric(labels=list(attributes.keys()), value=attributes)
return info
class _AutoPrometheusMetricReader(PrometheusMetricReader):
"""Thin wrapper around PrometheusMetricReader used for the opentelemetry_metrics_exporter entry point.
This allows users to use the prometheus exporter with opentelemetry-instrument. It handles
starting the Prometheus http server on the the correct port and host.
"""
def __init__(self) -> None:
super().__init__()
# Default values are specified in
# https://github.com/open-telemetry/opentelemetry-specification/blob/v1.24.0/specification/configuration/sdk-environment-variables.md#prometheus-exporter
start_http_server(
port=int(environ.get(OTEL_EXPORTER_PROMETHEUS_PORT, "9464")),
addr=environ.get(OTEL_EXPORTER_PROMETHEUS_HOST, "localhost"),
)