diff --git a/sentry_sdk/_span_batcher.py b/sentry_sdk/_span_batcher.py new file mode 100644 index 0000000000..b7c9ef9e5c --- /dev/null +++ b/sentry_sdk/_span_batcher.py @@ -0,0 +1,132 @@ +import threading +from collections import defaultdict +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from sentry_sdk._batcher import Batcher +from sentry_sdk.consts import SPANSTATUS +from sentry_sdk.envelope import Envelope, Item, PayloadRef +from sentry_sdk.utils import format_timestamp, serialize_attribute, safe_repr + +if TYPE_CHECKING: + from typing import Any, Callable, Optional + from sentry_sdk._tracing import SpanStatus, StreamedSpan + from sentry_sdk._types import SerializedAttributeValue + + +class SpanBatcher(Batcher["StreamedSpan"]): + # TODO[span-first]: size-based flushes + MAX_BEFORE_FLUSH = 1000 + MAX_BEFORE_DROP = 5000 + FLUSH_WAIT_TIME = 5.0 + + TYPE = "span" + CONTENT_TYPE = "application/vnd.sentry.items.span.v2+json" + + def __init__( + self, + capture_func: "Callable[[Envelope], None]", + record_lost_func: "Callable[..., None]", + ) -> None: + # Spans from different traces cannot be emitted in the same envelope + # since the envelope contains a shared trace header. That's why we bucket + # by trace_id, so that we can then send the buckets each in its own + # envelope. + # trace_id -> span buffer + self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list) + self._capture_func = capture_func + self._record_lost_func = record_lost_func + self._running = True + self._lock = threading.Lock() + + self._flush_event: "threading.Event" = threading.Event() + + self._flusher: "Optional[threading.Thread]" = None + self._flusher_pid: "Optional[int]" = None + + def get_size(self) -> int: + # caller is responsible for locking before checking this + return sum(len(buffer) for buffer in self._span_buffer.values()) + + def add(self, span: "StreamedSpan") -> None: + if not self._ensure_thread() or self._flusher is None: + return None + + with self._lock: + size = self.get_size() + if size >= self.MAX_BEFORE_DROP: + self._record_lost_func( + reason="queue_overflow", + data_category="span", + quantity=1, + ) + return None + + self._span_buffer[span.trace_id].append(span) + if size + 1 >= self.MAX_BEFORE_FLUSH: + self._flush_event.set() + + @staticmethod + def _to_transport_format(item: "StreamedSpan") -> "Any": + res: "dict[str, Any]" = { + "trace_id": item.trace_id, + "span_id": item.span_id, + "name": item.get_name(), + "status": item.status.value, + "is_segment": item.is_segment(), + "start_timestamp": item.start_timestamp.timestamp(), # TODO[span-first] + } + + if item.timestamp: + # this is here to make mypy happy + res["end_timestamp"] = item.timestamp.timestamp() + + if item.parent_span_id: + res["parent_span_id"] = item.parent_span_id + + if item.attributes: + res["attributes"] = { + k: serialize_attribute(v) for (k, v) in item.attributes.items() + } + + return res + + def _flush(self) -> "Optional[Envelope]": + with self._lock: + if len(self._span_buffer) == 0: + return None + + for trace_id, spans in self._span_buffer.items(): + if spans: + dsc = spans[0].dynamic_sampling_context() + # XXX[span-first]: empty dsc? 
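+                    # Each trace bucket is wrapped in its own envelope below,
+                    # so the shared "trace" envelope header (the DSC) matches
+                    # every span the item contains.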
+ + envelope = Envelope( + headers={ + "sent_at": format_timestamp(datetime.now(timezone.utc)), + "trace": dsc, + } + ) + + envelope.add_item( + Item( + type="span", + content_type="application/vnd.sentry.items.span.v2+json", + headers={ + "item_count": len(spans), + }, + payload=PayloadRef( + json={ + "items": [ + self._to_transport_format(span) + for span in spans + ] + } + ), + ) + ) + + self._span_buffer.clear() + + self._capture_func(envelope) + return envelope diff --git a/sentry_sdk/_tracing.py b/sentry_sdk/_tracing.py new file mode 100644 index 0000000000..96cac67447 --- /dev/null +++ b/sentry_sdk/_tracing.py @@ -0,0 +1,464 @@ +import uuid +from datetime import datetime, timedelta, timezone +from enum import Enum +from typing import TYPE_CHECKING + +import sentry_sdk +from sentry_sdk.consts import SPANDATA +from sentry_sdk.profiler.continuous_profiler import get_profiler_id +from sentry_sdk.tracing_utils import ( + Baggage, + _generate_sample_rand, + has_span_streaming_enabled, + has_tracing_enabled, +) +from sentry_sdk.utils import ( + capture_internal_exceptions, + format_attribute, + get_current_thread_meta, + is_valid_sample_rate, + logger, + nanosecond_time, + should_be_treated_as_error, +) + +if TYPE_CHECKING: + from typing import Any, Optional, Union + from sentry_sdk._types import Attributes, AttributeValue, SamplingContext + from sentry_sdk.scope import Scope + + +FLAGS_CAPACITY = 10 + +""" +TODO[span-first] / notes +- redis, http, subprocess breadcrumbs (maybe_create_breadcrumbs_from_span) work + on op, change or ignore? +- @trace +- tags +- initial status: OK? or unset? +- dropped spans are not migrated +- recheck transaction.finish <-> Streamedspan.end +- profiling: drop transaction based +- profiling: actually send profiles +- maybe: use getters/setter OR properties but not both +- add size-based flushing to buffer(s) +- migrate transaction sample_rand logic +- remove deprecated profiler impl +- {custom_}sampling_context? 
-> if this is going to die, we need to revive the + potel pr that went through the integrations and got rid of custom_sampling_context + in favor of attributes +- noop spans +- add a switcher to top level API that figures out which @trace to enable + +Notes: +- removed ability to provide a start_timestamp +- moved _flags_capacity to a const +""" + + +def start_span( + name: str, + attributes: "Optional[Attributes]" = None, + parent_span: "Optional[StreamedSpan]" = None, +) -> "StreamedSpan": + return sentry_sdk.get_current_scope().start_streamed_span( + name, attributes, parent_span + ) + + +class SpanStatus(str, Enum): + OK = "ok" + ERROR = "error" + + def __str__(self) -> str: + return self.value + + +# Segment source, see +# https://getsentry.github.io/sentry-conventions/generated/attributes/sentry.html#sentryspansource +class SegmentSource(str, Enum): + COMPONENT = "component" + CUSTOM = "custom" + ROUTE = "route" + TASK = "task" + URL = "url" + VIEW = "view" + + def __str__(self) -> str: + return self.value + + +# These are typically high cardinality and the server hates them +LOW_QUALITY_SEGMENT_SOURCES = [ + SegmentSource.URL, +] + + +SOURCE_FOR_STYLE = { + "endpoint": SegmentSource.COMPONENT, + "function_name": SegmentSource.COMPONENT, + "handler_name": SegmentSource.COMPONENT, + "method_and_path_pattern": SegmentSource.ROUTE, + "path": SegmentSource.URL, + "route_name": SegmentSource.COMPONENT, + "route_pattern": SegmentSource.ROUTE, + "uri_template": SegmentSource.ROUTE, + "url": SegmentSource.ROUTE, +} + + +class NoOpStreamedSpan: + pass + + +class StreamedSpan: + """ + A span holds timing information of a block of code. + + Spans can have multiple child spans thus forming a span tree. + + This is the Span First span implementation. The original transaction-based + span implementation lives in tracing.Span. + """ + + __slots__ = ( + "name", + "attributes", + "_span_id", + "_trace_id", + "parent_span_id", + "segment", + "_sampled", + "parent_sampled", + "start_timestamp", + "timestamp", + "status", + "_start_timestamp_monotonic_ns", + "_scope", + "_flags", + "_context_manager_state", + "_profile", + "_continuous_profile", + "_baggage", + "sample_rate", + "_sample_rand", + "source", + ) + + def __init__( + self, + *, + name: str, + scope: "Scope", + attributes: "Optional[Attributes]" = None, + # TODO[span-first]: would be good to actually take this propagation + # context stuff directly from the PropagationContext, but for that + # we'd actually need to refactor PropagationContext to stay in sync + # with what's going on (e.g. 
update the current span_id) and not just + # update when a trace is continued + trace_id: "Optional[str]" = None, + parent_span_id: "Optional[str]" = None, + parent_sampled: "Optional[bool]" = None, + baggage: "Optional[Baggage]" = None, + segment: "Optional[StreamedSpan]" = None, + ) -> None: + self._scope = scope + + self.name: str = name + self.attributes: "Attributes" = attributes or {} + + self._trace_id = trace_id + self.parent_span_id = parent_span_id + self.parent_sampled = parent_sampled + self.segment = segment or self + + self.start_timestamp = datetime.now(timezone.utc) + + try: + # profiling depends on this value and requires that + # it is measured in nanoseconds + self._start_timestamp_monotonic_ns = nanosecond_time() + except AttributeError: + pass + + self.timestamp: "Optional[datetime]" = None + self._span_id: "Optional[str]" = None + + self.status: SpanStatus = SpanStatus.OK + self.source: "Optional[SegmentSource]" = SegmentSource.CUSTOM + # XXX[span-first] ^ populate this correctly + + self._sampled: "Optional[bool]" = None + self.sample_rate: "Optional[float]" = None + + # XXX[span-first]: just do this for segments? + self._baggage = baggage + baggage_sample_rand = ( + None if self._baggage is None else self._baggage._sample_rand() + ) + if baggage_sample_rand is not None: + self._sample_rand = baggage_sample_rand + else: + self._sample_rand = _generate_sample_rand(self.trace_id) + + self._flags: dict[str, bool] = {} + self._profile = None + self._continuous_profile = None + + self._update_active_thread() + self._set_profiler_id(get_profiler_id()) + + def __repr__(self) -> str: + return ( + f"<{self.__class__.__name__}(" + f"name={self.name}, " + f"trace_id={self.trace_id}, " + f"span_id={self.span_id}, " + f"parent_span_id={self.parent_span_id}, " + f"sampled={self.sampled})>" + ) + + def __enter__(self) -> "StreamedSpan": + scope = self._scope or sentry_sdk.get_current_scope() + old_span = scope.span + scope.span = self + self._context_manager_state = (scope, old_span) + + if self.is_segment() and self._profile is not None: + self._profile.__enter__() + + return self + + def __exit__( + self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]" + ) -> None: + if self.is_segment(): + if self._profile is not None: + self._profile.__exit__(ty, value, tb) + + if self._continuous_profile is not None: + self._continuous_profile.stop() + + if value is not None and should_be_treated_as_error(ty, value): + self.set_status(SpanStatus.ERROR) + + with capture_internal_exceptions(): + scope, old_span = self._context_manager_state + del self._context_manager_state + self.end(scope=scope) + scope.span = old_span + + def end( + self, + end_timestamp: "Optional[Union[float, datetime]]" = None, + scope: "Optional[sentry_sdk.Scope]" = None, + ) -> None: + """ + Set the end timestamp of the span and queue it for sending. + + :param end_timestamp: Optional timestamp that should + be used as timestamp instead of the current time. + :param scope: The scope to use. + """ + client = sentry_sdk.get_client() + if not client.is_active(): + return + + scope: "Optional[sentry_sdk.Scope]" = ( + scope or self._scope or sentry_sdk.get_current_scope() + ) + + # Explicit check against False needed because self.sampled might be None + if self.sampled is False: + logger.debug("Discarding span because sampled = False") + + # This is not entirely accurate because discards here are not + # exclusively based on sample rate but also traces sampler, but + # we handle this the same here. 
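+            # Record the discarded span in client reports: attribute the drop
+            # to backpressure when the monitor is actively downsampling,
+            # otherwise to the configured sample rate.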
+ if client.transport and has_tracing_enabled(client.options): + if client.monitor and client.monitor.downsample_factor > 0: + reason = "backpressure" + else: + reason = "sample_rate" + + client.transport.record_lost_event(reason, data_category="span") + + return + + if self.sampled is None: + logger.warning("Discarding transaction without sampling decision.") + + if self.timestamp is not None: + # This span is already finished, ignore. + return + + try: + if end_timestamp: + if isinstance(end_timestamp, float): + end_timestamp = datetime.fromtimestamp(end_timestamp, timezone.utc) + self.timestamp = end_timestamp + else: + elapsed = nanosecond_time() - self._start_timestamp_monotonic_ns + self.timestamp = self.start_timestamp + timedelta( + microseconds=elapsed / 1000 + ) + except AttributeError: + self.timestamp = datetime.now(timezone.utc) + + if self.segment.sampled: # XXX this should just use its own sampled + sentry_sdk.get_current_scope()._capture_span(self) + + def get_attributes(self) -> "Attributes": + return self.attributes + + def set_attribute(self, key: str, value: "AttributeValue") -> None: + self.attributes[key] = format_attribute(value) + + def set_attributes(self, attributes: "Attributes") -> None: + for key, value in attributes.items(): + self.set_attribute(key, value) + + def set_status(self, status: SpanStatus) -> None: + self.status = status + + def get_name(self) -> str: + return self.name + + def set_name(self, name: str) -> None: + self.name = name + + def set_flag(self, flag: str, result: bool) -> None: + if len(self._flags) < FLAGS_CAPACITY: + self._flags[flag] = result + + def is_segment(self) -> bool: + return self.segment == self + + @property + def span_id(self) -> str: + if not self._span_id: + self._span_id = uuid.uuid4().hex[16:] + + return self._span_id + + @property + def trace_id(self) -> str: + if not self._trace_id: + self._trace_id = uuid.uuid4().hex + + return self._trace_id + + @property + def sampled(self) -> "Optional[bool]": + if self._sampled is not None: + return self._sampled + + if not self.is_segment(): + self._sampled = self.parent_sampled + + return self._sampled + + def dynamic_sampling_context(self) -> dict[str, str]: + return self.segment._get_baggage().dynamic_sampling_context() + + def _update_active_thread(self) -> None: + thread_id, thread_name = get_current_thread_meta() + self._set_thread(thread_id, thread_name) + + def _set_thread( + self, thread_id: "Optional[int]", thread_name: "Optional[str]" + ) -> None: + if thread_id is not None: + self.set_attribute(SPANDATA.THREAD_ID, str(thread_id)) + + if thread_name is not None: + self.set_attribute(SPANDATA.THREAD_NAME, thread_name) + + def _set_profiler_id(self, profiler_id: "Optional[str]") -> None: + if profiler_id is not None: + self.set_attribute(SPANDATA.PROFILER_ID, profiler_id) + + def _set_http_status(self, http_status: int) -> None: + self.set_attribute(SPANDATA.HTTP_STATUS_CODE, http_status) + + if http_status >= 400: + self.set_status(SpanStatus.ERROR) + else: + self.set_status(SpanStatus.OK) + + def _get_baggage(self) -> "Baggage": + """ + Return the :py:class:`~sentry_sdk.tracing_utils.Baggage` associated with + the segment. + + The first time a new baggage with Sentry items is made, it will be frozen. 
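+        If there is no baggage yet, or the existing baggage is still mutable,
+        a frozen Baggage is (re)populated from this segment via
+        Baggage.populate_from_segment.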
+ """ + if not self._baggage or self._baggage.mutable: + self._baggage = Baggage.populate_from_segment(self) + + return self._baggage + + def _set_sampling_decision(self, sampling_context: "SamplingContext") -> None: + """ + Set the segment's sampling decision, inherited by all child spans. + """ + client = sentry_sdk.get_client() + + # nothing to do if tracing is disabled + if not has_tracing_enabled(client.options): + self._sampled = False + return + + if not self.is_segment(): + return + + traces_sampler_defined = callable(client.options.get("traces_sampler")) + + # We would have bailed already if neither `traces_sampler` nor + # `traces_sample_rate` were defined, so one of these should work; prefer + # the hook if so + if traces_sampler_defined: + sample_rate = client.options["traces_sampler"](sampling_context) + else: + if sampling_context["parent_sampled"] is not None: + sample_rate = sampling_context["parent_sampled"] + else: + sample_rate = client.options["traces_sample_rate"] + + # Since this is coming from the user (or from a function provided by the + # user), who knows what we might get. (The only valid values are + # booleans or numbers between 0 and 1.) + if not is_valid_sample_rate(sample_rate, source="Tracing"): + logger.warning( + f"[Tracing] Discarding {self.name} because of invalid sample rate." + ) + self._sampled = False + return + + self.sample_rate = float(sample_rate) + + if client.monitor: + self.sample_rate /= 2**client.monitor.downsample_factor + + # if the function returned 0 (or false), or if `traces_sample_rate` is + # 0, it's a sign the transaction should be dropped + if not self.sample_rate: + if traces_sampler_defined: + reason = "traces_sampler returned 0 or False" + else: + reason = "traces_sample_rate is set to 0" + + logger.debug(f"[Tracing] Discarding {self.name} because {reason}") + self._sampled = False + return + + # Now we roll the dice. + self._sampled = self._sample_rand < self.sample_rate + + if self.sampled: + logger.debug(f"[Tracing] Starting {self.name}") + else: + logger.debug( + f"[Tracing] Discarding {self.name} because it's not included in the random sample (sampling rate = {self.sample_rate})" + ) diff --git a/sentry_sdk/api.py b/sentry_sdk/api.py index c4e2229938..2c0603ec96 100644 --- a/sentry_sdk/api.py +++ b/sentry_sdk/api.py @@ -7,6 +7,7 @@ from sentry_sdk.consts import INSTRUMENTER from sentry_sdk.scope import Scope, _ScopeManager, new_scope, isolation_scope from sentry_sdk.tracing import NoOpSpan, Transaction, trace +from sentry_sdk._tracing import StreamedSpan from sentry_sdk.crons import monitor from typing import TYPE_CHECKING @@ -385,7 +386,9 @@ def set_measurement(name: str, value: float, unit: "MeasurementUnit" = "") -> No transaction.set_measurement(name, value, unit) -def get_current_span(scope: "Optional[Scope]" = None) -> "Optional[Span]": +def get_current_span( + scope: "Optional[Scope]" = None, +) -> "Optional[Union[Span, StreamedSpan]]": """ Returns the currently active span if there is one running, otherwise `None` """ @@ -501,6 +504,16 @@ def update_current_span( if current_span is None: return + if isinstance(current_span, StreamedSpan): + warnings.warn( + "The `update_current_span` API isn't available in streaming mode. 
" + "Retrieve the current span with get_current_span() and use its API " + "directly.", + DeprecationWarning, + stacklevel=2, + ) + return + if op is not None: current_span.op = op diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index fb14d8e36a..ff47000bb8 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -11,6 +11,7 @@ import sentry_sdk from sentry_sdk._compat import PY37, check_uwsgi_thread_support from sentry_sdk._metrics_batcher import MetricsBatcher +from sentry_sdk._span_batcher import SpanBatcher from sentry_sdk.utils import ( AnnotatedValue, ContextVar, @@ -31,6 +32,7 @@ ) from sentry_sdk.serializer import serialize from sentry_sdk.tracing import trace +from sentry_sdk.tracing_utils import has_span_streaming_enabled from sentry_sdk.transport import BaseHttpTransport, make_transport from sentry_sdk.consts import ( SPANDATA, @@ -67,6 +69,7 @@ from sentry_sdk.scope import Scope from sentry_sdk.session import Session from sentry_sdk.spotlight import SpotlightClient + from sentry_sdk._tracing import StreamedSpan from sentry_sdk.transport import Transport, Item from sentry_sdk._log_batcher import LogBatcher from sentry_sdk._metrics_batcher import MetricsBatcher @@ -188,6 +191,7 @@ def __init__(self, options: "Optional[Dict[str, Any]]" = None) -> None: self.monitor: "Optional[Monitor]" = None self.log_batcher: "Optional[LogBatcher]" = None self.metrics_batcher: "Optional[MetricsBatcher]" = None + self.span_batcher: "Optional[SpanBatcher]" = None self.integrations: "dict[str, Integration]" = {} def __getstate__(self, *args: "Any", **kwargs: "Any") -> "Any": @@ -224,6 +228,9 @@ def _capture_log(self, log: "Log", scope: "Scope") -> None: def _capture_metric(self, metric: "Metric", scope: "Scope") -> None: pass + def _capture_span(self, span: "StreamedSpan", scope: "Scope") -> None: + pass + def capture_session(self, *args: "Any", **kwargs: "Any") -> None: return None @@ -399,6 +406,13 @@ def _record_lost_event( record_lost_func=_record_lost_event, ) + self.span_batcher = None + if has_span_streaming_enabled(self.options): + self.span_batcher = SpanBatcher( + capture_func=_capture_envelope, + record_lost_func=_record_lost_event, + ) + max_request_body_size = ("always", "never", "small", "medium") if self.options["max_request_body_size"] not in max_request_body_size: raise ValueError( @@ -909,7 +923,10 @@ def capture_event( return return_value def _capture_telemetry( - self, telemetry: "Optional[Union[Log, Metric]]", ty: str, scope: "Scope" + self, + telemetry: "Optional[Union[Log, Metric, StreamedSpan]]", + ty: str, + scope: "Scope", ) -> None: # Capture attributes-based telemetry (logs, metrics, spansV2) if telemetry is None: @@ -922,6 +939,7 @@ def _capture_telemetry( before_send = get_before_send_log(self.options) elif ty == "metric": before_send = get_before_send_metric(self.options) # type: ignore + # no before_send for spans if before_send is not None: telemetry = before_send(telemetry, {}) # type: ignore @@ -934,6 +952,8 @@ def _capture_telemetry( batcher = self.log_batcher elif ty == "metric": batcher = self.metrics_batcher # type: ignore + elif ty == "span": + batcher = self.span_batcher # type: ignore if batcher is not None: batcher.add(telemetry) # type: ignore @@ -944,6 +964,9 @@ def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None: def _capture_metric(self, metric: "Optional[Metric]", scope: "Scope") -> None: self._capture_telemetry(metric, "metric", scope) + def _capture_span(self, span: "Optional[StreamedSpan]", scope: "Scope") -> None: + 
self._capture_telemetry(span, "span", scope) + def capture_session( self, session: "Session", @@ -993,6 +1016,8 @@ def close( self.log_batcher.kill() if self.metrics_batcher is not None: self.metrics_batcher.kill() + if self.span_batcher is not None: + self.span_batcher.kill() if self.monitor: self.monitor.kill() self.transport.kill() @@ -1018,6 +1043,8 @@ def flush( self.log_batcher.flush() if self.metrics_batcher is not None: self.metrics_batcher.flush() + if self.span_batcher is not None: + self.span_batcher.flush() self.transport.flush(timeout=timeout, callback=callback) def __enter__(self) -> "_Client": diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index 59d3997c9a..c095467c8e 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -82,6 +82,7 @@ class CompressionAlgo(Enum): "before_send_log": Optional[Callable[[Log, Hint], Optional[Log]]], "enable_metrics": Optional[bool], "before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]], + "trace_lifecycle": Optional[Literal["static", "stream"]], }, total=False, ) diff --git a/sentry_sdk/envelope.py b/sentry_sdk/envelope.py index 307fb26fd6..5e52c6196f 100644 --- a/sentry_sdk/envelope.py +++ b/sentry_sdk/envelope.py @@ -253,6 +253,8 @@ def data_category(self) -> "EventDataCategory": return "session" elif ty == "attachment": return "attachment" + elif ty == "span": + return "span" elif ty == "transaction": return "transaction" elif ty == "event": diff --git a/sentry_sdk/scope.py b/sentry_sdk/scope.py index 6df26690c8..99a4edd23c 100644 --- a/sentry_sdk/scope.py +++ b/sentry_sdk/scope.py @@ -29,9 +29,11 @@ from sentry_sdk.tracing_utils import ( Baggage, has_tracing_enabled, + has_span_streaming_enabled, normalize_incoming_data, PropagationContext, ) +from sentry_sdk._tracing import StreamedSpan from sentry_sdk.tracing import ( BAGGAGE_HEADER_NAME, SENTRY_TRACE_HEADER_NAME, @@ -577,6 +579,13 @@ def get_traceparent(self, *args: "Any", **kwargs: "Any") -> "Optional[str]": # If we have an active span, return traceparent from there if has_tracing_enabled(client.options) and self.span is not None: + if isinstance(self.span, StreamedSpan): + warnings.warn( + "Scope.get_traceparent is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return None return self.span.to_traceparent() # else return traceparent from the propagation context @@ -591,6 +600,13 @@ def get_baggage(self, *args: "Any", **kwargs: "Any") -> "Optional[Baggage]": # If we have an active span, return baggage from there if has_tracing_enabled(client.options) and self.span is not None: + if isinstance(self.span, StreamedSpan): + warnings.warn( + "Scope.get_baggage is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return None return self.span.to_baggage() # else return baggage from the propagation context @@ -601,6 +617,14 @@ def get_trace_context(self) -> "Dict[str, Any]": Returns the Sentry "trace" context from the Propagation Context. 
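+        When the active span is a StreamedSpan, this emits a deprecation
+        warning and returns an empty dict instead.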
""" if has_tracing_enabled(self.get_client().options) and self._span is not None: + if isinstance(self._span, StreamedSpan): + warnings.warn( + "Scope.get_trace_context is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return {} + return self._span.get_trace_context() # if we are tracing externally (otel), those values take precedence @@ -665,6 +689,15 @@ def iter_trace_propagation_headers( span = kwargs.pop("span", None) span = span or self.span + if isinstance(span, StreamedSpan): + warnings.warn( + "Scope.iter_trace_propagation_headers is not available in " + "streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return None + if has_tracing_enabled(client.options) and span is not None: for header in span.iter_headers(): yield header @@ -706,7 +739,7 @@ def clear(self) -> None: self.clear_breadcrumbs() self._should_capture: bool = True - self._span: "Optional[Span]" = None + self._span: "Optional[Union[Span, StreamedSpan]]" = None self._session: "Optional[Session]" = None self._force_auto_session_tracking: "Optional[bool]" = None @@ -758,6 +791,14 @@ def transaction(self) -> "Any": if self._span is None: return None + if isinstance(self._span, StreamedSpan): + warnings.warn( + "Scope.transaction is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return None + # there is an orphan span on the scope if self._span.containing_transaction is None: return None @@ -787,17 +828,34 @@ def transaction(self, value: "Any") -> None: "Assigning to scope.transaction directly is deprecated: use scope.set_transaction_name() instead." ) self._transaction = value - if self._span and self._span.containing_transaction: - self._span.containing_transaction.name = value + if self._span: + if isinstance(self._span, StreamedSpan): + warnings.warn( + "Scope.transaction is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return None + + if self._span.containing_transaction: + self._span.containing_transaction.name = value def set_transaction_name(self, name: str, source: "Optional[str]" = None) -> None: """Set the transaction name and optionally the transaction source.""" self._transaction = name + if self._span: + if isinstance(self._span, StreamedSpan): + warnings.warn( + "Scope.set_transaction_name is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return None - if self._span and self._span.containing_transaction: - self._span.containing_transaction.name = name - if source: - self._span.containing_transaction.source = source + if self._span.containing_transaction: + self._span.containing_transaction.name = name + if source: + self._span.containing_transaction.source = source if source: self._transaction_info["source"] = source @@ -820,12 +878,12 @@ def set_user(self, value: "Optional[Dict[str, Any]]") -> None: session.update(user=value) @property - def span(self) -> "Optional[Span]": + def span(self) -> "Optional[Union[Span, StreamedSpan]]": """Get/set current tracing span or transaction.""" return self._span @span.setter - def span(self, span: "Optional[Span]") -> None: + def span(self, span: "Optional[Union[Span, StreamedSpan]]") -> None: self._span = span # XXX: this differs from the implementation in JS, there Scope.setSpan # does not set Scope._transactionName. @@ -1114,6 +1172,15 @@ def start_span( be removed in the next major version. Going forward, it should only be used by the SDK itself. 
""" + client = sentry_sdk.get_client() + if has_span_streaming_enabled(client.options): + warnings.warn( + "Scope.start_span is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return NoOpSpan() + if kwargs.get("description") is not None: warnings.warn( "The `description` parameter is deprecated. Please use `name` instead.", @@ -1133,6 +1200,9 @@ def start_span( # get current span or transaction span = self.span or self.get_isolation_scope().span + if isinstance(span, StreamedSpan): + # make mypy happy + return NoOpSpan() if span is None: # New spans get the `trace_id` from the scope @@ -1147,6 +1217,65 @@ def start_span( return span + def start_streamed_span( + self, + name: str, + attributes: "Optional[Attributes]" = None, + parent_span: "Optional[StreamedSpan]" = None, + ) -> "StreamedSpan": + # TODO: rename to start_span once we drop the old API + if parent_span is None: + # Get currently active span + parent_span = self.span or self.get_isolation_scope().span + + # If no specific parent_span provided and there is no currently + # active span, this is a segment + if parent_span is None: + propagation_context = self.get_active_propagation_context() + span = StreamedSpan( + name=name, + attributes=attributes, + scope=self, + segment=None, + trace_id=propagation_context.trace_id, + parent_span_id=propagation_context.parent_span_id, + parent_sampled=propagation_context.parent_sampled, + baggage=propagation_context.baggage, + ) + + try_autostart_continuous_profiler() + + # XXX[span-first]: no sampling context? + sampling_context = { + "transaction_context": { + "trace_id": span.trace_id, + "span_id": span.span_id, + "parent_span_id": span.parent_span_id, + }, + "parent_sampled": span.parent_sampled, + "attributes": span.attributes, + } + # Use traces_sample_rate, traces_sampler, and/or inheritance to make a + # sampling decision + span._set_sampling_decision(sampling_context=sampling_context) + + return span + + # This is a child span; take propagation context from the parent span + with new_scope(): + span = StreamedSpan( + name=name, + attributes=attributes, + scope=self, + trace_id=parent_span.trace_id, + parent_span_id=parent_span.span_id, + parent_sampled=parent_span.sampled, + segment=parent_span.segment, + # XXX[span-first]: baggage? 
+ ) + + return span + def continue_trace( self, environ_or_headers: "Dict[str, Any]", @@ -1180,6 +1309,9 @@ def continue_trace( **optional_kwargs, ) + def set_propagation_context(self, environ_or_headers: "dict[str, Any]") -> None: + self.generate_propagation_context(environ_or_headers) + def capture_event( self, event: "Event", @@ -1253,6 +1385,17 @@ def _capture_metric(self, metric: "Optional[Metric]") -> None: client._capture_metric(metric, scope=merged_scope) + def _capture_span(self, span: "Optional[StreamedSpan]") -> None: + if span is None: + return + + client = self.get_client() + if not has_span_streaming_enabled(client.options): + return + + merged_scope = self._merge_scopes() + client._capture_span(span, scope=merged_scope) + def capture_message( self, message: str, @@ -1497,16 +1640,25 @@ def _apply_flags_to_event( ) def _apply_scope_attributes_to_telemetry( - self, telemetry: "Union[Log, Metric]" + self, telemetry: "Union[Log, Metric, StreamedSpan]" ) -> None: + # TODO: turn Logs, Metrics into actual classes + if isinstance(telemetry, dict): + attributes = telemetry["attributes"] + else: + attributes = telemetry.attributes + for attribute, value in self._attributes.items(): - if attribute not in telemetry["attributes"]: - telemetry["attributes"][attribute] = value + if attribute not in attributes: + attributes[attribute] = value def _apply_user_attributes_to_telemetry( - self, telemetry: "Union[Log, Metric]" + self, telemetry: "Union[Log, Metric, StreamedSpan]" ) -> None: - attributes = telemetry["attributes"] + if isinstance(telemetry, dict): + attributes = telemetry["attributes"] + else: + attributes = telemetry.attributes if not should_send_default_pii() or self._user is None: return @@ -1626,16 +1778,19 @@ def apply_to_event( return event @_disable_capture - def apply_to_telemetry(self, telemetry: "Union[Log, Metric]") -> None: + def apply_to_telemetry(self, telemetry: "Union[Log, Metric, StreamedSpan]") -> None: # Attributes-based events and telemetry go through here (logs, metrics, # spansV2) - trace_context = self.get_trace_context() - trace_id = trace_context.get("trace_id") - if telemetry.get("trace_id") is None: - telemetry["trace_id"] = trace_id or "00000000-0000-0000-0000-000000000000" - span_id = trace_context.get("span_id") - if telemetry.get("span_id") is None and span_id: - telemetry["span_id"] = span_id + if not isinstance(telemetry, StreamedSpan): + trace_context = self.get_trace_context() + trace_id = trace_context.get("trace_id") + if telemetry.get("trace_id") is None: + telemetry["trace_id"] = ( + trace_id or "00000000-0000-0000-0000-000000000000" + ) + span_id = trace_context.get("span_id") + if telemetry.get("span_id") is None and span_id: + telemetry["span_id"] = span_id self._apply_scope_attributes_to_telemetry(telemetry) self._apply_user_attributes_to_telemetry(telemetry) diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py index c4b38e4528..17c84fdd3d 100644 --- a/sentry_sdk/tracing.py +++ b/sentry_sdk/tracing.py @@ -126,6 +126,10 @@ class TransactionKwargs(SpanKwargs, total=False): }, ) +# TODO: Once the old Span class is gone, move _tracing.py to tracing.py. This is +# here for now so that you can do sentry_sdk.tracing.start_span for the new API. 
+from sentry_sdk._tracing import start_span + BAGGAGE_HEADER_NAME = "baggage" SENTRY_TRACE_HEADER_NAME = "sentry-trace" @@ -1423,6 +1427,69 @@ def calculate_interest_rate(amount, rate, years): return decorator +def streaming_trace( + func: "Optional[Callable[P, R]]" = None, + *, + name: "Optional[str]" = None, + attributes: "Optional[dict[str, Any]]" = None, +) -> "Union[Callable[P, R], Callable[[Callable[P, R]], Callable[P, R]]]": + """ + Decorator to start a span around a function call. + + This decorator automatically creates a new span when the decorated function + is called, and finishes the span when the function returns or raises an exception. + + :param func: The function to trace. When used as a decorator without parentheses, + this is the function being decorated. When used with parameters (e.g., + ``@trace(op="custom")``, this should be None. + :type func: Callable or None + + :param name: The human-readable name/description for the span. If not provided, + defaults to the function name. This provides more specific details about + what the span represents (e.g., "GET /api/users", "process_user_data"). + :type name: str or None + + :param attributes: A dictionary of key-value pairs to add as attributes to the span. + Attribute values must be strings, integers, floats, or booleans. These + attributes provide additional context about the span's execution. + :type attributes: dict[str, Any] or None + + :returns: When used as ``@trace``, returns the decorated function. When used as + ``@trace(...)`` with parameters, returns a decorator function. + :rtype: Callable or decorator function + + Example:: + + import sentry_sdk + + # Simple usage with default values + @sentry_sdk.trace + def process_data(): + # Function implementation + pass + + # With custom parameters + @sentry_sdk.trace( + name="Get user data", + attributes={"postgres": True} + ) + def make_db_query(sql): + # Function implementation + pass + """ + from sentry_sdk.tracing_utils import create_streaming_span_decorator + + decorator = create_streaming_span_decorator( + name=name, + attributes=attributes, + ) + + if func: + return decorator(func) + else: + return decorator + + # Circular imports from sentry_sdk.tracing_utils import ( diff --git a/sentry_sdk/tracing_utils.py b/sentry_sdk/tracing_utils.py index 742582423b..68f95f4c3b 100644 --- a/sentry_sdk/tracing_utils.py +++ b/sentry_sdk/tracing_utils.py @@ -4,11 +4,12 @@ import os import re import sys +import uuid +import warnings from collections.abc import Mapping, MutableMapping from datetime import timedelta from random import Random from urllib.parse import quote, unquote -import uuid import sentry_sdk from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS, SPANTEMPLATE @@ -106,6 +107,13 @@ def has_tracing_enabled(options: "Optional[Dict[str, Any]]") -> bool: ) +def has_span_streaming_enabled(options: "Optional[dict[str, Any]]") -> bool: + if options is None: + return False + + return (options.get("_experiments") or {}).get("trace_lifecycle") == "stream" + + @contextlib.contextmanager def record_sql_queries( cursor: "Any", @@ -742,6 +750,52 @@ def populate_from_transaction( return Baggage(sentry_items, mutable=False) + @classmethod + def populate_from_segment(cls, segment: "StreamedSpan") -> "Baggage": + """ + Populate fresh baggage entry with sentry_items and make it immutable + if this is the head SDK which originates traces. 
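+        Sentry baggage items already present on the segment take precedence
+        over the freshly computed values.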
+ """ + client = sentry_sdk.get_client() + sentry_items: "Dict[str, str]" = {} + + if not client.is_active(): + return Baggage(sentry_items) + + options = client.options or {} + + sentry_items["trace_id"] = segment.trace_id + sentry_items["sample_rand"] = f"{segment._sample_rand:.6f}" # noqa: E231 + + if options.get("environment"): + sentry_items["environment"] = options["environment"] + + if options.get("release"): + sentry_items["release"] = options["release"] + + if client.parsed_dsn: + sentry_items["public_key"] = client.parsed_dsn.public_key + if client.parsed_dsn.org_id: + sentry_items["org_id"] = client.parsed_dsn.org_id + + if segment.source not in LOW_QUALITY_SEGMENT_SOURCES: + sentry_items["transaction"] = segment.name + + if segment.sample_rate is not None: + sentry_items["sample_rate"] = str(segment.sample_rate) + + if segment.sampled is not None: + sentry_items["sampled"] = "true" if segment.sampled else "false" + + # There's an existing baggage but it was mutable, which is why we are + # creating this new baggage. + # However, if by chance the user put some sentry items in there, give + # them precedence. + if segment._baggage and segment._baggage.sentry_items: + sentry_items.update(segment._baggage.sentry_items) + + return Baggage(sentry_items, mutable=False) + def freeze(self) -> None: self.mutable = False @@ -935,7 +989,61 @@ def sync_wrapper(*args: "Any", **kwargs: "Any") -> "Any": return span_decorator -def get_current_span(scope: "Optional[sentry_sdk.Scope]" = None) -> "Optional[Span]": +def create_streaming_span_decorator( + name: "Optional[str]" = None, + attributes: "Optional[dict[str, Any]]" = None, +) -> "Any": + """ + Create a span decorator that can wrap both sync and async functions. + + :param name: The name of the span. + :type name: str or None + :param attributes: Additional attributes to set on the span. + :type attributes: dict or None + """ + from sentry_sdk.scope import should_send_default_pii + + def span_decorator(f: "Any") -> "Any": + """ + Decorator to create a span for the given function. + """ + + @functools.wraps(f) + async def async_wrapper(*args: "Any", **kwargs: "Any") -> "Any": + span_name = name or qualname_from_function(f) or "" + + with start_streaming_span(name=span_name, attributes=attributes): + result = await f(*args, **kwargs) + return result + + try: + async_wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined] + except Exception: + pass + + @functools.wraps(f) + def sync_wrapper(*args: "Any", **kwargs: "Any") -> "Any": + span_name = name or qualname_from_function(f) or "" + + with start_streaming_span(name=span_name, attributes=attributes): + return f(*args, **kwargs) + + try: + sync_wrapper.__signature__ = inspect.signature(f) # type: ignore[attr-defined] + except Exception: + pass + + if inspect.iscoroutinefunction(f): + return async_wrapper + else: + return sync_wrapper + + return span_decorator + + +def get_current_span( + scope: "Optional[sentry_sdk.Scope]" = None, +) -> "Optional[Union[Span, StreamedSpan]]": """ Returns the currently active span if there is one running, otherwise `None` """ @@ -950,6 +1058,15 @@ def set_span_errored(span: "Optional[Span]" = None) -> None: Also sets the status of the transaction (root span) to INTERNAL_ERROR. 
""" span = span or get_current_span() + + if not isinstance(span, Span): + warnings.warn( + "set_span_errored is not available in streaming mode.", + DeprecationWarning, + stacklevel=2, + ) + return + if span is not None: span.set_status(SPANSTATUS.INTERNAL_ERROR) if span.containing_transaction is not None: @@ -1311,5 +1428,11 @@ def add_sentry_baggage_to_headers( SENTRY_TRACE_HEADER_NAME, ) +from sentry_sdk._tracing import ( + LOW_QUALITY_SEGMENT_SOURCES, + start_span as start_streaming_span, +) + if TYPE_CHECKING: from sentry_sdk.tracing import Span + from sentry_sdk._tracing import StreamedSpan