Skip to content

f2

f2

Fixed Flow implementation that properly abstracts execution details.

Flow

Flow(processor: Any, config: FlowConfig, process_id: Optional[str] = None)

Core flow implementation that wraps user-defined processors.

Source code in src/flow/core/f2.py
def __init__(
    self,
    processor: Any,
    config: FlowConfig,
    process_id: Optional[str] = None
):
    """Create a flow around ``processor`` and register it with the context.

    Args:
        processor: User-supplied object that performs the actual work.
        config: Configuration for this flow; ``config.name`` is used in logs.
        process_id: Identifier for this flow. When omitted (or falsy) a
            fresh UUID4 string is generated.
    """
    # Imported at call time rather than module level
    # (presumably to avoid a circular import — confirm).
    from flow.core.context import FlowContext

    # Identity and collaborators.
    self.process_id = process_id if process_id else str(uuid.uuid4())
    self.processor = processor
    self.config = config

    # Execution state: a new flow starts out pending with no relations.
    self.status = FlowStatus.PENDING
    self._parent_flow = None
    self._dependencies: Dict[str, DependencyType] = {}
    self._dependent_flows: Set[str] = set()

    # Make the flow discoverable through the context obtained from
    # FlowContext.get_instance().
    self.context = FlowContext.get_instance()
    self.context.register_flow(self)
    logger.debug(f"Initialized flow: {self.config.name} ({self.process_id})")

cancel async

cancel() -> None

Cancel the flow execution.

Source code in src/flow/core/f2.py
async def cancel(self) -> None:
    """Cancel the flow execution.

    Only a flow whose status is ``RUNNING`` is affected; in any other state
    the call returns without doing anything.
    """
    if self.status != FlowStatus.RUNNING:
        return

    # Ask the pool manager to cancel the task first, then record the state.
    await self.context.pool_manager.cancel_task(self.process_id)
    self.status = FlowStatus.CANCELLED

execute async

execute(input_data: Optional[Dict[str, Any]] = None) -> FlowResult

Execute the flow and its dependencies.

Source code in src/flow/core/f2.py
async def execute(
    self,
    input_data: Optional[Dict[str, Any]] = None
) -> FlowResult:
    """Execute the flow and its dependencies.

    Dependencies are executed first and their data is merged with
    ``input_data``; on a key collision the value from ``input_data`` wins.
    The processor is then invoked via its ``process`` method when it has
    one, otherwise it is called directly. A non-dict return value is
    wrapped as ``{"result": value}``.

    The result is always handed to the results manager, whether the run
    completed, failed or was cancelled.

    Args:
        input_data: Data passed to the processor; ``None`` is treated as an
            empty dict.

    Returns:
        The ``FlowResult`` of a successful run (status ``COMPLETED``).

    Raises:
        Exception: Any error raised by the dependencies or the processor is
            re-raised after the result has been marked ``FAILED``.
        asyncio.CancelledError: Re-raised after the result has been marked
            ``CANCELLED``.
    """
    import asyncio

    logger.debug(f"Starting execution of flow {self.config.name} ({self.process_id})")
    input_data = input_data or {}

    # Created up front so the ``finally`` block always has something to save.
    result = FlowResult(
        process_id=self.process_id,
        status=FlowStatus.RUNNING,
        start_time=datetime.now(),
        metadata={}
    )

    try:
        # Set status before dependency execution
        self.status = FlowStatus.RUNNING
        logger.debug(f"Flow {self.config.name} status set to RUNNING")

        # Execute dependencies
        logger.debug(f"Executing dependencies for {self.config.name}")
        deps_data = await self._execute_dependencies(input_data)
        logger.debug(f"Dependencies completed for {self.config.name}")

        # Merge input data (explicit input overrides dependency output)
        execution_data = {**deps_data, **input_data}
        logger.debug(f"Prepared execution data for {self.config.name}")

        # Execute processor
        logger.debug(f"Executing processor for {self.config.name}")
        try:
            # Direct execution without process pool for debugging
            if hasattr(self.processor, 'process'):
                output = self.processor.process(execution_data)
            else:
                output = self.processor(execution_data)

            # Ensure output is a dictionary
            if not isinstance(output, dict):
                output = {"result": output}

            logger.debug(f"Processor execution completed for {self.config.name}")

            # Update result
            result.status = FlowStatus.COMPLETED
            result.output = output
            self.status = FlowStatus.COMPLETED

        except Exception as e:
            # Only adds the processor-specific log line. The failure
            # bookkeeping happens once, in the outer handler below.
            logger.error(f"Processor execution failed for {self.config.name}: {e}")
            raise

    except asyncio.CancelledError:
        # CancelledError derives from BaseException on Python 3.8+, so the
        # generic handler below never sees it. Without this branch the saved
        # result would stay RUNNING after a cancellation.
        logger.debug(f"Flow {self.config.name} execution cancelled")
        result.status = FlowStatus.CANCELLED
        self.status = FlowStatus.CANCELLED
        raise

    except Exception as e:
        # Single place where a failed run is recorded.
        logger.error(f"Flow execution failed for {self.config.name}: {e}")
        result.status = FlowStatus.FAILED
        result.error = str(e)
        result.traceback = traceback.format_exc()
        self.status = FlowStatus.FAILED
        raise

    finally:
        # Always save result and update timing
        result.end_time = datetime.now()
        logger.debug(f"Saving result for {self.config.name}")
        await self.context.results_manager.save_result(
            self.process_id,
            result
        )
        logger.debug(f"Result saved for {self.config.name}")

    logger.debug(f"Flow {self.config.name} execution completed with status {result.status}")
    return result

