Skip to content

execution

execution

Execution components of the flow package.

FlowContext

FlowContext()

Central manager for flow coordination and service access.

Source code in src/flow/core/context.py
def __init__(self):
    """Create the context's empty bookkeeping structures and its managers.

    Raises:
        RuntimeError: If ``FlowContext._instance`` is already set. The error
            message directs callers to ``get_instance()``.
    """
    if FlowContext._instance is not None:
        raise RuntimeError("FlowContext is a singleton - use get_instance()")

    # Directed graph of flow ids; register_dependency adds parent -> child edges.
    self._flow_graph = nx.DiGraph()
    # process_id -> Flow, filled by register_flow.
    self._flows: Dict[str, Flow] = {}
    # One asyncio.Lock per flow, taken around status reads and updates.
    self._status_locks: Dict[str, asyncio.Lock] = {}
    # One asyncio.Lock per flow, created by register_flow.
    # NOTE(review): not acquired anywhere in the visible code - confirm usage.
    self._execution_locks: Dict[str, asyncio.Lock] = {}

    # Initialize managers
    # The results manager is handed this context at construction time.
    self.results_manager = ResultsManager(context=self)

    # NOTE(review): imported locally rather than at module level, presumably
    # to avoid a circular import - confirm before moving it.
    from flow.execution.pool import ProcessPoolManager
    self.pool_manager = ProcessPoolManager()

    logger.info("FlowContext initialized")

cleanup

cleanup() -> None

Cleanup all managers and resources.

Source code in src/flow/core/context.py
def cleanup(self) -> None:
    """Shut down both managers, then empty all per-flow bookkeeping."""
    # Managers first: stop the worker pools, then let the results manager
    # release whatever it holds.
    self.pool_manager.shutdown()
    self.results_manager.cleanup()

    # Flow registry, dependency graph and both lock tables are emptied in turn.
    for container in (
        self._flows,
        self._flow_graph,
        self._status_locks,
        self._execution_locks,
    ):
        container.clear()

    logger.info("FlowContext cleaned up")

fail_flow async

fail_flow(process_id: str, reason: str) -> None

Mark a flow as failed with the given reason.

Source code in src/flow/core/context.py
async def fail_flow(self, process_id: str, reason: str) -> None:
    """Flag a registered flow as failed and store a matching failure result.

    Does nothing when ``process_id`` is not registered. Otherwise, while
    holding the flow's status lock, sets the flow's status to
    ``FlowStatus.FAILED`` and saves a ``FlowResult`` carrying ``reason`` as
    its error through the results manager.
    """
    target = self._flows.get(process_id)
    if not target:
        return

    from flow.core.flow import FlowResult

    status_lock = self._status_locks[process_id]
    async with status_lock:
        target.status = FlowStatus.FAILED
        # Start and end are both stamped at the moment the failure is recorded.
        failure = FlowResult(
            process_id=process_id,
            status=FlowStatus.FAILED,
            error=reason,
            start_time=datetime.now(),
            end_time=datetime.now(),
        )
        await self.results_manager.save_result(process_id, failure)

get_flow

get_flow(process_id: str) -> Optional[Flow]

Get a flow by its process ID.

Source code in src/flow/core/context.py
def get_flow(self, process_id: str) -> Optional[Flow]:
    """Look up a registered flow.

    Returns the flow stored under ``process_id``, or ``None`` when no flow
    with that id has been registered.
    """
    registered = self._flows
    return registered.get(process_id)

get_instance classmethod

get_instance() -> FlowContext

Get or create the singleton instance.

Source code in src/flow/core/context.py
@classmethod
def get_instance(cls) -> FlowContext:
    """Return the shared instance, constructing it on first use."""
    existing = cls._instance
    if existing is not None:
        return existing

    # First call: build the instance and remember it on the class.
    cls._instance = cls()
    return cls._instance

handle_flow_failure async

handle_flow_failure(process_id: str) -> None

