Skip to content

OSSS.ai.workflows.executor

OSSS.ai.workflows.executor

Declarative workflow execution engine for OSSS DAG workflows.

This module provides the DeclarativeOrchestrator and WorkflowExecutor for executing sophisticated DAG workflows with advanced nodes, event emission, and comprehensive state management.

WorkflowExecutionError

Bases: Exception

Exception raised during workflow execution.

ExecutionContext

Bases: BaseModel

Context for workflow execution tracking.

Provides comprehensive execution state management with validation and automatic serialization for workflow orchestration.

update_status(status)

Update execution status.

add_metadata(key, value)

Add execution metadata.

CompositionResult

Bases: BaseModel

Result of DAG composition process.

Contains the complete mapping of workflow definition to executable DAG structure with validation results and metadata.

WorkflowResult

Bases: BaseModel

Comprehensive result of workflow execution with metadata and tracing.

Provides complete execution information including performance metrics, event correlation, and node execution details for analytics and debugging.

to_dict()

Convert WorkflowResult to dictionary with JSON-safe string escaping.

build_execution_structure()

Build hierarchical execution structure from flat node execution order.

Uses workflow knowledge to group parallel nodes: - refiner runs first (sequential) - critic and historian run in parallel after refiner - synthesis runs last (sequential)

Returns

List[Union[str, List[str]]] Hierarchical structure like ['refiner', ['critic', 'historian'], 'synthesis']

WorkflowExecutor

Runtime execution engine for advanced node workflows.

Provides enhanced state management, event integration, and performance monitoring for complex DAG state propagation with correlation tracking.

execute_workflow(workflow_def, query, execution_config) async

Execute a workflow definition (legacy interface).

validate_workflow_definition(workflow_def) async

Validate a workflow definition.

execute(initial_context, workflow_id, execution_id=None) async

Execute workflow with comprehensive state management and event emission.

Parameters

initial_context : AgentContext Starting context for workflow execution workflow_id : str Workflow identifier for correlation execution_id : Optional[str] Execution identifier (auto-generated if None)

Returns

WorkflowResult Comprehensive execution result with metadata

DeclarativeOrchestrator

Main orchestrator for executing DAG workflows using advanced nodes.

Provides the primary interface for declarative workflow execution with comprehensive event emission, error handling, and result tracking.

run(query, config=None) async

Run a basic workflow with legacy interface.

execute_workflow(workflow, initial_context, execution_id=None) async

Execute declarative workflow with comprehensive results.

Parameters

workflow : WorkflowDefinition Workflow definition to execute initial_context : AgentContext Starting context for execution execution_id : Optional[str] Execution identifier (auto-generated if None)

Returns

WorkflowResult Comprehensive workflow execution result

validate_workflow(workflow=None) async

Validate workflow definition (uses instance workflow if not provided).

export_workflow_snapshot(workflow) async

Export workflow with composition metadata for sharing.

Parameters

workflow : WorkflowDefinition Workflow to export

Returns

Dict[str, Any] Complete workflow snapshot with metadata

get_workflow_metadata()

Get metadata about the current workflow.

Returns

Dict[str, Any] Workflow metadata including basic information

update_workflow_definition(new_workflow)

Update the workflow definition.

Parameters

new_workflow : WorkflowDefinition New workflow definition to use