Skip to content

flow

flow

Flow package for managing complex workflows.

Flow

Flow(
    name: str,
    callable: Callable[[Any], Any],
    flow_tree: Optional[FlowTree] = None,
    required_prerequisites: Optional[List[Flow]] = None,
    optional_prerequisites: Optional[List[Flow]] = None,
)

Core flow implementation.

Source code in src/flow/core/flow.py
def __init__(
    self,
    name: str,
    callable: Callable[[Any], Any],
    flow_tree: Optional[FlowTree] = None,
    required_prerequisites: Optional[List[Flow]] = None,
    optional_prerequisites: Optional[List[Flow]] = None,
):
    """Create a flow and optionally wire it into a flow tree.

    Args:
        name: Name stored in the flow's ``FlowConfig``.
        callable: Function stored on the flow as ``self.callable``.
        flow_tree: Tree to register this flow with. When omitted, the flow
            is not registered anywhere.
        required_prerequisites: Flows to add to ``flow_tree`` as
            prerequisites of this flow.
        optional_prerequisites: Further flows to add to ``flow_tree`` as
            prerequisites of this flow.

    Raises:
        ValueError: If either prerequisite list is given without a
            ``flow_tree`` to record it in.
    """
    self.callable = callable
    self.config = FlowConfig(name=name)
    self.status = FlowStatus.PENDING
    # NOTE(review): the name is printed twice; the parenthesised value was
    # presumably meant to be something else (an id?) -- confirm.
    logger.debug(f"Initialized flow: {self.config.name} ({self.config.name})")
    self._flow_tree = None

    if flow_tree is not None:
        self.register_to_flow_tree(flow_tree)

    # NOTE(review): required and optional prerequisites are registered
    # through the exact same call, so the tree cannot tell them apart here.
    prerequisite_groups = (
        ("required_prerequisites", required_prerequisites),
        ("optional_prerequisites", optional_prerequisites),
    )
    for argument_name, prerequisites in prerequisite_groups:
        if prerequisites is None:
            continue
        # Explicit raise instead of `assert`: assertions are removed when
        # Python runs with -O, which would turn this into an AttributeError
        # on None further down.
        if flow_tree is None:
            raise ValueError(
                f"{argument_name} was provided but flow_tree is None; "
                "a flow tree is needed to register prerequisites"
            )
        for prereq in prerequisites:
            flow_tree.add_prerequisite(flow_id=self.id, prerequisite_id=prereq.id)

add_prerequisite

add_prerequisite(prerequisite_flow: Flow) -> None

Add a prerequisite flow.

Source code in src/flow/core/flow.py
def add_prerequisite(self, prerequisite_flow: Flow) -> None:
    """Record ``prerequisite_flow`` as a prerequisite of this flow.

    The relationship is stored in the flow tree held in ``self._flow_tree``,
    identified by the ``id`` of this flow and of ``prerequisite_flow``.

    Args:
        prerequisite_flow: The flow this flow should depend on.
    """
    # Look up the tree's method first so a missing tree fails before the
    # ids are read.
    register = self._flow_tree.add_prerequisite
    register(self.id, prerequisite_flow.id)

execute async

execute(input_data: Dict[str, Any]) -> FlowResult

Execute the flow.

Source code in src/flow/core/flow.py
async def execute(self, input_data: Dict[str, Any]) -> FlowResult:
    """Execute the flow.

    Calls ``self.callable`` with ``input_data``. If the call returns an
    awaitable (for example when the callable is an ``async def``), it is
    awaited so the result carries the real output rather than a pending
    coroutine object.

    Args:
        input_data: Value passed unchanged to the flow's callable.

    Returns:
        The ``FlowResult`` built by ``FlowResult.create_completed``.

    Raises:
        Exception: Any exception raised while running the callable or
            building the result is logged and re-raised unchanged, after
            ``self.status`` is set to ``FlowStatus.FAILED``.
    """
    # Function-scope import keeps this block self-contained.
    import inspect

    logger.debug(f"Executing flow {self.config.name} with input: {input_data}")
    start_time = datetime.now()
    self.status = FlowStatus.RUNNING

    try:
        output = self.callable(input_data)
        if inspect.isawaitable(output):
            output = await output
        logger.debug(f"Flow {self.config.name} produced output: {output}")

        result = FlowResult.create_completed(
            process_id=self.config.name,
            output=output,
            start_time=start_time,
            metadata={"flow_name": self.config.name},
        )
        self.status = FlowStatus.COMPLETED
    except Exception as e:
        logger.error(f"Flow {self.config.name} execution failed: {e}")
        # The error propagates to the caller, so no failed FlowResult is
        # built here: it could never be returned, and a failure while
        # building it would mask the original exception and skip the
        # status update.
        self.status = FlowStatus.FAILED
        raise

    return result