Handle a flow failure: fail every dependent flow that requires the failed flow; dependents that list it as optional are only logged.

Source code in src/flow/core/context.py
async def handle_flow_failure(self, process_id: str) -> None:
    """Propagate the failure of one flow to the flows that depend on it.

    Does nothing when ``process_id`` is not registered. Otherwise, while
    holding the failed flow's status lock, visits each direct successor in
    the flow graph: a successor that lists the failed flow as an optional
    dependency only gets a warning logged, any other registered successor
    is failed through ``fail_flow``.
    """
    if not self._flows.get(process_id):
        return

    async with self._status_locks[process_id]:
        # Snapshot the direct successors before walking them.
        for child_id in set(self._flow_graph.successors(process_id)):
            child = self._flows.get(child_id)
            if not child:
                continue

            optional_parents = child.get_dependencies(DependencyType.OPTIONAL)
            if process_id in optional_parents:
                # Optional dependency: log only, the dependent flow is left as is.
                logger.warning(f"Optional dependency {process_id} failed for {child_id}")
            else:
                # Required dependency: the dependent flow cannot proceed.
                logger.error(f"Required dependency {process_id} failed - failing {child_id}")
                await self.fail_flow(child_id, f"Required dependency {process_id} failed")

has_cycle

has_cycle(start_id: str) -> bool

Check whether the current flow graph contains a cycle that can be found starting from the given flow.

Source code in src/flow/core/context.py
def has_cycle(self, start_id: str) -> bool:
    """Report whether a cycle can be found in the flow graph from ``start_id``.

    Returns ``True`` when ``nx.find_cycle`` finds a cycle with ``start_id``
    as its search source, and ``False`` when it raises
    ``nx.NetworkXNoCycle``.
    """
    try:
        nx.find_cycle(self._flow_graph, source=start_id)
    except nx.NetworkXNoCycle:
        return False
    else:
        return True

register_dependency

register_dependency(parent_id: str, child_id: str) -> None

Register a dependency relationship between flows.

Source code in src/flow/core/context.py
def register_dependency(self, parent_id: str, child_id: str) -> None:
    """Record that ``child_id`` depends on ``parent_id``.

    Adds a ``parent_id -> child_id`` edge to the flow graph. If the edge
    makes the graph cyclic it is removed again before the error is raised.

    Raises:
        FlowError: If either flow is not registered, or if the new edge
            would create a cycle.
    """
    both_known = parent_id in self._flows and child_id in self._flows
    if not both_known:
        raise FlowError("Cannot register dependency - flows not found")

    graph = self._flow_graph
    graph.add_edge(parent_id, child_id)

    if nx.is_directed_acyclic_graph(graph):
        logger.debug(f"Registered dependency: {parent_id} -> {child_id}")
        return

    # The edge closed a cycle: undo it so the graph stays acyclic.
    graph.remove_edge(parent_id, child_id)
    raise FlowError("Adding dependency would create a cycle")

register_flow

register_flow(flow: Flow) -> None

Register a flow with the context.

Source code in src/flow/core/context.py
def register_flow(self, flow: Flow) -> None:
    """Add ``flow`` to the context under its ``process_id``.

    Stores the flow, adds a node for it to the flow graph, and creates its
    status lock and execution lock.

    Raises:
        FlowError: If a flow with the same ``process_id`` is already
            registered.
    """
    flow_id = flow.process_id
    if flow_id in self._flows:
        raise FlowError(f"Flow with id {flow_id} already registered")

    self._flows[flow_id] = flow
    self._flow_graph.add_node(flow_id)

    # Each flow gets its own pair of locks.
    self._status_locks[flow_id] = asyncio.Lock()
    self._execution_locks[flow_id] = asyncio.Lock()

    logger.debug(f"Registered flow: {flow_id}")

wait_for_flows async

wait_for_flows(flow_ids: Set[str], timeout: Optional[float] = None) -> None

Wait until every listed flow has reached the COMPLETED or FAILED status, or raise if the timeout expires first.

