Skip to content

f5

f5

Fixed Flow implementation that properly abstracts execution details.

Flow

Flow(function: Optional[Callable], config: FlowConfig, process_id: Optional[str] = None)

Core flow implementation that wraps user-defined processors.

Source code in src/flow/core/f5.py
def __init__(
    self,
    function: Optional[Callable],
    config: FlowConfig,
    process_id: Optional[str] = None
):
    """Build a flow around a processor callable and register it with the context.

    Args:
        function: Processor invoked with the input data on execution. A
            falsy value is replaced by a no-op processor returning ``None``.
        config: Flow configuration; ``config.name`` is used in log output.
        process_id: Identifier for this flow. A falsy value is replaced by
            a freshly generated UUID4 string.
    """
    # NOTE(review): imported at call time rather than module level,
    # presumably to avoid a circular import - confirm.
    from flow.core.context import FlowContext

    # Fall back to a processor that ignores its input and yields None.
    if function:
        self.function = function
    else:
        self.function = lambda i: None

    self.config = config

    if process_id:
        self.process_id = process_id
    else:
        self.process_id = str(uuid.uuid4())

    self.context = FlowContext.get_instance()

    # Lifecycle and dependency bookkeeping.
    self.status = FlowStatus.PENDING
    self._parent_flow = None
    self._dependencies: Dict[str, DependencyType] = {}
    self._dependent_flows: Set[str] = set()

    # Make this flow discoverable through the context.
    self.context.register_flow(self)
    logger.debug(f"Initialized flow: {self.config.name} ({self.process_id})")

cancel async

cancel() -> None

Cancel the flow execution.

Source code in src/flow/core/f5.py
async def cancel(self) -> None:
    """Cancel the flow if it is currently running.

    Does nothing unless the status is ``FlowStatus.RUNNING``. Otherwise the
    context's pool manager is asked to cancel the task registered under this
    flow's process id, and only after that call returns is the status set to
    ``FlowStatus.CANCELLED``.
    """
    currently_running = self.status == FlowStatus.RUNNING
    if not currently_running:
        return

    pool = self.context.pool_manager
    await pool.cancel_task(self.process_id)
    self.status = FlowStatus.CANCELLED

execute async

execute(input_data: Optional[Dict[str, Any]] = None) -> FlowResult

Execute the flow and its dependencies.

Source code in src/flow/core/f5.py
async def execute(
    self,
    input_data: Optional[Dict[str, Any]] = None
) -> FlowResult:
    """Execute the flow and then the flows that depend on it.

    Calls the wrapped processor with ``input_data``, saves a completed
    result through the context's results manager, then awaits each
    dependent flow in turn. Each dependent receives a fresh dict made of
    ``input_data`` overlaid with this flow's output.

    Args:
        input_data: Input mapping handed to the processor. ``None`` (or any
            other falsy value) is treated as an empty dict.

    Returns:
        The completed ``FlowResult`` for this flow.

    Raises:
        Exception: Any exception raised by the processor, by saving the
            result, or by a dependent flow. Before it is re-raised the
            status is set to ``FlowStatus.FAILED`` and a failed result
            (with traceback) is saved under this flow's process id.
    """
    input_data = input_data or {}
    logger.debug(f"Starting execution of {self.config.name} with input: {input_data}")

    start_time = datetime.now()
    self.status = FlowStatus.RUNNING

    try:
        # Execute processor
        output = self.function(input_data)
        logger.debug(f"{self.config.name} produced output: {output}")

        # Create and save completed result
        result = FlowResult.create_completed(
            process_id=self.process_id,
            output=output,
            start_time=start_time,
            metadata={"flow_name": self.config.name}
        )
        self.status = FlowStatus.COMPLETED

        await self.context.results_manager.save_result(
            self.process_id,
            result
        )

        # A processor may legitimately return None (the default no-op
        # processor always does). Unpacking None with ``**`` raises
        # TypeError, which would flip an already-completed flow to FAILED,
        # so None is treated as "no output fields".
        output_fields = {} if output is None else output

        # Execute dependent flows. Iterate over a snapshot: this loop awaits,
        # and a registration that adds to the live set while we are suspended
        # would otherwise raise "Set changed size during iteration".
        for dep_id in tuple(self._dependent_flows):
            dep_flow = self.context.get_flow(dep_id)
            if dep_flow:
                logger.debug(f"Executing dependent flow {dep_flow.config.name}")
                # Fresh dict per dependent so one cannot mutate another's input.
                dep_input = {**input_data, **output_fields}
                await dep_flow.execute(dep_input)

    except Exception as e:
        # Create and save failed result
        result = FlowResult.create_failed(
            process_id=self.process_id,
            error=str(e),
            start_time=start_time,
            traceback=traceback.format_exc(),
            metadata={"flow_name": self.config.name}
        )
        self.status = FlowStatus.FAILED

        await self.context.results_manager.save_result(
            self.process_id,
            result
        )
        raise

    return result

