OSSS.ai.correlation¶
OSSS.ai.correlation
¶
Correlation Context Management for OSSS.
Provides correlation ID propagation through async execution chains using contextvars. This enables tracing of requests through the orchestrator, agents, and event system for debugging and observability.
CorrelationContext
¶
Bases: BaseModel
Correlation context for tracing requests through the system.
Migrated from dataclass to Pydantic BaseModel for enhanced validation, serialization, and integration with the OSSS Pydantic ecosystem.
Provides access to correlation information that automatically propagates through async execution chains.
to_dict()
¶
Convert correlation context to dictionary for serialization.
Maintained for backward compatibility. Uses Pydantic's model_dump() internally for consistent serialization with datetime handling.
from_dict(data)
classmethod
¶
Create correlation context from dictionary.
Uses Pydantic's model_validate() for cleaner deserialization and automatic type conversion.
trace(correlation_id=None, workflow_id=None, parent_span_id=None, metadata=None)
async
¶
Context manager for explicit correlation control.
Sets correlation context for the duration of the async context and automatically restores the previous context when exiting.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
correlation_id
|
Optional[str]
|
Explicit correlation ID, or generates new one |
None
|
workflow_id
|
Optional[str]
|
Explicit workflow ID, or generates new one |
None
|
parent_span_id
|
Optional[str]
|
Parent span for nested operations |
None
|
metadata
|
Optional[Dict[str, Any]]
|
Additional trace metadata |
None
|
Yields:
| Type | Description |
|---|---|
AsyncGenerator[CorrelationContext, None]
|
CorrelationContext with current trace information |
Example
async with trace(correlation_id="custom-trace-123") as ctx: result = await orchestrator.run(query, config) # All events emitted within this context use custom-trace-123
get_correlation_id()
¶
Get the current correlation ID from context.
get_workflow_id()
¶
Get the current workflow ID from context.
get_parent_span_id()
¶
Get the current parent span ID from context.
get_trace_metadata()
¶
Get the current trace metadata from context.
get_current_context()
¶
Get the current correlation context if available.
Returns:
| Type | Description |
|---|---|
Optional[CorrelationContext]
|
CorrelationContext if context is active, None otherwise |
ensure_correlation_context()
¶
Ensure a correlation context exists, creating one if necessary.
This is useful for operations that should always have trace context, even if not explicitly created by the caller.
Returns:
| Type | Description |
|---|---|
CorrelationContext
|
CorrelationContext (existing or newly created) |
create_child_span(operation_name, metadata=None)
¶
Create a child span ID for nested operations.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
operation_name
|
str
|
Name of the operation for the child span |
required |
metadata
|
Optional[Dict[str, Any]]
|
Additional metadata for the child span |
None
|
Returns:
| Type | Description |
|---|---|
str
|
Child span ID that can be used as parent_span_id for further nesting |
add_trace_metadata(key, value)
¶
Add metadata to the current trace context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Metadata key |
required |
value
|
Any
|
Metadata value |
required |
get_correlation_headers()
¶
Get correlation information as HTTP headers.
Useful for propagating correlation across service boundaries.
Returns:
| Type | Description |
|---|---|
Dict[str, str]
|
Dictionary of correlation headers |
propagate_correlation(func, *args, **kwargs)
async
¶
Decorator-like function to ensure correlation context propagates to async functions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
func
|
Any
|
Async function to call |
required |
*args
|
Any
|
Arguments for the function |
()
|
**kwargs
|
Any
|
Keyword arguments for the function |
{}
|
Returns:
| Type | Description |
|---|---|
Any
|
Result of the function call with correlation context preserved |
with_correlation(correlation_id, workflow_id=None, func=None, *args, **kwargs)
async
¶
Execute a function with explicit correlation context.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
correlation_id
|
str
|
Correlation ID to use |
required |
workflow_id
|
Optional[str]
|
Workflow ID to use (optional, generates if not provided) |
None
|
func
|
Any
|
Async function to execute |
None
|
*args
|
Any
|
Arguments for the function |
()
|
**kwargs
|
Any
|
Keyword arguments for the function |
{}
|
Returns:
| Type | Description |
|---|---|
Any
|
Result of the function execution |
is_traced()
¶
Check if we're currently in a trace context.
Returns:
| Type | Description |
|---|---|
bool
|
True if correlation context is active |