Source code for opentelemetry.exporter.otlp.json.file.metric_exporter

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0

import logging
from os import PathLike
from typing import IO, Any, overload

from opentelemetry.exporter.otlp.json.common._internal import (
    _get_aggregation,
    _get_temporality,
)
from opentelemetry.exporter.otlp.json.common.metrics_encoder import (
    encode_metrics,
)
from opentelemetry.exporter.otlp.json.file._internal import _FileExporter
from opentelemetry.sdk.metrics.export import (
    AggregationTemporality,
    MetricExporter,
    MetricExportResult,
    MetricsData,
)
from opentelemetry.sdk.metrics.view import Aggregation

_logger = logging.getLogger(__name__)


def _encode_metrics_to_dict(
    metrics_data: MetricsData,
) -> dict[str, Any] | None:
    data = encode_metrics(metrics_data)
    return data.to_dict() if data.resource_metrics else None


[docs] class FileMetricExporter(MetricExporter): @overload def __init__( self, path: str | PathLike[str], *, preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, ) -> None: ... @overload def __init__( self, *, stream: IO[str], preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, ) -> None: ... @overload def __init__( self, *, preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, ) -> None: ... def __init__( self, path: str | PathLike[str] | None = None, *, stream: IO[str] | None = None, preferred_temporality: dict[type, AggregationTemporality] | None = None, preferred_aggregation: dict[type, Aggregation] | None = None, ) -> None: MetricExporter.__init__( self, preferred_temporality=_get_temporality(preferred_temporality), preferred_aggregation=_get_aggregation(preferred_aggregation), ) self._exporter: _FileExporter[MetricsData] = _FileExporter( encode=_encode_metrics_to_dict, kind="metrics", logger=_logger, path=path, stream=stream, )
[docs] def export( self, metrics_data: MetricsData, timeout_millis: float = 10_000, **kwargs, ) -> MetricExportResult: return ( MetricExportResult.SUCCESS if self._exporter.export(metrics_data) else MetricExportResult.FAILURE )
[docs] def shutdown(self, timeout_millis: float = 30_000, **kwargs) -> None: self._exporter.shutdown()
[docs] def force_flush(self, timeout_millis: float = 10_000) -> bool: return True