# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
import logging
from collections.abc import Sequence
from os import PathLike
from typing import IO, Any, overload
from opentelemetry.exporter.otlp.json.common._log_encoder import encode_logs
from opentelemetry.exporter.otlp.json.file._internal import _FileExporter
from opentelemetry.sdk._logs import ReadableLogRecord
from opentelemetry.sdk._logs.export import (
LogRecordExporter,
LogRecordExportResult,
)
from opentelemetry.sdk._shared_internal import DuplicateFilter
_logger = logging.getLogger(__name__)
_logger.addFilter(DuplicateFilter())
def _encode_logs_to_dict(
batch: Sequence[ReadableLogRecord],
) -> dict[str, Any] | None:
data = encode_logs(batch)
return data.to_dict() if data.resource_logs else None
[docs]
class FileLogExporter(LogRecordExporter):
@overload
def __init__(
self,
path: str | PathLike[str],
) -> None: ...
@overload
def __init__(
self,
*,
stream: IO[str],
) -> None: ...
@overload
def __init__(
self,
) -> None: ...
def __init__(
self,
path: str | PathLike[str] | None = None,
*,
stream: IO[str] | None = None,
) -> None:
self._exporter: _FileExporter[Sequence[ReadableLogRecord]] = (
_FileExporter(
encode=_encode_logs_to_dict,
kind="logs",
logger=_logger,
path=path,
stream=stream,
)
)
[docs]
def export(
self, batch: Sequence[ReadableLogRecord]
) -> LogRecordExportResult:
return (
LogRecordExportResult.SUCCESS
if self._exporter.export(batch)
else LogRecordExportResult.FAILURE
)
[docs]
def shutdown(self) -> None:
self._exporter.shutdown()
# pylint: disable-next=no-self-use
[docs]
def force_flush(self, timeout_millis: float = 10_000) -> bool:
return True