Source code for opentelemetry.sdk._logs._internal

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import abc
import atexit
import concurrent.futures
import json
import logging
import threading
import traceback
from time import time_ns
from typing import Any, Callable, Optional, Tuple, Union

from opentelemetry._logs import Logger as APILogger
from opentelemetry._logs import LoggerProvider as APILoggerProvider
from opentelemetry._logs import LogRecord as APILogRecord
from opentelemetry._logs import (
    SeverityNumber,
    get_logger,
    get_logger_provider,
    std_to_otel,
)
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.util import ns_to_iso_str
from opentelemetry.sdk.util.instrumentation import InstrumentationScope
from opentelemetry.semconv.trace import SpanAttributes
from opentelemetry.trace import (
    format_span_id,
    format_trace_id,
    get_current_span,
)
from opentelemetry.trace.span import TraceFlags
from opentelemetry.util.types import Attributes

_logger = logging.getLogger(__name__)


class LogRecord(APILogRecord):
    """A LogRecord instance represents an event being logged.

    LogRecord instances are created and emitted via `Logger`
    every time something is logged. They contain all the information
    pertinent to the event being logged.
    """

    def __init__(
        self,
        timestamp: Optional[int] = None,
        observed_timestamp: Optional[int] = None,
        trace_id: Optional[int] = None,
        span_id: Optional[int] = None,
        trace_flags: Optional[TraceFlags] = None,
        severity_text: Optional[str] = None,
        severity_number: Optional[SeverityNumber] = None,
        body: Optional[Any] = None,
        resource: Optional[Resource] = None,
        attributes: Optional[Attributes] = None,
    ):
        super().__init__(
            **{
                "timestamp": timestamp,
                "observed_timestamp": observed_timestamp,
                "trace_id": trace_id,
                "span_id": span_id,
                "trace_flags": trace_flags,
                "severity_text": severity_text,
                "severity_number": severity_number,
                "body": body,
                "attributes": attributes,
            }
        )
        self.resource = resource

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, LogRecord):
            return NotImplemented
        return self.__dict__ == other.__dict__

    def to_json(self, indent=4) -> str:
        return json.dumps(
            {
                "body": self.body,
                "severity_number": repr(self.severity_number),
                "severity_text": self.severity_text,
                "attributes": self.attributes,
                "timestamp": ns_to_iso_str(self.timestamp),
                "trace_id": f"0x{format_trace_id(self.trace_id)}"
                if self.trace_id is not None
                else "",
                "span_id": f"0x{format_span_id(self.span_id)}"
                if self.span_id is not None
                else "",
                "trace_flags": self.trace_flags,
                "resource": repr(self.resource.attributes)
                if self.resource
                else "",
            },
            indent=indent,
        )
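

# --- Editor's note: illustrative sketch, not part of the original module. ---
# A minimal example of constructing a LogRecord by hand and serializing it
# with to_json(). All literal values and the _example_* name are made up for
# illustration only.
def _example_log_record_to_json() -> None:
    record = LogRecord(
        timestamp=time_ns(),
        observed_timestamp=time_ns(),
        trace_id=0,
        span_id=0,
        trace_flags=TraceFlags.DEFAULT,
        severity_text="INFO",
        severity_number=SeverityNumber.INFO,
        body="something happened",
        resource=Resource.create({"service.name": "example-service"}),
        attributes={"example.key": "example-value"},
    )
    # Pretty-printed JSON view of the record, useful for debugging exporters.
    print(record.to_json(indent=4))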


class LogData:
    """Readable LogRecord data plus associated InstrumentationScope."""

    def __init__(
        self,
        log_record: LogRecord,
        instrumentation_scope: InstrumentationScope,
    ):
        self.log_record = log_record
        self.instrumentation_scope = instrumentation_scope


class LogRecordProcessor(abc.ABC):
    """Interface to hook the log record emitting action.

    Log processors can be registered directly using
    :func:`LoggerProvider.add_log_record_processor` and they are invoked
    in the same order as they were registered.
    """

    @abc.abstractmethod
    def emit(self, log_data: LogData):
        """Emits the `LogData`"""

    @abc.abstractmethod
    def shutdown(self):
        """Called when a :class:`opentelemetry.sdk._logs.Logger` is shut down"""

    @abc.abstractmethod
    def force_flush(self, timeout_millis: int = 30000):
        """Export all received logs that have not yet been exported to the
        configured Exporter.

        Args:
            timeout_millis: The maximum amount of time to wait for logs to be
                exported.

        Returns:
            False if the timeout is exceeded, True otherwise.
        """
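

# --- Editor's note: illustrative sketch, not part of the original module. ---
# The smallest useful LogRecordProcessor: it implements the three abstract
# methods above and simply collects records in memory. The class name is
# hypothetical, chosen for the example only; a real processor would hand each
# LogData to an exporter instead of buffering it forever.
class _ExampleInMemoryLogRecordProcessor(LogRecordProcessor):
    def __init__(self):
        self.records = []

    def emit(self, log_data: LogData):
        # Store the LogData for later inspection (e.g. in tests).
        self.records.append(log_data)

    def shutdown(self):
        # Nothing to release for an in-memory buffer.
        pass

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        # Everything is already "exported" to memory, so flushing always
        # succeeds within any timeout.
        return True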


# Temporary fix until https://github.com/PyCQA/pylint/issues/4098 is resolved
# pylint:disable=no-member
class SynchronousMultiLogRecordProcessor(LogRecordProcessor):
    """Implementation of :class:`LogRecordProcessor` that forwards all
    received events to a list of log processors sequentially.

    The underlying log processors are called in sequential order as they were
    added.
    """

    def __init__(self):
        # use a tuple to avoid race conditions when adding a new log and
        # iterating through it on "emit".
        self._log_record_processors = ()  # type: Tuple[LogRecordProcessor, ...]
        self._lock = threading.Lock()

    def add_log_record_processor(
        self, log_record_processor: LogRecordProcessor
    ) -> None:
        """Adds a LogRecordProcessor to the list of log processors handled
        by this instance."""
        with self._lock:
            self._log_record_processors += (log_record_processor,)

    def emit(self, log_data: LogData) -> None:
        for lp in self._log_record_processors:
            lp.emit(log_data)

    def shutdown(self) -> None:
        """Shuts down the log processors one by one."""
        for lp in self._log_record_processors:
            lp.shutdown()

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Force flush the log processors one by one.

        Args:
            timeout_millis: The maximum amount of time to wait for logs to be
                exported. If the first n log processors exceed the timeout,
                the remaining log processors will not be flushed.

        Returns:
            True if all the log processors flush the logs within the timeout,
            False otherwise.
        """
        deadline_ns = time_ns() + timeout_millis * 1000000
        for lp in self._log_record_processors:
            current_ts = time_ns()
            if current_ts >= deadline_ns:
                return False

            if not lp.force_flush((deadline_ns - current_ts) // 1000000):
                return False

        return True


class ConcurrentMultiLogRecordProcessor(LogRecordProcessor):
    """Implementation of :class:`LogRecordProcessor` that forwards all
    received events to a list of log processors in parallel.

    Calls to the underlying log processors are forwarded in parallel by
    submitting them to a thread pool executor and waiting until each log
    processor has finished its work.

    Args:
        max_workers: The number of threads managed by the thread pool executor
            and thus defining how many log processors can work in parallel.
    """

    def __init__(self, max_workers: int = 2):
        # use a tuple to avoid race conditions when adding a new log and
        # iterating through it on "emit".
        self._log_record_processors = ()  # type: Tuple[LogRecordProcessor, ...]
        self._lock = threading.Lock()
        self._executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        )

    def add_log_record_processor(
        self, log_record_processor: LogRecordProcessor
    ):
        with self._lock:
            self._log_record_processors += (log_record_processor,)

    def _submit_and_wait(
        self,
        func: Callable[[LogRecordProcessor], Callable[..., None]],
        *args: Any,
        **kwargs: Any,
    ):
        futures = []
        for lp in self._log_record_processors:
            future = self._executor.submit(func(lp), *args, **kwargs)
            futures.append(future)
        for future in futures:
            future.result()

    def emit(self, log_data: LogData):
        self._submit_and_wait(lambda lp: lp.emit, log_data)

    def shutdown(self):
        self._submit_and_wait(lambda lp: lp.shutdown)

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Force flush the log processors in parallel.

        Args:
            timeout_millis: The maximum amount of time to wait for logs to be
                exported.

        Returns:
            True if all the log processors flush the logs within the timeout,
            False otherwise.
        """
        futures = []
        for lp in self._log_record_processors:
            future = self._executor.submit(lp.force_flush, timeout_millis)
            futures.append(future)

        done_futures, not_done_futures = concurrent.futures.wait(
            futures, timeout_millis / 1e3
        )

        if not_done_futures:
            return False

        for future in done_futures:
            if not future.result():
                return False

        return True


# skip natural LogRecord attributes
# http://docs.python.org/library/logging.html#logrecord-attributes
_RESERVED_ATTRS = frozenset(
    (
        "asctime",
        "args",
        "created",
        "exc_info",
        "exc_text",
        "filename",
        "funcName",
        "getMessage",
        "levelname",
        "levelno",
        "lineno",
        "module",
        "msecs",
        "msg",
        "name",
        "pathname",
        "process",
        "processName",
        "relativeCreated",
        "stack_info",
        "thread",
        "threadName",
    )
)


class LoggingHandler(logging.Handler):
    """A handler class which writes logging records, in OTLP format, to
    a network destination or file. Supports signals from the `logging` module.
    https://docs.python.org/3/library/logging.html
    """

    def __init__(
        self,
        level=logging.NOTSET,
        logger_provider=None,
    ) -> None:
        super().__init__(level=level)
        self._logger_provider = logger_provider or get_logger_provider()
        self._logger = get_logger(
            __name__, logger_provider=self._logger_provider
        )

    @staticmethod
    def _get_attributes(record: logging.LogRecord) -> Attributes:
        attributes = {
            k: v for k, v in vars(record).items() if k not in _RESERVED_ATTRS
        }
        if record.exc_info:
            exc_type = ""
            message = ""
            stack_trace = ""
            exctype, value, tb = record.exc_info
            if exctype is not None:
                exc_type = exctype.__name__
            if value is not None and value.args:
                message = value.args[0]
            if tb is not None:
                # https://github.com/open-telemetry/opentelemetry-specification/blob/9fa7c656b26647b27e485a6af7e38dc716eba98a/specification/trace/semantic_conventions/exceptions.md#stacktrace-representation
                stack_trace = "".join(
                    traceback.format_exception(*record.exc_info)
                )
            attributes[SpanAttributes.EXCEPTION_TYPE] = exc_type
            attributes[SpanAttributes.EXCEPTION_MESSAGE] = message
            attributes[SpanAttributes.EXCEPTION_STACKTRACE] = stack_trace
        return attributes

    def _translate(self, record: logging.LogRecord) -> LogRecord:
        timestamp = int(record.created * 1e9)
        span_context = get_current_span().get_span_context()
        attributes = self._get_attributes(record)
        severity_number = std_to_otel(record.levelno)
        return LogRecord(
            timestamp=timestamp,
            trace_id=span_context.trace_id,
            span_id=span_context.span_id,
            trace_flags=span_context.trace_flags,
            severity_text=record.levelname,
            severity_number=severity_number,
            body=record.getMessage(),
            resource=self._logger.resource,
            attributes=attributes,
        )

    def emit(self, record: logging.LogRecord) -> None:
        """
        Emit a record. The record is translated to OTel format, and then sent
        across the pipeline.
        """
        self._logger.emit(self._translate(record))

    def flush(self) -> None:
        """
        Flushes the logging output.
        """
        self._logger_provider.force_flush()
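

# --- Editor's note: illustrative sketch, not part of the original module. ---
# Typical wiring of LoggingHandler into the standard library's logging
# machinery: attach the handler to the root logger so every stdlib log record
# flows through LoggingHandler.emit and into the OpenTelemetry pipeline. The
# provider configuration and variable names are assumptions for the example.
def _example_attach_logging_handler() -> None:
    provider = LoggerProvider(
        resource=Resource.create({"service.name": "example-service"})
    )
    handler = LoggingHandler(level=logging.INFO, logger_provider=provider)
    logging.getLogger().addHandler(handler)
    # Any stdlib logger now emits through the OpenTelemetry handler as well.
    logging.getLogger(__name__).info("hello from the example")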


class Logger(APILogger):
    def __init__(
        self,
        resource: Resource,
        multi_log_record_processor: Union[
            SynchronousMultiLogRecordProcessor,
            ConcurrentMultiLogRecordProcessor,
        ],
        instrumentation_scope: InstrumentationScope,
    ):
        super().__init__(
            instrumentation_scope.name,
            instrumentation_scope.version,
            instrumentation_scope.schema_url,
        )
        self._resource = resource
        self._multi_log_record_processor = multi_log_record_processor
        self._instrumentation_scope = instrumentation_scope

    @property
    def resource(self):
        return self._resource

    def emit(self, record: LogRecord):
        """Emits the :class:`LogData` by associating the :class:`LogRecord`
        with this logger's instrumentation scope.
        """
        log_data = LogData(record, self._instrumentation_scope)
        self._multi_log_record_processor.emit(log_data)


class LoggerProvider(APILoggerProvider):
    def __init__(
        self,
        resource: Resource = Resource.create(),
        shutdown_on_exit: bool = True,
        multi_log_record_processor: Union[
            SynchronousMultiLogRecordProcessor,
            ConcurrentMultiLogRecordProcessor,
        ] = None,
    ):
        self._resource = resource
        self._multi_log_record_processor = (
            multi_log_record_processor or SynchronousMultiLogRecordProcessor()
        )
        self._at_exit_handler = None
        if shutdown_on_exit:
            self._at_exit_handler = atexit.register(self.shutdown)

    @property
    def resource(self):
        return self._resource

    def get_logger(
        self,
        name: str,
        version: Optional[str] = None,
        schema_url: Optional[str] = None,
    ) -> Logger:
        return Logger(
            self._resource,
            self._multi_log_record_processor,
            InstrumentationScope(
                name,
                version,
                schema_url,
            ),
        )

    def add_log_record_processor(
        self, log_record_processor: LogRecordProcessor
    ):
        """Registers a new :class:`LogRecordProcessor` for this
        `LoggerProvider` instance.

        The log processors are invoked in the same order they are registered.
        """
        self._multi_log_record_processor.add_log_record_processor(
            log_record_processor
        )

    def shutdown(self):
        """Shuts down the log processors."""
        self._multi_log_record_processor.shutdown()
        if self._at_exit_handler is not None:
            atexit.unregister(self._at_exit_handler)
            self._at_exit_handler = None

    def force_flush(self, timeout_millis: int = 30000) -> bool:
        """Force flush the log processors.

        Args:
            timeout_millis: The maximum amount of time to wait for logs to be
                exported.

        Returns:
            True if all the log processors flush the logs within the timeout,
            False otherwise.
        """
        return self._multi_log_record_processor.force_flush(timeout_millis)
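

# --- Editor's note: illustrative sketch, not part of the original module. ---
# End-to-end wiring of the pieces defined above. The exporter and processor
# imports assume the companion opentelemetry.sdk._logs.export module
# (ConsoleLogExporter, SimpleLogRecordProcessor); if your SDK version lays
# these out differently, adjust the import. All other names are made up.
def _example_pipeline() -> None:
    from opentelemetry.sdk._logs.export import (
        ConsoleLogExporter,
        SimpleLogRecordProcessor,
    )

    provider = LoggerProvider(
        resource=Resource.create({"service.name": "example-service"})
    )
    # Processors run in registration order via the provider's multiprocessor.
    provider.add_log_record_processor(
        SimpleLogRecordProcessor(ConsoleLogExporter())
    )
    logger = provider.get_logger("example.scope", version="0.1.0")
    logger.emit(
        LogRecord(
            timestamp=time_ns(),
            severity_text="INFO",
            severity_number=SeverityNumber.INFO,
            body="pipeline is wired up",
            resource=provider.resource,
        )
    )
    # Flush synchronously before shutdown so nothing is dropped.
    provider.force_flush()
    provider.shutdown()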