Source code for opentelemetry.exporter.jaeger.thrift.send

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import base64
import logging
import math
import socket

from thrift.protocol import TBinaryProtocol, TCompactProtocol
from thrift.transport import THttpClient, TTransport

from opentelemetry.exporter.jaeger.thrift.gen.agent import Agent as agent
from opentelemetry.exporter.jaeger.thrift.gen.jaeger import Collector as jaeger

UDP_PACKET_MAX_LENGTH = 65000


logger = logging.getLogger(__name__)


[docs]class AgentClientUDP: """Implement a UDP client to agent. Args: host_name: The host name of the Jaeger server. port: The port of the Jaeger server. max_packet_size: Maximum size of UDP packet. client: Class for creating new client objects for agencies. split_oversized_batches: Re-emit oversized batches in smaller chunks. """ def __init__( self, host_name, port, max_packet_size=UDP_PACKET_MAX_LENGTH, client=agent.Client, split_oversized_batches=False, ): self.address = (host_name, port) self.max_packet_size = max_packet_size self.buffer = TTransport.TMemoryBuffer() self.client = client( iprot=TCompactProtocol.TCompactProtocol(trans=self.buffer) ) self.split_oversized_batches = split_oversized_batches
[docs] def emit(self, batch: jaeger.Batch): """ Args: batch: Object to emit Jaeger spans. """ # pylint: disable=protected-access self.client._seqid = 0 # truncate and reset the position of BytesIO object self.buffer._buffer.truncate(0) self.buffer._buffer.seek(0) self.client.emitBatch(batch) buff = self.buffer.getvalue() if len(buff) > self.max_packet_size: if self.split_oversized_batches and len(batch.spans) > 1: packets = math.ceil(len(buff) / self.max_packet_size) div = math.ceil(len(batch.spans) / packets) for packet in range(packets): start = packet * div end = (packet + 1) * div if start < len(batch.spans): self.emit( jaeger.Batch( process=batch.process, spans=batch.spans[start:end], ) ) else: logger.warning( "Data exceeds the max UDP packet size; size %r, max %r", len(buff), self.max_packet_size, ) return with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_socket: udp_socket.sendto(buff, self.address)
[docs]class Collector: """Submits collected spans to Jaeger collector in jaeger.thrift format over binary thrift protocol. This is recommend option in cases where it is not feasible to deploy Jaeger Agent next to the application, for example, when the application code is running as AWS Lambda function. In these scenarios the Jaeger Clients can be configured to submit spans directly to the Collectors over HTTP/HTTPS. Args: thrift_url: Endpoint used to send spans directly to Collector the over HTTP. auth: Auth tuple that contains username and password for Basic Auth. timeout_in_millis: timeout for THttpClient. """ def __init__(self, thrift_url="", auth=None, timeout_in_millis=None): self.thrift_url = thrift_url self.auth = auth self.http_transport = THttpClient.THttpClient( uri_or_host=self.thrift_url ) if timeout_in_millis is not None: self.http_transport.setTimeout(timeout_in_millis) self.protocol = TBinaryProtocol.TBinaryProtocol(self.http_transport) # set basic auth header if auth is not None: auth_header = f"{auth[0]}:{auth[1]}" decoded = base64.b64encode(auth_header.encode()).decode("ascii") basic_auth = dict(Authorization=f"Basic {decoded}") self.http_transport.setCustomHeaders(basic_auth)
[docs] def submit(self, batch: jaeger.Batch): """Submits batches to Thrift HTTP Server through Binary Protocol. Args: batch: Object to emit Jaeger spans. """ batch.write(self.protocol) self.http_transport.flush() code = self.http_transport.code msg = self.http_transport.message if code >= 300 or code < 200: logger.error( "Traces cannot be uploaded; HTTP status code: %s, message: %s", code, msg, )