Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ classifiers = [
]
license = { text = "BSD-3-Clause" }
requires-python = ">=3.10"
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7"]
dependencies = ["bidict", "pika", "setuptools", "stomp-py>=7", "opentelemetry-api==1.20.0", "opentelemetry-sdk==1.20.0", "opentelemetry-exporter-otlp-proto-http==1.20.0" ]

[project.urls]
Download = "https://github.com/DiamondLightSource/python-workflows/releases"
Expand Down
3 changes: 3 additions & 0 deletions requirements_dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@ pytest-mock==3.14.0
pytest-timeout==2.3.1
stomp-py==8.1.2
websocket-client==1.8.0
opentelemetry-api==1.20.0
opentelemetry-sdk==1.20.0
opentelemetry-exporter-otlp-proto-http==1.20.0
33 changes: 33 additions & 0 deletions src/workflows/recipe/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from collections.abc import Callable
from typing import Any

from opentelemetry import trace

from workflows.recipe.recipe import Recipe
from workflows.recipe.validate import validate_recipe
from workflows.recipe.wrapper import RecipeWrapper
Expand Down Expand Up @@ -69,6 +71,37 @@ def unwrap_recipe(header, message):
message = mangle_for_receiving(message)
if header.get("workflows-recipe") in {True, "True", "true", 1}:
rw = RecipeWrapper(message=message, transport=transport_layer)

# Extract recipe_id on the current span
span = trace.get_current_span()
recipe_id = None

# Extract recipe ID from environment
if isinstance(message, dict):
environment = message.get("environment", {})
if isinstance(environment, dict):
recipe_id = environment.get("ID")

if recipe_id:
span.set_attribute("recipe_id", recipe_id)
span.add_event(
"recipe.id_extracted", attributes={"recipe_id": recipe_id}
)

# Extract span_id and trace_id for logging
span_context = span.get_span_context()
if span_context and span_context.is_valid:
span_id = format(span_context.span_id, "016x")
trace_id = format(span_context.trace_id, "032x")

log_extra = {
"span_id": span_id,
"trace_id": trace_id,
}

if recipe_id:
log_extra["recipe_id"] = recipe_id

if log_extender and rw.environment and rw.environment.get("ID"):
with log_extender("recipe_ID", rw.environment["ID"]):
return callback(rw, header, message.get("payload"))
Expand Down
41 changes: 41 additions & 0 deletions src/workflows/services/common_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,16 @@
import time
from typing import Any

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

import workflows
import workflows.logging
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
from workflows.util.zocalo.configuration import OTEL


class Status(enum.Enum):
Expand Down Expand Up @@ -185,6 +193,39 @@
self.transport.subscription_callback_set_intercept(
self._transport_interceptor
)

# Configure OTELTracing if configuration is available
otel_config = (
OTEL.config if hasattr(OTEL, "config") and OTEL.config else None
)

if otel_config:
# Configure OTELTracing
resource = Resource.create(
{
SERVICE_NAME: self._service_name,
}
)

self.log.debug("Configuring OTELTracing")
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)

# Configure BatchProcessor and OTLPSpanExporter using config values
otlp_exporter = OTLPSpanExporter(
endpoint=otel_config["endpoint"],
timeout=otel_config.get("timeout", 10),
)
span_processor = BatchSpanProcessor(otlp_exporter)

Check failure

Code scanning / CodeQL

Potentially uninitialized local variable Error

Local variable 'otlp_exporter' may be used before it is initialized.
provider.add_span_processor(span_processor)

Check failure

Code scanning / CodeQL

Potentially uninitialized local variable Error

Local variable 'provider' may be used before it is initialized.

# Add OTELTracingMiddleware to the transport layer
tracer = trace.get_tracer(__name__)
otel_middleware = OTELTracingMiddleware(
tracer, service_name=self._service_name
)
self._transport.add_middleware(otel_middleware)

metrics = self._environment.get("metrics")
if metrics:
import prometheus_client
Expand Down
34 changes: 34 additions & 0 deletions src/workflows/transport/middleware/otel_tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

import functools
from collections.abc import Callable

from opentelemetry import trace
from opentelemetry.propagate import extract

from workflows.transport.middleware import BaseTransportMiddleware


class OTELTracingMiddleware(BaseTransportMiddleware):
def __init__(self, tracer: trace.Tracer, service_name: str):
self.tracer = tracer
self.service_name = service_name

def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
@functools.wraps(callback)
def wrapped_callback(header, message):
# Extract trace context from message headers
ctx = extract(header) if header else None

# Start a new span with the extracted context
with self.tracer.start_as_current_span(
"transport.subscribe", context=ctx
) as span:
span.set_attribute("service_name", self.service_name)
span.set_attribute("channel", channel)

# Call the original callback
return callback(header, message)

# Call the next middleware with the wrapped callback
return call_next(channel, wrapped_callback, **kwargs)
26 changes: 26 additions & 0 deletions src/workflows/util/zocalo/configuration.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,32 @@
from workflows.transport.stomp_transport import StompTransport


class OTEL:
"""A Zocalo configuration plugin to pre-populate OTELTracing config defaults"""

class Schema(PluginSchema):
host = fields.Str(required=True)
port = fields.Int(required=True)
endpoint = fields.Str(required=False)
timeout = fields.Int(required=False, load_default=10)

# Store configuration for access by services
config = {}

@staticmethod
def activate(configuration):
# Build the full endpoint URL if not provided
if "endpoint" not in configuration:
endpoint = (
f"https://{configuration['host']}:{configuration['port']}/v1/traces"
)
else:
endpoint = configuration["endpoint"]

OTEL.config["endpoint"] = endpoint
OTEL.config["timeout"] = configuration.get("timeout", 10)


class Stomp:
"""A Zocalo configuration plugin to pre-populate StompTransport config defaults"""

Expand Down
Loading