Source code for opentelemetry.exporter.zipkin.json

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
OpenTelemetry Zipkin JSON Exporter
----------------------------------

This library allows to export tracing data to `Zipkin <https://zipkin.io/>`_.

Usage
-----

The **OpenTelemetry Zipkin JSON Exporter** allows exporting of `OpenTelemetry`_
traces to `Zipkin`_. This exporter sends traces to the configured Zipkin
collector endpoint using JSON over HTTP and supports multiple versions (v1, v2).

.. _Zipkin: https://zipkin.io/
.. _OpenTelemetry: https://github.com/open-telemetry/opentelemetry-python/
.. _Specification: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/sdk-environment-variables.md#zipkin-exporter

.. code:: python

    import requests

    from opentelemetry import trace
    from opentelemetry.exporter.zipkin.json import ZipkinExporter
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor

    trace.set_tracer_provider(TracerProvider())
    tracer = trace.get_tracer(__name__)

    # create a ZipkinExporter
    zipkin_exporter = ZipkinExporter(
        # version=Protocol.V2
        # optional:
        # endpoint="http://localhost:9411/api/v2/spans",
        # local_node_ipv4="192.168.0.1",
        # local_node_ipv6="2001:db8::c001",
        # local_node_port=31313,
        # max_tag_value_length=256,
        # timeout=5 (in seconds),
        # session=requests.Session(),
    )

    # Create a BatchSpanProcessor and add the exporter to it
    span_processor = BatchSpanProcessor(zipkin_exporter)

    # add to the tracer
    trace.get_tracer_provider().add_span_processor(span_processor)

    with tracer.start_as_current_span("foo"):
        print("Hello world!")

The exporter supports the following environment variable for configuration:

- :envvar:`OTEL_EXPORTER_ZIPKIN_ENDPOINT`
- :envvar:`OTEL_EXPORTER_ZIPKIN_TIMEOUT`

API
---
"""

import logging
from os import environ
from typing import Optional, Sequence

import requests

from opentelemetry.exporter.zipkin.encoder import Protocol
from opentelemetry.exporter.zipkin.json.v1 import JsonV1Encoder
from opentelemetry.exporter.zipkin.json.v2 import JsonV2Encoder
from opentelemetry.exporter.zipkin.node_endpoint import IpInput, NodeEndpoint
from opentelemetry.sdk.environment_variables import (
    OTEL_EXPORTER_ZIPKIN_ENDPOINT,
    OTEL_EXPORTER_ZIPKIN_TIMEOUT,
)
from opentelemetry.sdk.resources import SERVICE_NAME
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import Span

DEFAULT_ENDPOINT = "http://localhost:9411/api/v2/spans"
REQUESTS_SUCCESS_STATUS_CODES = (200, 202)

logger = logging.getLogger(__name__)


[docs]class ZipkinExporter(SpanExporter): def __init__( self, version: Protocol = Protocol.V2, endpoint: Optional[str] = None, local_node_ipv4: IpInput = None, local_node_ipv6: IpInput = None, local_node_port: Optional[int] = None, max_tag_value_length: Optional[int] = None, timeout: Optional[int] = None, session: Optional[requests.Session] = None, ): """Zipkin exporter. Args: version: The protocol version to be used. endpoint: The endpoint of the Zipkin collector. local_node_ipv4: Primary IPv4 address associated with this connection. local_node_ipv6: Primary IPv6 address associated with this connection. local_node_port: Depending on context, this could be a listen port or the client-side of a socket. max_tag_value_length: Max length string attribute values can have. timeout: Maximum time the Zipkin exporter will wait for each batch export. The default value is 10s. session: Connection session to the Zipkin collector endpoint. The tuple (local_node_ipv4, local_node_ipv6, local_node_port) is used to represent the network context of a node in the service graph. """ self.local_node = NodeEndpoint( local_node_ipv4, local_node_ipv6, local_node_port ) if endpoint is None: endpoint = ( environ.get(OTEL_EXPORTER_ZIPKIN_ENDPOINT) or DEFAULT_ENDPOINT ) self.endpoint = endpoint if version == Protocol.V1: self.encoder = JsonV1Encoder(max_tag_value_length) elif version == Protocol.V2: self.encoder = JsonV2Encoder(max_tag_value_length) self.session = session or requests.Session() self.session.headers.update( {"Content-Type": self.encoder.content_type()} ) self._closed = False self.timeout = timeout or int( environ.get(OTEL_EXPORTER_ZIPKIN_TIMEOUT, 10) )
[docs] def export(self, spans: Sequence[Span]) -> SpanExportResult: # After the call to Shutdown subsequent calls to Export are # not allowed and should return a Failure result if self._closed: logger.warning("Exporter already shutdown, ignoring batch") return SpanExportResult.FAILURE # Populate service_name from first span # We restrict any SpanProcessor to be only associated with a single # TracerProvider, so it is safe to assume that all Spans in a single # batch all originate from one TracerProvider (and in turn have all # the same service.name) if spans: service_name = spans[0].resource.attributes.get(SERVICE_NAME) if service_name: self.local_node.service_name = service_name result = self.session.post( url=self.endpoint, data=self.encoder.serialize(spans, self.local_node), timeout=self.timeout, ) if result.status_code not in REQUESTS_SUCCESS_STATUS_CODES: logger.error( "Traces cannot be uploaded; status code: %s, message %s", result.status_code, result.text, ) return SpanExportResult.FAILURE return SpanExportResult.SUCCESS
[docs] def shutdown(self) -> None: if self._closed: logger.warning("Exporter already shutdown, ignoring call") return self.session.close() self._closed = True
[docs] def force_flush(self, timeout_millis: int = 30000) -> bool: return True