get_dependencies

get_dependencies(dep_type: Optional[DependencyType] = None) -> Set[str]

Get set of dependency process IDs, optionally filtered by type.

Source code in src/flow/core/f2.py
def get_dependencies(self, dep_type: Optional[DependencyType] = None) -> Set[str]:
    """Return the process IDs this flow depends on.

    Args:
        dep_type: When given, only dependencies registered with exactly this
            type are returned; when ``None`` every dependency is returned.

    Returns:
        A new set of dependency process IDs.
    """
    if dep_type is not None:
        return {pid for pid, kind in self._dependencies.items() if kind == dep_type}
    return set(self._dependencies)

register_to

register_to(
    parent_flow: "Flow",
    required_deps: Optional[List[str]] = None,
    optional_deps: Optional[List[str]] = None,
) -> None

Register this flow as a child of another flow.

Source code in src/flow/core/f2.py
def register_to(
    self,
    parent_flow: 'Flow',
    required_deps: Optional[List[str]] = None,
    optional_deps: Optional[List[str]] = None
) -> None:
    """Register this flow as a child of another flow.

    Args:
        parent_flow: Flow recorded as this flow's parent.
        required_deps: Process IDs this flow depends on as ``REQUIRED``.
        optional_deps: Process IDs this flow depends on as ``OPTIONAL``.
            An ID listed in both ends up ``OPTIONAL`` because optional
            dependencies are processed last.

    Raises:
        FlowError: If a listed dependency is not known to the context.
            Dependencies processed before the missing one stay registered.
    """
    self._parent_flow = parent_flow

    def _link(dep_ids, kind, label):
        # Record each dependency on this flow, on the dependency itself,
        # and in the context.
        for dep_id in dep_ids:
            target = self.context.get_flow(dep_id)
            if not target:
                raise FlowError(f"{label} dependency {dep_id} not found")
            self._dependencies[dep_id] = kind
            target._dependent_flows.add(self.process_id)
            self.context.register_dependency(dep_id, self.process_id)

    # Required dependencies go first, then optional ones.
    if required_deps:
        _link(required_deps, DependencyType.REQUIRED, "Required")
    if optional_deps:
        _link(optional_deps, DependencyType.OPTIONAL, "Optional")

FlowConfig

Bases: BaseModel

Configuration for a flow.

FlowResult

Bases: BaseModel

Result of a flow execution.