Skip to content

OSSS.ai.correlation

OSSS.ai.correlation

Correlation Context Management for OSSS.

Provides correlation ID propagation through async execution chains using contextvars. This enables tracing of requests through the orchestrator, agents, and event system for debugging and observability.

CorrelationContext

Bases: BaseModel

Correlation context for tracing requests through the system.

Migrated from dataclass to Pydantic BaseModel for enhanced validation, serialization, and integration with the OSSS Pydantic ecosystem.

Provides access to correlation information that automatically propagates through async execution chains.

to_dict()

Convert correlation context to dictionary for serialization.

Maintained for backward compatibility. Uses Pydantic's model_dump() internally for consistent serialization with datetime handling.

from_dict(data) classmethod

Create correlation context from dictionary.

Uses Pydantic's model_validate() for cleaner deserialization and automatic type conversion.

trace(correlation_id=None, workflow_id=None, parent_span_id=None, metadata=None) async

Context manager for explicit correlation control.

Sets correlation context for the duration of the async context and automatically restores the previous context when exiting.

Parameters:

Name Type Description Default
correlation_id Optional[str]

Explicit correlation ID, or generates new one

None
workflow_id Optional[str]

Explicit workflow ID, or generates new one

None
parent_span_id Optional[str]

Parent span for nested operations

None
metadata Optional[Dict[str, Any]]

Additional trace metadata

None

Yields:

Type Description
AsyncGenerator[CorrelationContext, None]

CorrelationContext with current trace information

Example

async with trace(correlation_id="custom-trace-123") as ctx: result = await orchestrator.run(query, config) # All events emitted within this context use custom-trace-123

get_correlation_id()

Get the current correlation ID from context.

get_workflow_id()

Get the current workflow ID from context.

get_parent_span_id()

Get the current parent span ID from context.

get_trace_metadata()

Get the current trace metadata from context.

get_current_context()

Get the current correlation context if available.

Returns:

Type Description
Optional[CorrelationContext]

CorrelationContext if context is active, None otherwise

ensure_correlation_context()

Ensure a correlation context exists, creating one if necessary.

This is useful for operations that should always have trace context, even if not explicitly created by the caller.

Returns:

Type Description
CorrelationContext

CorrelationContext (existing or newly created)

create_child_span(operation_name, metadata=None)

Create a child span ID for nested operations.

Parameters:

Name Type Description Default
operation_name str

Name of the operation for the child span

required
metadata Optional[Dict[str, Any]]

Additional metadata for the child span

None

Returns:

Type Description
str

Child span ID that can be used as parent_span_id for further nesting

add_trace_metadata(key, value)

Add metadata to the current trace context.

Parameters:

Name Type Description Default
key str

Metadata key

required
value Any

Metadata value

required

get_correlation_headers()

Get correlation information as HTTP headers.

Useful for propagating correlation across service boundaries.

Returns:

Type Description
Dict[str, str]

Dictionary of correlation headers

propagate_correlation(func, *args, **kwargs) async

Decorator-like function to ensure correlation context propagates to async functions.

Parameters:

Name Type Description Default
func Any

Async function to call

required
*args Any

Arguments for the function

()
**kwargs Any

Keyword arguments for the function

{}

Returns:

Type Description
Any

Result of the function call with correlation context preserved

with_correlation(correlation_id, workflow_id=None, func=None, *args, **kwargs) async

Execute a function with explicit correlation context.

Parameters:

Name Type Description Default
correlation_id str

Correlation ID to use

required
workflow_id Optional[str]

Workflow ID to use (optional, generates if not provided)

None
func Any

Async function to execute

None
*args Any

Arguments for the function

()
**kwargs Any

Keyword arguments for the function

{}

Returns:

Type Description
Any

Result of the function execution

is_traced()

Check if we're currently in a trace context.

Returns:

Type Description
bool

True if correlation context is active