core

Core workflow implementation.

Workflow

Workflow(
    max_processes: Optional[int] = None,
    max_threads: Optional[int] = None,
    process_id: Optional[str] = None,
)

Manages the execution of a workflow graph.

Source code in src/process_manager/workflow/core.py
def __init__(self, 
             max_processes: Optional[int] = None,
             max_threads: Optional[int] = None,
             process_id: Optional[str] = None):
    self.nodes: Dict[str, WorkflowNode] = {}
    self.results: Dict[str, ProcessResult] = {}
    self.max_processes = max_processes
    self.max_threads = max_threads
    self.process_id = process_id or f"workflow_{id(self)}"
    self.pool_manager = WorkflowPoolManager.get_instance()

execution_graph property

execution_graph: Dict[str, List[str]]

Build a graph of process dependencies.
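
A minimal sketch of inspecting the property, assuming the returned mapping keys each node id to the ids it depends on (no source is shown for this property, so the shape is inferred from the type annotation):

# workflow is assumed to be an already-populated Workflow instance.
graph = workflow.execution_graph
for node_id, dependencies in graph.items():
    print(f"{node_id} depends on: {dependencies or '(nothing)'}")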

add_node

add_node(node: WorkflowNode) -> None

Add a node to the workflow.

Source code in src/process_manager/workflow/core.py
def add_node(self, node: WorkflowNode) -> None:
    """Add a node to the workflow."""
    self.nodes[node.process.config.process_id] = node
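
Nodes are keyed by node.process.config.process_id, so adding a node whose process reuses an existing id replaces the earlier entry. A hedged sketch of registering two nodes; the WorkflowNode constructor arguments (process, dependencies, required) are assumed from the attributes this module reads:

# fetch_process and parse_process are assumed, already-configured process objects.
workflow.add_node(WorkflowNode(process=fetch_process, dependencies=[], required=True))
workflow.add_node(WorkflowNode(
    process=parse_process,
    dependencies=[fetch_process.config.process_id],  # run after fetch completes
    required=True,
))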

execute async

execute(initial_data: Optional[Dict[str, Any]] = None) -> Dict[str, ProcessResult]

Execute the workflow.

Source code in src/process_manager/workflow/core.py
async def execute(self, initial_data: Optional[Dict[str, Any]] = None) -> Dict[str, ProcessResult]:
    """Execute the workflow."""
    try:
        with self.get_pools() as (thread_pool, process_pool):
            # Track per-node results and completion state for this run
            initial_data = initial_data or {}
            node_results: Dict[str, ProcessResult] = {}
            completed_nodes = set()
            failed_nodes = set()

            all_nodes = set(self.nodes.keys())

            while len(completed_nodes) < len(self.nodes):
                ready_nodes = [
                    node_id for node_id in all_nodes - completed_nodes
                    if (
                        all(dep in completed_nodes for dep in self.nodes[node_id].dependencies) and
                        all(
                            dep not in failed_nodes or 
                            not self.nodes[dep].required 
                            for dep in self.nodes[node_id].dependencies
                        )
                    )
                ]

                if not ready_nodes:
                    remaining_nodes = all_nodes - completed_nodes
                    if remaining_nodes:
                        unmet_dependencies = {
                            node: [
                                dep for dep in self.nodes[node].dependencies
                                if dep not in completed_nodes
                            ]
                            for node in remaining_nodes
                        }
                        raise Exception(
                            f"Workflow deadlock detected. Unmet dependencies: {unmet_dependencies}"
                        )
                    break

                async_nodes = []
                thread_nodes = []
                process_nodes = []

                for node_id in ready_nodes:
                    process_type = self.nodes[node_id].process.config.process_type
                    match process_type:
                        case ProcessType.ASYNC:
                            async_nodes.append(node_id)
                        case ProcessType.THREAD:
                            thread_nodes.append(node_id)
                        case ProcessType.PROCESS:
                            process_nodes.append(node_id)
                        case _:
                            raise ValueError(f"Invalid ProcessType: {process_type}")

                tasks = []

                for node_id in ready_nodes:
                    node = self.nodes[node_id]
                    node_input = {}

                    for dep in node.dependencies:
                        if dep in node_results and node_results[dep].success:
                            node_input[dep] = node_results[dep].data

                    if not node.dependencies and node_id in initial_data:
                        node_input = initial_data[node_id]

                    if node_id in async_nodes:
                        tasks.append(node.process.run(node_input))

                    elif node_id in thread_nodes:
                        loop = asyncio.get_running_loop()
                        tasks.append(
                            loop.run_in_executor(
                                thread_pool,
                                self._run_sync_process,
                                node.process,
                                node_input
                            )
                        )

                    else:  # process_nodes
                        loop = asyncio.get_running_loop()
                        tasks.append(
                            loop.run_in_executor(
                                process_pool,
                                self._run_sync_process,
                                node.process,
                                node_input
                            )
                        )

                # Execute all ready nodes in parallel
                results = await asyncio.gather(*tasks, return_exceptions=True)

                # Process results and update workflow state
                for node_id, result in zip(ready_nodes, results):
                    node = self.nodes[node_id]
                    completed_nodes.add(node_id)

                    if isinstance(result, Exception):
                        node.process.metadata.state = ProcessState.FAILED
                        node.process.metadata.last_error = str(result)
                        failed_nodes.add(node_id)

                        if node.required:
                            if node.process.config.fail_fast:
                                raise Exception(
                                    f"Critical node {node_id} failed: {str(result)}"
                                )
                        continue

                    # Wrap the result in ProcessResult if it isn't already
                    if not isinstance(result, ProcessResult):
                        result = ProcessResult(
                            success=True,
                            data=result,
                            execution_time=0,  # We don't have timing info here
                            start_time=datetime.now(),  # Approximate
                            end_time=datetime.now()
                        )

                    node_results[node_id] = result
                    if not result.success:
                        failed_nodes.add(node_id)
                        if node.required and node.process.config.fail_fast:
                            raise Exception(
                                f"Critical node {node_id} failed: {result.error}"
                            )

            self.results = node_results
            return node_results
    except Exception as e:
        for node_id in all_nodes - completed_nodes:
            self.nodes[node_id].process.metadata.state = ProcessState.SKIPPED
        raise

    finally:
        for node_id, result in node_results.items():
            self.nodes[node_id].process.metadata.result = result
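
execute is a coroutine, so it is normally driven with asyncio.run. initial_data is keyed by node id and is only handed to nodes without dependencies; downstream nodes instead receive the ProcessResult.data of each successful dependency. A hedged usage sketch with illustrative node ids:

import asyncio

async def main() -> None:
    # "fetch" is an illustrative dependency-free node id, so it receives
    # its entry from initial_data directly.
    results = await workflow.execute(initial_data={"fetch": {"url": "https://example.com"}})
    for node_id, result in results.items():
        print(node_id, "ok" if result.success else result.error)

asyncio.run(main())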

get_pools

get_pools()

Get pools for this workflow.

Source code in src/process_manager/workflow/core.py
@contextmanager
def get_pools(self):
    """Get pools for this workflow."""
    try:
        pools = self.pool_manager.get_or_create_pools(
            self.process_id,
            self.max_threads,
            self.max_processes
        )
        yield pools['thread_pool'], pools['process_pool']
    finally:
        pass  # Don't cleanup here, pools are reused
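
The context manager yields the shared thread and process executors and deliberately leaves them running on exit, so it can be entered repeatedly. execute uses it internally, but it can also be used directly:

with workflow.get_pools() as (thread_pool, process_pool):
    future = thread_pool.submit(sum, [1, 2, 3])
    print(future.result())  # 6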

shutdown

shutdown()

Cleanup resources for this workflow.

Source code in src/process_manager/workflow/core.py
def shutdown(self):
    """Cleanup resources for this workflow."""
    self.pool_manager.cleanup_pools(self.process_id)

to_json

to_json() -> str

Serialize the workflow state to JSON.

Source code in src/process_manager/workflow/core.py
def to_json(self) -> str:
    """Serialize the workflow state to JSON."""
    workflow_state = {
        "nodes": {
            node_id: {
                "process_id": node.process.config.process_id,
                "dependencies": node.dependencies,
                "required": node.required,
                "state": node.process.metadata.state.value,
                "retry_count": node.process.metadata.retry_count,
                "last_error": node.process.metadata.last_error
            }
            for node_id, node in self.nodes.items()
        },
        "results": {
            node_id: result.dict() for node_id, result in self.results.items()
        }
    }
    return json.dumps(workflow_state, default=str)
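
Because each ProcessResult is serialized via result.dict() and non-JSON types fall back to str, the output can be loaded back into plain dictionaries for inspection or persistence:

import json

state = json.loads(workflow.to_json())
for node_id, node_state in state["nodes"].items():
    print(node_id, node_state["state"], node_state["last_error"])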

WorkflowPoolManager

Manages thread and process pools across workflows.

cleanup_pools

cleanup_pools(process_id: str) -> None

Cleanup pools for a specific process.

Source code in src/process_manager/workflow/core.py
def cleanup_pools(self, process_id: str) -> None:
    """Cleanup pools for a specific process."""
    if process_id in self._pools:
        pools = self._pools[process_id]
        pools['thread_pool'].shutdown()
        pools['process_pool'].shutdown()
        del self._pools[process_id]

get_or_create_pools

get_or_create_pools(
    process_id: str, max_threads: Optional[int] = None, max_processes: Optional[int] = None
) -> Dict[str, Any]

Get or create pools for a specific process.

Source code in src/process_manager/workflow/core.py
def get_or_create_pools(self, process_id: str, max_threads: Optional[int] = None, max_processes: Optional[int] = None) -> Dict[str, Any]:
    """Get or create pools for a specific process."""
    if process_id not in self._pools:
        self._pools[process_id] = {
            'thread_pool': ThreadPoolExecutor(max_workers=max_threads),
            'process_pool': ProcessPoolExecutor(max_workers=max_processes)
        }
    return self._pools[process_id]
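
Pools are cached per process_id, so repeated calls with the same id return the same executors rather than creating new ones:

manager = WorkflowPoolManager.get_instance()
pools_a = manager.get_or_create_pools("etl_job", max_threads=4, max_processes=2)
pools_b = manager.get_or_create_pools("etl_job")
assert pools_a is pools_b          # the cached dict of executors is reused
manager.cleanup_pools("etl_job")   # shuts both executors down and drops the cache entry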

create_workflow

create_workflow(
    max_processes: Optional[int] = None,
    max_threads: Optional[int] = None,
    process_id: Optional[str] = None,
) -> Workflow

Create a new workflow instance with specified pool sizes.

Parameters:

    max_processes (Optional[int]): Maximum number of processes in the process pool. Default: None
    max_threads (Optional[int]): Maximum number of threads in the thread pool. Default: None
    process_id (Optional[str]): Unique identifier for this workflow instance. Default: None

Returns:

    Workflow: A new workflow instance

Source code in src/process_manager/workflow/core.py
def create_workflow(
        max_processes: Optional[int] = None,
        max_threads: Optional[int] = None,
        process_id: Optional[str] = None) -> Workflow:
    """Create a new workflow instance with specified pool sizes.

    Args:
        max_processes: Maximum number of processes in the process pool
        max_threads: Maximum number of threads in the thread pool
        process_id: Unique identifier for this workflow instance

    Returns:
        Workflow: A new workflow instance
    """
    return Workflow(
        max_processes=max_processes,
        max_threads=max_threads,
        process_id=process_id
    )
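
Putting the pieces together, a typical lifecycle is to create the workflow, register nodes, execute it, and shut it down so the pooled executors tied to its process_id are released. A hedged end-to-end sketch; the node objects are assumed to be built as shown for add_node above:

import asyncio

workflow = create_workflow(max_threads=4, max_processes=2, process_id="reporting")
workflow.add_node(load_node)    # assumed dependency-free node with process id "load"
workflow.add_node(report_node)  # assumed node that depends on "load"

try:
    results = asyncio.run(workflow.execute(initial_data={"load": {"path": "data.csv"}}))
    print(workflow.to_json())
finally:
    workflow.shutdown()  # release this workflow's thread and process pools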