Source code for opentelemetry.baggage.propagation

# Copyright The OpenTelemetry Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from logging import getLogger
from re import split
from typing import Iterable, List, Mapping, Optional, Set
from urllib.parse import quote_plus, unquote_plus

from opentelemetry.baggage import _is_valid_pair, get_all, set_baggage
from opentelemetry.context import get_current
from opentelemetry.context.context import Context
from opentelemetry.propagators import textmap
from opentelemetry.util.re import _DELIMITER_PATTERN

_logger = getLogger(__name__)


[docs]class W3CBaggagePropagator(textmap.TextMapPropagator): """Extracts and injects Baggage which is used to annotate telemetry.""" _MAX_HEADER_LENGTH = 8192 _MAX_PAIR_LENGTH = 4096 _MAX_PAIRS = 180 _BAGGAGE_HEADER_NAME = "baggage"
[docs] def extract( self, carrier: textmap.CarrierT, context: Optional[Context] = None, getter: textmap.Getter[textmap.CarrierT] = textmap.default_getter, ) -> Context: """Extract Baggage from the carrier. See `opentelemetry.propagators.textmap.TextMapPropagator.extract` """ if context is None: context = get_current() header = _extract_first_element( getter.get(carrier, self._BAGGAGE_HEADER_NAME) ) if not header: return context if len(header) > self._MAX_HEADER_LENGTH: _logger.warning( "Baggage header `%s` exceeded the maximum number of bytes per baggage-string", header, ) return context baggage_entries: List[str] = split(_DELIMITER_PATTERN, header) total_baggage_entries = self._MAX_PAIRS if len(baggage_entries) > self._MAX_PAIRS: _logger.warning( "Baggage header `%s` exceeded the maximum number of list-members", header, ) for entry in baggage_entries: if len(entry) > self._MAX_PAIR_LENGTH: _logger.warning( "Baggage entry `%s` exceeded the maximum number of bytes per list-member", entry, ) continue if not entry: # empty string continue try: name, value = entry.split("=", 1) except Exception: # pylint: disable=broad-except _logger.warning( "Baggage list-member `%s` doesn't match the format", entry ) continue if not _is_valid_pair(name, value): _logger.warning("Invalid baggage entry: `%s`", entry) continue name = unquote_plus(name).strip() value = unquote_plus(value).strip() context = set_baggage( name, value, context=context, ) total_baggage_entries -= 1 if total_baggage_entries == 0: break return context
[docs] def inject( self, carrier: textmap.CarrierT, context: Optional[Context] = None, setter: textmap.Setter[textmap.CarrierT] = textmap.default_setter, ) -> None: """Injects Baggage into the carrier. See `opentelemetry.propagators.textmap.TextMapPropagator.inject` """ baggage_entries = get_all(context=context) if not baggage_entries: return baggage_string = _format_baggage(baggage_entries) setter.set(carrier, self._BAGGAGE_HEADER_NAME, baggage_string)
@property def fields(self) -> Set[str]: """Returns a set with the fields set in `inject`.""" return {self._BAGGAGE_HEADER_NAME}
def _format_baggage(baggage_entries: Mapping[str, object]) -> str: return ",".join( quote_plus(str(key)) + "=" + quote_plus(str(value)) for key, value in baggage_entries.items() ) def _extract_first_element( items: Optional[Iterable[textmap.CarrierT]], ) -> Optional[textmap.CarrierT]: if items is None: return None return next(iter(items), None)