# Source code for opentelemetry.sdk._logs._internal.export

# Copyright The OpenTelemetry Authors
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

import abc
import enum
import logging
import sys
from collections.abc import Callable, Sequence
from os import environ, linesep
from typing import IO

from typing_extensions import deprecated

from opentelemetry.context import (
    _ON_EMIT_RECURSION_COUNT_KEY,
    _SUPPRESS_INSTRUMENTATION_KEY,
    attach,
    detach,
    get_value,
    set_value,
)
from opentelemetry.metrics import MeterProvider, get_meter_provider
from opentelemetry.sdk._logs import (
    LogRecordProcessor,
    ReadableLogRecord,
    ReadWriteLogRecord,
)
from opentelemetry.sdk._shared_internal import (
    BatchProcessor,
    DuplicateFilter,
)
from opentelemetry.sdk._shared_internal._processor_metrics import (
    create_processor_metrics,
)
from opentelemetry.sdk.environment_variables import (
    OTEL_BLRP_EXPORT_TIMEOUT,
    OTEL_BLRP_MAX_EXPORT_BATCH_SIZE,
    OTEL_BLRP_MAX_QUEUE_SIZE,
    OTEL_BLRP_SCHEDULE_DELAY,
    OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED,
)
from opentelemetry.sdk.environment_variables._internal import (
    parse_boolean_environment_variable,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv._incubating.attributes.otel_attributes import (
    OtelComponentTypeValues,
)

# Fallback values used by BatchLogRecordProcessor when a setting is supplied
# neither as a constructor argument nor through its OTEL_BLRP_* environment
# variable.
_DEFAULT_SCHEDULE_DELAY_MILLIS = 1000
_DEFAULT_MAX_EXPORT_BATCH_SIZE = 512
_DEFAULT_EXPORT_TIMEOUT_MILLIS = 30000
_DEFAULT_MAX_QUEUE_SIZE = 2048
# Lazy %-style log message: first argument is the environment variable name,
# second is the default that is used instead of the unparsable value.
_ENV_VAR_INT_VALUE_ERROR_MESSAGE = (
    "Unable to parse value for %s as integer. Defaulting to %s."
)
# Module logger used for export failures and configuration problems.
# NOTE(review): DuplicateFilter presumably suppresses repeated identical
# messages -- confirm against opentelemetry.sdk._shared_internal.
_logger = logging.getLogger(__name__)
_logger.addFilter(DuplicateFilter())

# Child logger whose records are not passed on to the handlers of ancestor
# loggers (propagate=False). It is used by SimpleLogRecordProcessor.on_emit to
# report a detected recursion.
# NOTE(review): presumably this keeps the warning from re-entering an
# OpenTelemetry handler installed on a parent logger -- confirm.
_propagate_false_logger = logging.getLogger(__name__ + ".propagate.false")
_propagate_false_logger.propagate = False


class LogRecordExportResult(enum.Enum):
    """Outcome of a single ``LogRecordExporter.export`` call."""

    # The batch was exported.
    SUCCESS = 0
    # The batch was not exported.
    FAILURE = 1
@deprecated(
    "Use LogRecordExportResult. Since logs are not stable yet this WILL be removed in future releases."
)
class LogExportResult(enum.Enum):
    """Deprecated predecessor of ``LogRecordExportResult``.

    This is a separate enumeration with the same member names and values, not
    an alias: ``LogExportResult.SUCCESS`` and ``LogRecordExportResult.SUCCESS``
    are distinct objects and do not compare equal. New code should use
    ``LogRecordExportResult``.
    """

    SUCCESS = 0
    FAILURE = 1
class LogRecordExporter(abc.ABC):
    """Interface for exporting logs.

    Interface to be implemented by services that want to export logs received
    in their own format.

    To export data this MUST be registered to the
    :class:`opentelemetry.sdk._logs.Logger` using a log processor.

    Important
    ---------
    The ``export()`` method may raise exceptions (e.g., network errors,
    timeouts, serialization errors). It is the responsibility of the
    ``LogRecordProcessor`` calling this exporter to handle these exceptions
    appropriately to prevent application crashes. See ``LogRecordProcessor``
    for guidance on implementing proper error handling.
    """

    @abc.abstractmethod
    def export(
        self, batch: Sequence[ReadableLogRecord]
    ) -> LogRecordExportResult:
        """Exports a batch of logs.

        Args:
            batch: The list of ``ReadableLogRecord`` objects to be exported.

        Returns:
            The result of the export.

        Raises:
            Exception: This method may raise exceptions on network errors,
                timeouts, or other failures. Callers (i.e., log processors)
                should handle these exceptions to comply with OpenTelemetry
                error handling principles.
        """

    @abc.abstractmethod
    def shutdown(self) -> None:
        """Shuts down the exporter.

        Called when the SDK is shut down.
        """
@deprecated(
    "Use LogRecordExporter. Since logs are not stable yet this WILL be removed in future releases."
)
class LogExporter(LogRecordExporter):
    """Deprecated name for ``LogRecordExporter``.

    Adds nothing to the base class; it only exists so that code written
    against the old name keeps working. New code should subclass
    ``LogRecordExporter`` directly.
    """
class ConsoleLogRecordExporter(LogRecordExporter):
    """Implementation of :class:`LogRecordExporter` that prints log records to the
    console.

    This class can be used for diagnostic purposes. It prints the exported
    log records to the console STDOUT.
    """

    def __init__(
        self,
        out: IO = sys.stdout,
        formatter: Callable[[ReadableLogRecord], str] = lambda record: (
            record.to_json() + linesep
        ),
    ):
        """Store the output stream and the record formatter.

        Args:
            out: Stream the formatted records are written to. Must provide
                ``write`` and ``flush``.
            formatter: Callable turning one record into the text to write.
                The default renders ``record.to_json()`` followed by the
                platform line separator.
        """
        # NOTE: the ``sys.stdout`` default is evaluated once, when this
        # function is defined. Rebinding ``sys.stdout`` afterwards does not
        # change the stream used by exporters created with the default.
        self.out = out
        self.formatter = formatter

    def export(
        self, batch: Sequence[ReadableLogRecord]
    ) -> LogRecordExportResult:
        """Write every record of ``batch`` to the stream, then flush once.

        Args:
            batch: The records to print.

        Returns:
            ``LogRecordExportResult.SUCCESS``. Exceptions raised by the
            formatter or the stream are not caught here.
        """
        for log_record in batch:
            self.out.write(self.formatter(log_record))
        self.out.flush()
        return LogRecordExportResult.SUCCESS

    def shutdown(self) -> None:
        """Do nothing; this exporter does not close the output stream."""
@deprecated(
    "Use ConsoleLogRecordExporter. Since logs are not stable yet this WILL be removed in future releases."
)
class ConsoleLogExporter(ConsoleLogRecordExporter):
    """Deprecated name for ``ConsoleLogRecordExporter``.

    Adds nothing to the base class; it only exists so that code written
    against the old name keeps working. New code should use
    ``ConsoleLogRecordExporter``.
    """
class SimpleLogRecordProcessor(LogRecordProcessor):
    """Implementation of LogRecordProcessor that exports logs synchronously.

    This processor passes received logs directly to the configured
    ``LogRecordExporter`` as soon as they are emitted.

    This class serves as a reference implementation for custom log processors,
    demonstrating proper error handling. Note how the ``on_emit`` method wraps
    the exporter call in a try/except block to prevent exceptions from
    propagating to the application.
    """

    def __init__(
        self,
        exporter: LogRecordExporter,
        *,
        meter_provider: MeterProvider | None = None,
    ):
        """Set up the processor.

        Args:
            exporter: Exporter every emitted record is handed to.
            meter_provider: Provider passed to ``create_processor_metrics``.
                When falsy, the result of ``get_meter_provider()`` is used.
        """
        self._exporter = exporter
        self._shutdown = False
        # The metrics are switched on or off by the
        # OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED environment variable.
        self._metrics = create_processor_metrics(
            "logs",
            OtelComponentTypeValues.SIMPLE_LOG_PROCESSOR,
            meter_provider or get_meter_provider(),
            enabled=parse_boolean_environment_variable(
                OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED
            ),
        )

    def on_emit(self, log_record: ReadWriteLogRecord):
        """Export ``log_record`` immediately through the configured exporter.

        Exceptions raised while building or exporting the record are logged
        and not re-raised. The record is dropped when the call is nested more
        than three levels deep in ``on_emit``, or when the processor has been
        shut down.

        Args:
            log_record: The record that was just emitted.
        """
        # Prevent entering a recursive loop.
        cnt = get_value(_ON_EMIT_RECURSION_COUNT_KEY) or 0
        # Recursive depth of 3 is sort of arbitrary. It's possible that an Exporter.export call
        # emits a log which returns us to this function, but when we call Exporter.export again the log
        # is no longer emitted and we exit this recursive loop naturally, a depth of >3 allows 3
        # recursive log calls but exits after because it's likely endless.
        if cnt > 3:  # pyright: ignore[reportOperatorIssue]
            _propagate_false_logger.warning(
                "SimpleLogRecordProcessor.on_emit has entered a recursive loop. Dropping log and exiting the loop."
            )
            return
        # Run the export in a context that suppresses instrumentation and
        # carries the incremented recursion counter. The token restores the
        # previous context in the ``finally`` clause below.
        token = attach(
            set_value(
                _SUPPRESS_INSTRUMENTATION_KEY,
                True,
                set_value(_ON_EMIT_RECURSION_COUNT_KEY, cnt + 1),  # pyright: ignore[reportOperatorIssue]
            )
        )
        error: Exception | None = None
        try:
            if self._shutdown:
                _logger.warning("Processor is already shutdown, ignoring call")
                return
            # Convert ReadWriteLogRecord to ReadableLogRecord before exporting
            # Note: resource should not be None at this point as it's set during Logger.emit()
            resource = (
                log_record.resource
                if log_record.resource is not None
                else Resource.create({})
            )
            readable_log_record = ReadableLogRecord(
                log_record=log_record.log_record,
                resource=resource,
                instrumentation_scope=log_record.instrumentation_scope,
                limits=log_record.limits,
            )
            self._exporter.export((readable_log_record,))
        except Exception as err:  # pylint: disable=broad-exception-caught
            error = err
            _logger.exception("Exception while exporting logs.")
        finally:
            # Runs on every path that got past ``attach``, including the
            # early return taken after shutdown.
            self._metrics.finish_items(1, error)
            detach(token)

    def shutdown(self):
        """Mark the processor as shut down and shut down the exporter."""
        self._shutdown = True
        self._exporter.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:  # pylint: disable=no-self-use
        """Return ``True`` immediately.

        Records are exported synchronously in ``on_emit``, so this processor
        holds nothing to flush. ``timeout_millis`` is accepted but unused.
        """
        return True
class BatchLogRecordProcessor(LogRecordProcessor):
    """This is an implementation of LogRecordProcessor which creates batches of
    received logs and sends them to the configured LogRecordExporter.

    `BatchLogRecordProcessor` is configurable with the following environment
    variables which correspond to constructor parameters:

    - :envvar:`OTEL_BLRP_SCHEDULE_DELAY`
    - :envvar:`OTEL_BLRP_MAX_QUEUE_SIZE`
    - :envvar:`OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`
    - :envvar:`OTEL_BLRP_EXPORT_TIMEOUT`

    All the logic for emitting logs, shutting down etc. resides in the BatchProcessor class.
    """

    def __init__(
        self,
        exporter: LogRecordExporter,
        schedule_delay_millis: float | None = None,
        max_export_batch_size: int | None = None,
        export_timeout_millis: float | None = None,
        max_queue_size: int | None = None,
        *,
        meter_provider: MeterProvider | None = None,
    ):
        """Resolve the settings, validate them and create the BatchProcessor.

        Every setting left as ``None`` is read from its ``OTEL_BLRP_*``
        environment variable, falling back to the module default.

        Args:
            exporter: Exporter the batches are sent to.
            schedule_delay_millis: See ``OTEL_BLRP_SCHEDULE_DELAY``.
            max_export_batch_size: See ``OTEL_BLRP_MAX_EXPORT_BATCH_SIZE``.
            export_timeout_millis: See ``OTEL_BLRP_EXPORT_TIMEOUT``.
            max_queue_size: See ``OTEL_BLRP_MAX_QUEUE_SIZE``.
            meter_provider: Provider passed to ``create_processor_metrics``.
                When falsy, the result of ``get_meter_provider()`` is used.

        Raises:
            ValueError: If the resolved settings fail ``_validate_arguments``.
        """
        if max_queue_size is None:
            max_queue_size = BatchLogRecordProcessor._default_max_queue_size()

        if schedule_delay_millis is None:
            schedule_delay_millis = (
                BatchLogRecordProcessor._default_schedule_delay_millis()
            )

        if max_export_batch_size is None:
            max_export_batch_size = (
                BatchLogRecordProcessor._default_max_export_batch_size()
            )
        # Not used. No way currently to pass timeout to export.
        if export_timeout_millis is None:
            export_timeout_millis = (
                BatchLogRecordProcessor._default_export_timeout_millis()
            )

        BatchLogRecordProcessor._validate_arguments(
            max_queue_size, schedule_delay_millis, max_export_batch_size
        )
        # Initializes BatchProcessor
        self._batch_processor = BatchProcessor(
            exporter,
            schedule_delay_millis,
            max_export_batch_size,
            export_timeout_millis,
            max_queue_size,
            "Log",
            create_processor_metrics(
                "logs",
                OtelComponentTypeValues.BATCHING_LOG_PROCESSOR,
                meter_provider or get_meter_provider(),
                capacity=max_queue_size,
                enabled=parse_boolean_environment_variable(
                    OTEL_PYTHON_SDK_INTERNAL_METRICS_ENABLED
                ),
            ),
        )

    def on_emit(self, log_record: ReadWriteLogRecord) -> None:
        """Convert ``log_record`` to a readable record and hand it to the BatchProcessor.

        Args:
            log_record: The record that was just emitted.
        """
        # Convert ReadWriteLogRecord to ReadableLogRecord before passing to BatchProcessor
        # Note: resource should not be None at this point as it's set during Logger.emit()
        resource = (
            log_record.resource
            if log_record.resource is not None
            else Resource.create({})
        )
        readable_log_record = ReadableLogRecord(
            log_record=log_record.log_record,
            resource=resource,
            instrumentation_scope=log_record.instrumentation_scope,
            limits=log_record.limits,
        )
        return self._batch_processor.emit(readable_log_record)

    def shutdown(self):
        """Delegate shutdown to the BatchProcessor and return its result."""
        return self._batch_processor.shutdown()

    def force_flush(self, timeout_millis: int | None = None) -> bool:
        """Delegate the flush to the BatchProcessor and return its result.

        Args:
            timeout_millis: Passed through unchanged to
                ``BatchProcessor.force_flush``.
        """
        return self._batch_processor.force_flush(timeout_millis)

    @staticmethod
    def _int_from_environ(name: str, default: int) -> int:
        """Read the environment variable ``name`` as an integer.

        Returns ``default`` when the variable is unset. When it is set to
        something ``int()`` rejects, the problem is logged and ``default`` is
        returned as well.
        """
        try:
            return int(environ.get(name, default))
        except ValueError:
            _logger.exception(_ENV_VAR_INT_VALUE_ERROR_MESSAGE, name, default)
            return default

    @staticmethod
    def _default_max_queue_size():
        """Return the queue size from ``OTEL_BLRP_MAX_QUEUE_SIZE`` or the default."""
        return BatchLogRecordProcessor._int_from_environ(
            OTEL_BLRP_MAX_QUEUE_SIZE, _DEFAULT_MAX_QUEUE_SIZE
        )

    @staticmethod
    def _default_schedule_delay_millis():
        """Return the schedule delay from ``OTEL_BLRP_SCHEDULE_DELAY`` or the default."""
        return BatchLogRecordProcessor._int_from_environ(
            OTEL_BLRP_SCHEDULE_DELAY, _DEFAULT_SCHEDULE_DELAY_MILLIS
        )

    @staticmethod
    def _default_max_export_batch_size():
        """Return the batch size from ``OTEL_BLRP_MAX_EXPORT_BATCH_SIZE`` or the default."""
        return BatchLogRecordProcessor._int_from_environ(
            OTEL_BLRP_MAX_EXPORT_BATCH_SIZE, _DEFAULT_MAX_EXPORT_BATCH_SIZE
        )

    @staticmethod
    def _default_export_timeout_millis():
        """Return the export timeout from ``OTEL_BLRP_EXPORT_TIMEOUT`` or the default."""
        return BatchLogRecordProcessor._int_from_environ(
            OTEL_BLRP_EXPORT_TIMEOUT, _DEFAULT_EXPORT_TIMEOUT_MILLIS
        )

    @staticmethod
    def _validate_arguments(
        max_queue_size, schedule_delay_millis, max_export_batch_size
    ):
        """Check the resolved settings.

        Raises:
            ValueError: If any of the three values is not positive, or if
                ``max_export_batch_size`` is larger than ``max_queue_size``.
        """
        if max_queue_size <= 0:
            raise ValueError("max_queue_size must be a positive integer.")

        if schedule_delay_millis <= 0:
            raise ValueError("schedule_delay_millis must be positive.")

        if max_export_batch_size <= 0:
            raise ValueError(
                "max_export_batch_size must be a positive integer."
            )

        if max_export_batch_size > max_queue_size:
            raise ValueError(
                "max_export_batch_size must be less than or equal to max_queue_size."
            )