Skip to content

f1

f1

Fixed Flow implementation that properly abstracts execution details.

Flow

Flow(processor: Any, config: FlowConfig, process_id: Optional[str] = None)

Core flow implementation that wraps user-defined processors.

Source code in src/flow/core/f1.py
def __init__(
    self,
    processor: Any,
    config: FlowConfig,
    process_id: Optional[str] = None
):
    """Set up a flow around ``processor`` and register it with the context.

    Args:
        processor: The user-defined processor object wrapped by this flow.
        config: Flow configuration; ``config.name`` is used for the
            per-flow logger name and in the debug message.
        process_id: Identifier for this flow. When omitted (or falsy), a
            fresh UUID4 string is generated.
    """
    # Imported at call time rather than at module level.
    # NOTE(review): presumably to avoid a circular import -- confirm.
    from flow.core.context import FlowContext

    # Fall back to a random identifier when the caller supplied none.
    if not process_id:
        process_id = str(uuid.uuid4())

    self.processor = processor
    self.config = config
    self.process_id = process_id
    self.context = FlowContext.get_instance()

    # Lifecycle and dependency bookkeeping, all starting out empty.
    self.status = FlowStatus.PENDING
    self._parent_flow = None
    self._dependencies: Dict[str, DependencyType] = {}
    self._dependent_flows: Set[str] = set()

    # Per-flow logger, named after the configured flow name.
    self.logger = logging.getLogger(f"flow.{self.config.name}")

    # Registration happens last so the context receives a fully
    # initialised flow object.
    self.context.register_flow(self)

    logger.debug(f"Initialized flow: {self.config.name} ({self.process_id})")

cancel async

cancel() -> None

Cancel the flow execution.

Source code in src/flow/core/f1.py
async def cancel(self) -> None:
    """Cancel the flow if it is currently running.

    When the status is anything other than ``FlowStatus.RUNNING`` this
    is a no-op. Otherwise the task registered under ``self.process_id``
    is cancelled through the context's pool manager, and only after that
    call returns is the status set to ``FlowStatus.CANCELLED``.
    """
    # Nothing to cancel unless the flow is actually running.
    if self.status != FlowStatus.RUNNING:
        return

    pool = self.context.pool_manager
    await pool.cancel_task(self.process_id)
    self.status = FlowStatus.CANCELLED

execute async

execute(input_data: Optional[Dict[str, Any]] = None) -> FlowResult

Execute the flow and its dependencies.

This is the public interface that users should call.

Source code in src/flow/core/f1.py
async def execute(
    self,
    input_data: Optional[Dict[str, Any]] = None
) -> FlowResult:
    """Run this flow (and its dependencies) under monitoring.

    This is the public entry point that users should call.

    Args:
        input_data: Input passed on to ``_execute_with_dependencies``.
            A missing or empty value is replaced by a new empty dict.

    Returns:
        The result produced by ``_execute_with_dependencies``.

    Raises:
        Exception: Any exception raised while executing or while
            recording events is reported as an ``execution_failed``
            event and then re-raised unchanged.
    """
    monitor = MonitoringService.get_instance()

    async with monitor.monitor_flow(self):
        try:
            await monitor.record_flow_event(
                self,
                "execution_started",
                f"Started execution of flow {self.config.name}",
                LoggingLevel.INFO
            )

            payload = input_data if input_data else {}
            outcome = await self._execute_with_dependencies(payload)

            # Kept inside the ``try`` on purpose: a failure while
            # recording completion is still reported as a failed run.
            await monitor.record_flow_event(
                self,
                "execution_completed",
                f"Completed execution of flow {self.config.name}",
                LoggingLevel.INFO,
                {"status": outcome.status}
            )

            return outcome

        except Exception as exc:
            await monitor.record_flow_event(
                self,
                "execution_failed",
                f"Flow {self.config.name} failed: {str(exc)}",
                LoggingLevel.ERROR
            )
            raise

register_to

register_to(
    parent_flow: "Flow",
    required_deps: Optional[List[str]] = None,
    optional_deps: Optional[List[str]] = None,
) -> None

Register this flow as a child of another flow.

Source code in src/flow/core/f1.py
def register_to(
    self,
    parent_flow: 'Flow',
    required_deps: Optional[List[str]] = None,
    optional_deps: Optional[List[str]] = None
) -> None:
    """Register this flow as a child of another flow.

    Every dependency id is looked up in the context *before* any state is
    changed, so a missing dependency leaves this flow, the dependency
    flows and the context untouched instead of half-registered.

    Args:
        parent_flow: The flow that becomes this flow's parent.
        required_deps: Ids of flows recorded as ``DependencyType.REQUIRED``.
        optional_deps: Ids of flows recorded as ``DependencyType.OPTIONAL``.
            Optional ids are processed after required ones, so an id
            present in both lists ends up recorded as optional.

    Raises:
        FlowError: If any listed dependency id is not known to the
            context. Nothing has been registered when this is raised.
    """
    # Phase 1: resolve everything up front (required first, then optional)
    # so that a lookup failure cannot leave a partially wired graph.
    resolved = []
    for dep_ids, dep_type, label in (
        (required_deps, DependencyType.REQUIRED, "Required"),
        (optional_deps, DependencyType.OPTIONAL, "Optional"),
    ):
        for dep_id in dep_ids or ():
            dep_flow = self.context.get_flow(dep_id)
            if not dep_flow:
                raise FlowError(f"{label} dependency {dep_id} not found")
            resolved.append((dep_id, dep_type, dep_flow))

    # Phase 2: all lookups succeeded -- now it is safe to mutate state.
    self._parent_flow = parent_flow
    for dep_id, dep_type, dep_flow in resolved:
        self._dependencies[dep_id] = dep_type
        dep_flow._dependent_flows.add(self.process_id)
        self.context.register_dependency(dep_id, self.process_id)

FlowConfig

Bases: BaseModel

Configuration for a flow.

FlowResult

Bases: BaseModel

Result of a flow execution.