Skip to content

pool

pool

Process pool management for flow execution.

ProcessPoolManager

ProcessPoolManager(max_threads: Optional[int] = None, max_processes: Optional[int] = None)

Manages process and thread pools for flow execution.

Source code in src/flow/execution/pool.py
def __init__(
    self,
    max_threads: Optional[int] = None,
    max_processes: Optional[int] = None
):
    """Create the thread and process executors and the task bookkeeping.

    Args:
        max_threads: Worker count for the thread pool. A falsy value
            (``None`` or ``0``) falls back to ``mp.cpu_count()``.
        max_processes: Worker count for the process pool, with the same
            fallback rule.
    """
    # cpu_count() is only consulted when the caller gave no usable limit.
    thread_limit = max_threads if max_threads else mp.cpu_count()
    process_limit = max_processes if max_processes else mp.cpu_count()
    self.max_threads = thread_limit
    self.max_processes = process_limit

    # One executor per execution flavour.
    self._thread_pool = ThreadPoolExecutor(max_workers=thread_limit)
    self._process_pool = ProcessPoolExecutor(max_workers=process_limit)

    # Per-task bookkeeping, keyed by process id.
    self._futures: Dict[str, Future] = dict()
    self._locks: Dict[str, asyncio.Lock] = dict()
    # The event loop is captured once, at construction time.
    self._loop = asyncio.get_event_loop()

active_tasks property

active_tasks: Set[str]

Get set of currently active task IDs.

cancel_task async

cancel_task(process_id: str) -> None

Cancel a running task.

Source code in src/flow/execution/pool.py
async def cancel_task(self, process_id: str) -> None:
    """Request cancellation of a tracked task that has not finished.

    Does nothing when ``process_id`` is not tracked or its future is
    already done. Otherwise calls ``cancel()`` on the future and then
    hands the id to ``_cleanup_task``.
    """
    pending = self._futures.get(process_id)
    if not pending or pending.done():
        return

    pending.cancel()
    self._cleanup_task(process_id)

shutdown

shutdown(wait: bool = True) -> None

Shut down the pool manager.

Source code in src/flow/execution/pool.py
def shutdown(self, wait: bool = True) -> None:
    """Shut down the pool manager.

    Cancels every tracked future that has not finished, shuts down both
    executors, and clears the task and lock registries.

    Cancellation is performed synchronously with ``Future.cancel()``.
    Scheduling the async ``cancel_task`` through ``asyncio.create_task``
    from this synchronous method raises ``RuntimeError`` when no event
    loop is running, and inside a running loop the scheduled tasks would
    only execute after ``_futures`` had already been cleared below, so
    nothing would actually be cancelled.

    Args:
        wait: Forwarded to both executors' ``shutdown``.
    """
    # Snapshot the values so the registry can be mutated safely elsewhere.
    for future in list(self._futures.values()):
        if not future.done():
            # cancel() only succeeds for work that has not started; work
            # that is already running is left to finish.
            future.cancel()

    # Shutdown pools
    self._thread_pool.shutdown(wait=wait)
    self._process_pool.shutdown(wait=wait)

    # Clear tracking
    self._futures.clear()
    self._locks.clear()

submit_task async

submit_task(
    process_id: str,
    flow_type: FlowType,
    func: Callable,
    input_data: Dict[str, Any],
    timeout: Optional[float] = None,
) -> Any

Submit a task for execution.

Source code in src/flow/execution/pool.py
async def submit_task(
    self,
    process_id: str,
    flow_type: FlowType,
    func: Callable,
    input_data: Dict[str, Any],
    timeout: Optional[float] = None,
) -> Any:
    """Submit a task for execution and wait for its result.

    Args:
        process_id: Key under which the task is tracked; must not already
            be present in the active futures.
        flow_type: ``FlowType.THREAD`` selects the thread pool; any other
            value selects the process pool.
        func: Callable invoked in the executor as ``func(input_data)``.
        input_data: The single argument passed to ``func``.
        timeout: Seconds to wait for the result. ``None`` waits
            indefinitely; any number, including ``0``, is enforced.

    Returns:
        Whatever ``func`` returns.

    Raises:
        FlowError: If ``process_id`` is already being tracked.
        FlowTimeoutError: If the result is not available within ``timeout``.
    """
    if process_id in self._futures:
        raise FlowError(f"Task {process_id} already running")

    self._locks[process_id] = asyncio.Lock()

    try:
        # Pick the executor based on flow type
        executor = self._thread_pool if flow_type == FlowType.THREAD else self._process_pool

        # Submit the task
        future = executor.submit(func, input_data)
        self._futures[process_id] = future

        try:
            # wait_for() already treats timeout=None as "no limit", so one
            # call covers both cases. A truthiness test on timeout would
            # silently turn timeout=0 into an unbounded wait.
            return await asyncio.wait_for(
                asyncio.wrap_future(future),
                timeout=timeout
            )
        except asyncio.TimeoutError as exc:
            future.cancel()
            raise FlowTimeoutError(
                f"Task {process_id} timed out after {timeout} seconds"
            ) from exc

    finally:
        # Always drop the tracking entries, whether the task succeeded,
        # failed, or timed out.
        self._cleanup_task(process_id)

wait_for_task async

wait_for_task(process_id: str, timeout: Optional[float] = None) -> None

Wait for a specific task to complete.

Source code in src/flow/execution/pool.py
async def wait_for_task(self, process_id: str, timeout: Optional[float] = None) -> None:
    """Wait for a specific task to complete.

    Returns immediately when ``process_id`` is not tracked. The task's
    result is discarded; an exception raised by the task propagates to
    the caller.

    Args:
        process_id: Key of the tracked task to wait for.
        timeout: Seconds to wait; ``None`` waits indefinitely.

    Raises:
        FlowTimeoutError: If the task has not finished within ``timeout``.
    """
    future = self._futures.get(process_id)
    if future is None:
        return

    # Bind the waiter to the loop that is running this coroutine. The loop
    # stored at construction time may be a different one, and awaiting a
    # future that belongs to another loop fails. wrap_future() also avoids
    # parking a default-executor thread inside future.result().
    waiter = asyncio.wrap_future(future, loop=asyncio.get_running_loop())

    try:
        # shield(): giving up on the wait must not cancel the task itself.
        await asyncio.wait_for(asyncio.shield(waiter), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise FlowTimeoutError(f"Timeout waiting for task {process_id}") from exc