get_dependencies

get_dependencies(dep_type: Optional[DependencyType] = None) -> Set[str]

Get set of dependency process IDs, optionally filtered by type.

Source code in src/flow/core/f5.py
def get_dependencies(self, dep_type: Optional[DependencyType] = None) -> Set[str]:
    """Return the process IDs this flow depends on.

    Args:
        dep_type: When given, only dependencies registered with an equal
            type are returned. ``None`` returns every dependency.

    Returns:
        A new set of dependency process IDs.
    """
    if dep_type is not None:
        matching: Set[str] = set()
        for process_id, registered_type in self._dependencies.items():
            if registered_type == dep_type:
                matching.add(process_id)
        return matching

    # No filter requested: every registered dependency qualifies.
    return set(self._dependencies)

register_to

register_to(
    parent_flow: Flow,
    required_deps: Optional[List[str]] = None,
    optional_deps: Optional[List[str]] = None,
) -> Self

Register this flow as a child of another flow.

Source code in src/flow/core/f5.py
def register_to(
    self,
    parent_flow: Flow,
    required_deps: Optional[List[str]] = None,
    optional_deps: Optional[List[str]] = None
) -> Self:
    """Register this flow as a child of another flow.

    Every listed dependency is looked up in the context first. Only when
    all of them exist is any state changed, so a missing dependency no
    longer leaves this flow (or the flows it names) partially registered.

    Args:
        parent_flow: Flow recorded as this flow's parent.
        required_deps: Process IDs recorded as ``DependencyType.REQUIRED``.
        optional_deps: Process IDs recorded as ``DependencyType.OPTIONAL``.
            Processed after ``required_deps``, so an ID present in both
            lists ends up recorded as optional.

    Returns:
        This flow, so calls can be chained.

    Raises:
        FlowError: If any listed dependency is not known to the context.
            No state has been modified when this is raised.
    """
    # Phase 1: resolve everything up front, required before optional, so the
    # first missing ID reported is the same one the caller would expect.
    resolved = []
    for label, dep_ids, dep_type in (
        ("Required", required_deps, DependencyType.REQUIRED),
        ("Optional", optional_deps, DependencyType.OPTIONAL),
    ):
        for dep_id in dep_ids or ():
            dep_flow = self.context.get_flow(dep_id)
            if not dep_flow:
                raise FlowError(f"{label} dependency {dep_id} not found")
            resolved.append((dep_id, dep_flow, dep_type))

    # Phase 2: all lookups succeeded, now apply the registration.
    self._parent_flow = parent_flow
    for dep_id, dep_flow, dep_type in resolved:
        self._dependencies[dep_id] = dep_type
        dep_flow._dependent_flows.add(self.process_id)
        self.context.register_dependency(dep_id, self.process_id)
    return self

FlowConfig

Bases: BaseModel

Configuration for a flow.