Source code for opentelemetry.sdk.trace.export

# Copyright The OpenTelemetry Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import logging
import sys
import threading
import typing
from enum import Enum
from os import environ, linesep
from typing import Optional

from opentelemetry.context import (
from opentelemetry.sdk.environment_variables import (
from opentelemetry.sdk.trace import ReadableSpan, Span, SpanProcessor
from opentelemetry.util._time import _time_ns

logger = logging.getLogger(__name__)

[docs]class SpanExportResult(Enum): SUCCESS = 0 FAILURE = 1
[docs]class SpanExporter: """Interface for exporting spans. Interface to be implemented by services that want to export spans recorded in their own format. To export data this MUST be registered to the :class`opentelemetry.sdk.trace.Tracer` using a `SimpleSpanProcessor` or a `BatchSpanProcessor`. """
[docs] def export( self, spans: typing.Sequence[ReadableSpan] ) -> "SpanExportResult": """Exports a batch of telemetry data. Args: spans: The list of `opentelemetry.trace.Span` objects to be exported Returns: The result of the export """
[docs] def shutdown(self) -> None: """Shuts down the exporter. Called when the SDK is shut down. """
[docs]class SimpleSpanProcessor(SpanProcessor): """Simple SpanProcessor implementation. SimpleSpanProcessor is an implementation of `SpanProcessor` that passes ended spans directly to the configured `SpanExporter`. """ def __init__(self, span_exporter: SpanExporter): self.span_exporter = span_exporter
[docs] def on_start( self, span: Span, parent_context: typing.Optional[Context] = None ) -> None: pass
[docs] def on_end(self, span: ReadableSpan) -> None: if not span.context.trace_flags.sampled: return token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: self.span_exporter.export((span,)) # pylint: disable=broad-except except Exception: logger.exception("Exception while exporting Span.") detach(token)
[docs] def shutdown(self) -> None: self.span_exporter.shutdown()
[docs] def force_flush(self, timeout_millis: int = 30000) -> bool: # pylint: disable=unused-argument return True
class _FlushRequest: """Represents a request for the BatchSpanProcessor to flush spans.""" __slots__ = ["event", "num_spans"] def __init__(self): self.event = threading.Event() self.num_spans = 0
[docs]class BatchSpanProcessor(SpanProcessor): """Batch span processor implementation. `BatchSpanProcessor` is an implementation of `SpanProcessor` that batches ended spans and pushes them to the configured `SpanExporter`. `BatchSpanProcessor` is configurable with the following environment variables which correspond to constructor parameters: - :envvar:`OTEL_BSP_SCHEDULE_DELAY` - :envvar:`OTEL_BSP_MAX_QUEUE_SIZE` - :envvar:`OTEL_BSP_MAX_EXPORT_BATCH_SIZE` - :envvar:`OTEL_BSP_EXPORT_TIMEOUT` """ def __init__( self, span_exporter: SpanExporter, max_queue_size: int = None, schedule_delay_millis: float = None, max_export_batch_size: int = None, export_timeout_millis: float = None, ): if max_queue_size is None: max_queue_size = int(environ.get(OTEL_BSP_MAX_QUEUE_SIZE, 2048)) if schedule_delay_millis is None: schedule_delay_millis = int( environ.get(OTEL_BSP_SCHEDULE_DELAY, 5000) ) if max_export_batch_size is None: max_export_batch_size = int( environ.get(OTEL_BSP_MAX_EXPORT_BATCH_SIZE, 512) ) if export_timeout_millis is None: export_timeout_millis = int( environ.get(OTEL_BSP_EXPORT_TIMEOUT, 30000) ) if max_queue_size <= 0: raise ValueError("max_queue_size must be a positive integer.") if schedule_delay_millis <= 0: raise ValueError("schedule_delay_millis must be positive.") if max_export_batch_size <= 0: raise ValueError( "max_export_batch_size must be a positive integer." ) if max_export_batch_size > max_queue_size: raise ValueError( "max_export_batch_size must be less than or equal to max_queue_size." ) self.span_exporter = span_exporter self.queue = collections.deque( [], max_queue_size ) # type: typing.Deque[Span] self.worker_thread = threading.Thread( name="OtelBatchSpanProcessor", target=self.worker, daemon=True ) self.condition = threading.Condition(threading.Lock()) self._flush_request = None # type: typing.Optional[_FlushRequest] self.schedule_delay_millis = schedule_delay_millis self.max_export_batch_size = max_export_batch_size self.max_queue_size = max_queue_size self.export_timeout_millis = export_timeout_millis self.done = False # flag that indicates that spans are being dropped self._spans_dropped = False # precallocated list to send spans to exporter self.spans_list = [ None ] * self.max_export_batch_size # type: typing.List[typing.Optional[Span]] self.worker_thread.start()
[docs] def on_start( self, span: Span, parent_context: typing.Optional[Context] = None ) -> None: pass
[docs] def on_end(self, span: ReadableSpan) -> None: if self.done: logger.warning("Already shutdown, dropping span.") return if not span.context.trace_flags.sampled: return if len(self.queue) == self.max_queue_size: if not self._spans_dropped: logger.warning("Queue is full, likely spans will be dropped.") self._spans_dropped = True self.queue.appendleft(span) if len(self.queue) >= self.max_export_batch_size: with self.condition: self.condition.notify()
[docs] def worker(self): timeout = self.schedule_delay_millis / 1e3 flush_request = None # type: typing.Optional[_FlushRequest] while not self.done: with self.condition: if self.done: # done flag may have changed, avoid waiting break flush_request = self._get_and_unset_flush_request() if ( len(self.queue) < self.max_export_batch_size and flush_request is None ): self.condition.wait(timeout) flush_request = self._get_and_unset_flush_request() if not self.queue: # spurious notification, let's wait again, reset timeout timeout = self.schedule_delay_millis / 1e3 self._notify_flush_request_finished(flush_request) flush_request = None continue if self.done: # missing spans will be sent when calling flush break # subtract the duration of this export call to the next timeout start = _time_ns() self._export(flush_request) end = _time_ns() duration = (end - start) / 1e9 timeout = self.schedule_delay_millis / 1e3 - duration self._notify_flush_request_finished(flush_request) flush_request = None # there might have been a new flush request while export was running # and before the done flag switched to true with self.condition: shutdown_flush_request = self._get_and_unset_flush_request() # be sure that all spans are sent self._drain_queue() self._notify_flush_request_finished(flush_request) self._notify_flush_request_finished(shutdown_flush_request)
def _get_and_unset_flush_request( self, ) -> typing.Optional[_FlushRequest]: """Returns the current flush request and makes it invisible to the worker thread for subsequent calls. """ flush_request = self._flush_request self._flush_request = None if flush_request is not None: flush_request.num_spans = len(self.queue) return flush_request @staticmethod def _notify_flush_request_finished( flush_request: typing.Optional[_FlushRequest], ): """Notifies the flush initiator(s) waiting on the given request/event that the flush operation was finished. """ if flush_request is not None: flush_request.event.set() def _get_or_create_flush_request(self) -> _FlushRequest: """Either returns the current active flush event or creates a new one. The flush event will be visible and read by the worker thread before an export operation starts. Callers of a flush operation may wait on the returned event to be notified when the flush/export operation was finished. This method is not thread-safe, i.e. callers need to take care about synchronization/locking. """ if self._flush_request is None: self._flush_request = _FlushRequest() return self._flush_request def _export(self, flush_request: typing.Optional[_FlushRequest]): """Exports spans considering the given flush_request. In case of a given flush_requests spans are exported in batches until the number of exported spans reached or exceeded the number of spans in the flush request. In no flush_request was given at most max_export_batch_size spans are exported. """ if not flush_request: self._export_batch() return num_spans = flush_request.num_spans while self.queue: num_exported = self._export_batch() num_spans -= num_exported if num_spans <= 0: break def _export_batch(self) -> int: """Exports at most max_export_batch_size spans and returns the number of exported spans. """ idx = 0 # currently only a single thread acts as consumer, so queue.pop() will # not raise an exception while idx < self.max_export_batch_size and self.queue: self.spans_list[idx] = self.queue.pop() idx += 1 token = attach(set_value(_SUPPRESS_INSTRUMENTATION_KEY, True)) try: # Ignore type b/c the Optional[None]+slicing is too "clever" # for mypy self.span_exporter.export(self.spans_list[:idx]) # type: ignore except Exception: # pylint: disable=broad-except logger.exception("Exception while exporting Span batch.") detach(token) # clean up list for index in range(idx): self.spans_list[index] = None return idx def _drain_queue(self): """Export all elements until queue is empty. Can only be called from the worker thread context because it invokes `export` that is not thread safe. """ while self.queue: self._export_batch()
[docs] def force_flush(self, timeout_millis: int = None) -> bool: if timeout_millis is None: timeout_millis = self.export_timeout_millis if self.done: logger.warning("Already shutdown, ignoring call to force_flush().") return True with self.condition: flush_request = self._get_or_create_flush_request() # signal the worker thread to flush and wait for it to finish self.condition.notify_all() # wait for token to be processed ret = flush_request.event.wait(timeout_millis / 1e3) if not ret: logger.warning("Timeout was exceeded in force_flush().") return ret
[docs] def shutdown(self) -> None: # signal the worker thread to finish and then wait for it self.done = True with self.condition: self.condition.notify_all() self.worker_thread.join() self.span_exporter.shutdown()
[docs]class ConsoleSpanExporter(SpanExporter): """Implementation of :class:`SpanExporter` that prints spans to the console. This class can be used for diagnostic purposes. It prints the exported spans to the console STDOUT. """ def __init__( self, service_name: Optional[str] = None, out: typing.IO = sys.stdout, formatter: typing.Callable[ [ReadableSpan], str ] = lambda span: span.to_json() + linesep, ): self.out = out self.formatter = formatter self.service_name = service_name
[docs] def export(self, spans: typing.Sequence[ReadableSpan]) -> SpanExportResult: for span in spans: self.out.write(self.formatter(span)) self.out.flush() return SpanExportResult.SUCCESS