Skip to content

results

results

FlowResult

Bases: BaseModel

Immutable result of a finished flow execution, whether it completed or failed.

create_completed classmethod

create_completed(
    process_id: str,
    output: Dict[str, Any],
    start_time: datetime,
    metadata: Optional[Dict[str, Any]] = None,
) -> FlowResult

Create a result for a completed flow.

Source code in src/flow/core/results.py
@classmethod
def create_completed(
    cls,
    process_id: str,
    output: Dict[str, Any],
    start_time: datetime,
    metadata: Optional[Dict[str, Any]] = None
) -> 'FlowResult':
    """Create a result for a completed flow.

    Args:
        process_id: Value stored as the result's ``process_id``.
        output: Output mapping stored on the result.
        start_time: Start timestamp stored on the result.
        metadata: Optional extra metadata. An empty dict is stored when this
            is omitted or falsy.

    Returns:
        A new ``cls`` instance with status ``FlowStatus.COMPLETED`` and
        ``end_time`` set to the time of this call.
    """
    # The return annotation is a string forward reference (the same form
    # create_failed uses) so defining this method never requires the name
    # FlowResult to be bound yet.
    return cls(
        process_id=process_id,
        status=FlowStatus.COMPLETED,
        start_time=start_time,
        end_time=datetime.now(),  # naive local timestamp taken at call time
        output=output,
        metadata=metadata or {}
    )

create_failed classmethod

create_failed(
    process_id: str,
    error: str,
    start_time: datetime,
    traceback: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None,
) -> "FlowResult"

Create a result for a failed flow.

Source code in src/flow/core/results.py
@classmethod
def create_failed(
    cls,
    process_id: str,
    error: str,
    start_time: datetime,
    traceback: Optional[str] = None,
    metadata: Optional[Dict[str, Any]] = None
) -> 'FlowResult':
    """Build the result that records a failed flow.

    Args:
        process_id: Value stored as the result's ``process_id``.
        error: Error message stored on the result.
        start_time: Start timestamp stored on the result.
        traceback: Optional traceback text stored on the result.
        metadata: Optional extra metadata. An empty dict is stored when this
            is omitted or falsy.

    Returns:
        A new ``cls`` instance with status ``FlowStatus.FAILED`` and
        ``end_time`` set to the time of this call.
    """
    # Fall back to a fresh dict so the stored metadata is never None.
    extra = metadata if metadata else {}
    finished_at = datetime.now()
    return cls(
        process_id=process_id,
        status=FlowStatus.FAILED,
        start_time=start_time,
        end_time=finished_at,
        error=error,
        traceback=traceback,
        metadata=extra,
    )

ResultsManager

ResultsManager(context: 'FlowContext')

Manages immutable flow results.

Source code in src/flow/core/results.py
def __init__(self, context: 'FlowContext'):
    """Set up an empty result store attached to ``context``."""
    # Lock taken by the async accessors around reads and writes of the store.
    self._lock = asyncio.Lock()
    # Results keyed by process id.
    self._results: Dict[str, FlowResult] = {}
    self.context = context
    logger.debug("ResultsManager initialized")

cleanup

cleanup() -> None

Clear all stored results.

Source code in src/flow/core/results.py
def cleanup(self) -> None:
    """Drop every stored result, emptying the store in place."""
    stored = self._results
    # Clear the existing dict rather than rebinding the attribute.
    stored.clear()
    logger.debug("ResultsManager cleaned up")

get_dependency_output async

get_dependency_output(process_id: str, dep_id: str) -> Optional[Dict[str, Any]]

Get output from a specific dependency.

Source code in src/flow/core/results.py
async def get_dependency_output(self, process_id: str, dep_id: str) -> Optional[Dict[str, Any]]:
    """Get output from a specific dependency."""
    async with self._lock:
        result = self._results.get(dep_id)
        if result and result.output:
            logger.debug(f"Retrieved dependency {dep_id} output for {process_id}: {result.output}")
            return result.output
        return None

get_dependency_outputs async

get_dependency_outputs(process_id: str, dep_ids: set[str]) -> Dict[str, Any]

Get combined outputs from multiple dependencies.

Source code in src/flow/core/results.py
async def get_dependency_outputs(self, process_id: str, dep_ids: set[str]) -> Dict[str, Any]:
    """Merge the outputs of several dependencies into one mapping.

    Args:
        process_id: Id of the requesting process; used only in the log line.
        dep_ids: Ids of the dependencies whose outputs are collected.

    Returns:
        A dict whose keys have the form ``"<dep_id>.<output key>"``.
        Dependencies with no stored result or an empty output contribute
        nothing, so the dict may be empty.
    """
    async with self._lock:
        combined = {}
        for dep_id in dep_ids:
            dep_result = self._results.get(dep_id)
            if not dep_result or not dep_result.output:
                continue
            # Prefix each key with the dependency id so identical keys from
            # different dependencies do not overwrite each other.
            for key, value in dep_result.output.items():
                combined[f"{dep_id}.{key}"] = value
        logger.debug(f"Retrieved dependency outputs for {process_id}: {combined}")
        return combined

get_result async

get_result(process_id: str) -> Optional[FlowResult]

Get an immutable flow result.

Source code in src/flow/core/results.py
async def get_result(self, process_id: str) -> Optional[FlowResult]:
    """Look up the stored result for ``process_id``.

    Returns:
        The stored result, or ``None`` when nothing has been saved under
        ``process_id``.
    """
    async with self._lock:
        found = self._results.get(process_id)
        # Logged on a miss as well, in which case the value shown is None.
        logger.debug(f"Retrieved result for process {process_id}: {found}")
    return found

save_result async

save_result(process_id: str, result: FlowResult) -> None

Save an immutable flow result.

Source code in src/flow/core/results.py
async def save_result(self, process_id: str, result: FlowResult) -> None:
    """Store ``result`` under ``process_id``, replacing any earlier entry."""
    async with self._lock:
        store = self._results
        store[process_id] = result
        logger.debug(f"Saved result for process {process_id}: {result}")