OSSS.ai.workflows.executor¶
OSSS.ai.workflows.executor
¶
Declarative workflow execution engine for OSSS DAG workflows.
This module provides the DeclarativeOrchestrator and WorkflowExecutor for executing sophisticated DAG workflows with advanced nodes, event emission, and comprehensive state management.
WorkflowExecutionError
¶
Bases: Exception
Exception raised during workflow execution.
ExecutionContext
¶
CompositionResult
¶
Bases: BaseModel
Result of DAG composition process.
Contains the complete mapping of workflow definition to executable DAG structure with validation results and metadata.
WorkflowResult
¶
Bases: BaseModel
Comprehensive result of workflow execution with metadata and tracing.
Provides complete execution information including performance metrics, event correlation, and node execution details for analytics and debugging.
to_dict()
¶
Convert WorkflowResult to dictionary with JSON-safe string escaping.
build_execution_structure()
¶
Build hierarchical execution structure from flat node execution order.
Uses workflow knowledge to group parallel nodes: - refiner runs first (sequential) - critic and historian run in parallel after refiner - synthesis runs last (sequential)
Returns¶
List[Union[str, List[str]]] Hierarchical structure like ['refiner', ['critic', 'historian'], 'synthesis']
WorkflowExecutor
¶
Runtime execution engine for advanced node workflows.
Provides enhanced state management, event integration, and performance monitoring for complex DAG state propagation with correlation tracking.
execute_workflow(workflow_def, query, execution_config)
async
¶
Execute a workflow definition (legacy interface).
validate_workflow_definition(workflow_def)
async
¶
Validate a workflow definition.
execute(initial_context, workflow_id, execution_id=None)
async
¶
Execute workflow with comprehensive state management and event emission.
Parameters¶
initial_context : AgentContext Starting context for workflow execution workflow_id : str Workflow identifier for correlation execution_id : Optional[str] Execution identifier (auto-generated if None)
Returns¶
WorkflowResult Comprehensive execution result with metadata
DeclarativeOrchestrator
¶
Main orchestrator for executing DAG workflows using advanced nodes.
Provides the primary interface for declarative workflow execution with comprehensive event emission, error handling, and result tracking.
run(query, config=None)
async
¶
Run a basic workflow with legacy interface.
execute_workflow(workflow, initial_context, execution_id=None)
async
¶
Execute declarative workflow with comprehensive results.
Parameters¶
workflow : WorkflowDefinition Workflow definition to execute initial_context : AgentContext Starting context for execution execution_id : Optional[str] Execution identifier (auto-generated if None)
Returns¶
WorkflowResult Comprehensive workflow execution result
validate_workflow(workflow=None)
async
¶
Validate workflow definition (uses instance workflow if not provided).