Source code in src/flow/core/context.py
async def wait_for_flows(self, flow_ids: Set[str], timeout: Optional[float] = None) -> None:
    """Wait until every flow in ``flow_ids`` is COMPLETED or FAILED.

    Each flow's status is polled every 0.1 seconds. The flow's status lock
    is taken only for the duration of each individual check, so code that
    needs the same lock to update the status (such as ``fail_flow``) can
    run between checks.

    Args:
        flow_ids: Ids of the flows to wait for. An empty set returns
            immediately.
        timeout: Maximum number of seconds to wait for all flows together,
            or ``None`` to wait without a limit.

    Raises:
        FlowError: If any id is not registered, or if the timeout expires
            before every flow has finished.
    """
    if not flow_ids:
        return

    # Resolve every id before any polling starts, so an unknown id fails
    # fast instead of leaving the other pollers running in the background.
    pending = []
    for flow_id in flow_ids:
        flow = self._flows.get(flow_id)
        if not flow:
            raise FlowError(f"Flow {flow_id} not found")
        pending.append((flow, self._status_locks[flow_id]))

    finished_states = (FlowStatus.COMPLETED, FlowStatus.FAILED)

    async def wait_for_flow(flow, status_lock) -> None:
        while True:
            # asyncio.Lock is not reentrant: holding it across the sleep
            # would block every status update and the loop could never end.
            async with status_lock:
                if flow.status in finished_states:
                    return
            await asyncio.sleep(0.1)

    # Wait for all flows with timeout
    wait_tasks = [wait_for_flow(flow, lock) for flow, lock in pending]
    try:
        await asyncio.wait_for(asyncio.gather(*wait_tasks), timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise FlowError(f"Timeout waiting for flows: {flow_ids}") from exc

ProcessPoolManager

ProcessPoolManager(max_threads: Optional[int] = None, max_processes: Optional[int] = None)

Manages process and thread pools for flow execution.

Source code in src/flow/execution/pool.py
def __init__(
    self,
    max_threads: Optional[int] = None,
    max_processes: Optional[int] = None
):
    """Create the thread and process pools and the task-tracking tables.

    Args:
        max_threads: Worker limit for the thread pool. ``None`` (or any
            other falsy value) falls back to ``mp.cpu_count()``.
        max_processes: Worker limit for the process pool, with the same
            fallback.
    """
    # Resolve the worker limits; a missing limit means "one per CPU".
    self.max_threads = max_threads if max_threads else mp.cpu_count()
    self.max_processes = max_processes if max_processes else mp.cpu_count()

    # One executor per execution style.
    self._thread_pool = ThreadPoolExecutor(max_workers=self.max_threads)
    self._process_pool = ProcessPoolExecutor(max_workers=self.max_processes)

    # Per-task bookkeeping, keyed by process id.
    self._futures: Dict[str, Future] = {}
    self._locks: Dict[str, asyncio.Lock] = {}
    # Event loop as seen at construction time.
    self._loop = asyncio.get_event_loop()

active_tasks property

active_tasks: Set[str]

Get set of currently active task IDs.

cancel_task async

cancel_task(process_id: str) -> None

Cancel a running task.

Source code in src/flow/execution/pool.py
async def cancel_task(self, process_id: str) -> None:
    """Request cancellation of a tracked task that has not finished yet.

    Unknown ids and tasks whose future is already done are left alone.
    Otherwise the future's ``cancel()`` is called and the task's tracking
    entries are cleaned up.
    """
    tracked = self._futures.get(process_id)
    if not tracked or tracked.done():
        return

    tracked.cancel()
    self._cleanup_task(process_id)

shutdown

shutdown(wait: bool = True) -> None

Shutdown the pool manager.

Source code in src/flow/execution/pool.py
def shutdown(self, wait: bool = True) -> None:
    """Cancel unfinished tasks, shut down both pools and drop all tracking.

    Args:
        wait: Passed through to both executors' ``shutdown``; when true the
            call blocks until their running work has finished.
    """
    # Cancel directly on the futures. This method is synchronous, so it
    # cannot count on a running event loop to execute cancel_task()
    # coroutines, and the tracking tables are emptied before it returns.
    for future in list(self._futures.values()):
        if not future.done():
            future.cancel()

    # Shutdown pools
    self._thread_pool.shutdown(wait=wait)
    self._process_pool.shutdown(wait=wait)

    # Clear tracking
    self._futures.clear()
    self._locks.clear()

submit_task async

submit_task(
    process_id: str,
    flow_type: FlowType,
    func: Callable,
    input_data: Dict[str, Any],
    timeout: Optional[float] = None,
) -> Any

Submit a task for execution.

Source code in src/flow/execution/pool.py
async def submit_task(
    self,
    process_id: str,
    flow_type: FlowType,
    func: Callable,
    input_data: Dict[str, Any],
    timeout: Optional[float] = None,
) -> Any:
    """Run ``func(input_data)`` in one of the pools and await its result.

    Args:
        process_id: Id under which the task is tracked while it runs.
        flow_type: ``FlowType.THREAD`` selects the thread pool; any other
            value selects the process pool.
        func: Callable to execute in the chosen pool.
        input_data: Single argument passed to ``func``.
        timeout: Maximum number of seconds to wait for the result, or
            ``None`` to wait without a limit. ``0`` is a real timeout and
            expires immediately, as it does in ``wait_for_task``.

    Returns:
        Whatever ``func`` returns.

    Raises:
        FlowError: If a task with ``process_id`` is already tracked.
        FlowTimeoutError: If the result is not available within ``timeout``.
    """
    if process_id in self._futures:
        raise FlowError(f"Task {process_id} already running")

    # NOTE(review): this lock is stored but not acquired in this method.
    self._locks[process_id] = asyncio.Lock()

    try:
        # Create the executor based on flow type
        executor = self._thread_pool if flow_type == FlowType.THREAD else self._process_pool

        # Submit the task
        future = executor.submit(func, input_data)
        self._futures[process_id] = future

        # Wait for completion
        awaitable = asyncio.wrap_future(future)
        if timeout is None:
            # No limit requested: await directly so an exception raised by
            # the task itself is never mistaken for our own timeout.
            return await awaitable

        try:
            return await asyncio.wait_for(awaitable, timeout=timeout)
        except asyncio.TimeoutError as exc:
            future.cancel()
            raise FlowTimeoutError(
                f"Task {process_id} timed out after {timeout} seconds"
            ) from exc

    finally:
        # Always drop the tracking entries, on success, failure or timeout.
        self._cleanup_task(process_id)

wait_for_task async

wait_for_task(process_id: str, timeout: Optional[float] = None) -> None

Wait for a specific task to complete.

Source code in src/flow/execution/pool.py
async def wait_for_task(self, process_id: str, timeout: Optional[float] = None) -> None:
    """Wait until the tracked task ``process_id`` has finished.

    Returns immediately when no task is tracked under ``process_id``. The
    blocking ``future.result`` call is run in the default executor of the
    loop that is currently running this coroutine, so the wait works even
    if the manager was constructed under a different event loop.

    Args:
        process_id: Id of the task to wait for.
        timeout: Maximum number of seconds to wait, or ``None`` to wait
            without a limit.

    Raises:
        FlowTimeoutError: If the task has not finished within ``timeout``.
    """
    future = self._futures.get(process_id)
    if not future:
        return

    # An awaitable must belong to the loop that awaits it; a loop captured
    # earlier may not be the one running now.
    loop = asyncio.get_running_loop()
    try:
        await asyncio.wait_for(
            loop.run_in_executor(None, future.result),
            timeout=timeout
        )
    except asyncio.TimeoutError as exc:
        raise FlowTimeoutError(f"Timeout waiting for task {process_id}") from exc