FlowError

Bases: Exception

Base class for flow errors.

FlowExecutionError

FlowExecutionError(message: str, traceback: str)

Bases: FlowError

Error during flow execution.

Source code in src/flow/core/errors.py
def __init__(self, message: str, traceback: str):
    """Create the error from a message and a traceback string.

    Args:
        message: Text handed to the base exception initializer.
        traceback: Traceback string kept on the instance as ``traceback``.
    """
    self.traceback = traceback
    super().__init__(message)

FlowResult

Bases: BaseModel

Immutable result of a finished flow execution, either completed or failed.

create_completed classmethod

create_completed(
    process_id: str,
    output: Dict[str, Any],
    start_time: datetime,
    metadata: Optional[Dict[str, Any]] = None,
) -> FlowResult

Create a result for a completed flow.

Source code in src/flow/core/results.py
@classmethod
def create_completed(
    cls,
    process_id: str,
    output: Dict[str, Any],
    start_time: datetime,
    metadata: Optional[Dict[str, Any]] = None
) -> FlowResult:
    """Build the result for a flow that finished successfully.

    Args:
        process_id: Identifier stored on the result.
        output: Output produced by the flow.
        start_time: When execution started.
        metadata: Extra information to attach; an empty dict is used when
            this is ``None`` or empty.

    Returns:
        A new instance with status ``FlowStatus.COMPLETED`` and
        ``end_time`` set to the current time.
    """
    finished_at = datetime.now()
    extra = metadata if metadata else {}
    fields = {
        "process_id": process_id,
        "status": FlowStatus.COMPLETED,
        "start_time": start_time,
        "end_time": finished_at,
        "output": output,
        "metadata": extra,
    }
    return cls(**fields)

create_failed classmethod

create_failed(
    process_id: str,
    error: str,
    start_time: datetime,
    traceback: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> "FlowResult"

Create a result for a failed flow.

Source code in src/flow/core/results.py
@classmethod
def create_failed(
    cls,
    process_id: str,
    error: str,
    start_time: datetime,
    traceback: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> 'FlowResult':
    """Build the result for a flow whose execution failed.

    Args:
        process_id: Identifier stored on the result.
        error: Error message describing the failure.
        start_time: When execution started.
        traceback: Optional traceback string stored alongside the error.
        metadata: Extra information to attach; an empty dict is used when
            this is ``None`` or empty.

    Returns:
        A new instance with status ``FlowStatus.FAILED`` and ``end_time``
        set to the current time.
    """
    finished_at = datetime.now()
    extra = metadata if metadata else {}
    fields = {
        "process_id": process_id,
        "status": FlowStatus.FAILED,
        "start_time": start_time,
        "end_time": finished_at,
        "error": error,
        "traceback": traceback,
        "metadata": extra,
    }
    return cls(**fields)

FlowRetryError

FlowRetryError(message: str, original_error: Exception)

Bases: FlowError

Flow retry attempts exhausted.

Source code in src/flow/core/errors.py
def __init__(self, message: str, original_error: Exception):
    """Create the error from a message and the exception that caused it.

    Args:
        message: Text handed to the base exception initializer.
        original_error: Exception kept on the instance as
            ``original_error``.
    """
    self.original_error = original_error
    super().__init__(message)

FlowStatus

Bases: Enum

Status of flow execution.

FlowTimeoutError

Bases: FlowError

Flow execution timed out.

FlowType

Bases: Enum

Type of flow execution.

MissingDependencyError

Bases: FlowError

Required dependency not found.

StorageType

Bases: Enum

Type of result storage.