Skip to content

results_manager

results_manager

AsyncFileBackend

AsyncFileBackend(
    base_dir: Union[str, Path],
    create_if_missing: bool = True,
    locks_dir: Optional[Union[str, Path]] = None,
)

Bases: AsyncResultsBackend[T]

Async wrapper for FileBackend.

Runs the synchronous FileBackend methods in a thread pool (via asyncio.to_thread) to avoid blocking the event loop.

Initialize the AsyncFileBackend.

Parameters:

Name Type Description Default
base_dir Union[str, Path]

Base directory to store results

required
create_if_missing bool

Whether to create the directory if it doesn't exist

True
locks_dir Optional[Union[str, Path]]

Directory to store lock files. If None, uses a system temp directory.

None
Source code in src/results_manager/async_backends/file_backend.py
def __init__(self, base_dir: Union[str, Path], create_if_missing: bool = True, locks_dir: Optional[Union[str, Path]] = None):
    """
    Set up the async backend by wrapping a synchronous FileBackend.

    All filesystem work is delegated to the wrapped backend; the async
    methods of this class hand its calls off to worker threads.

    Args:
        base_dir: Root directory under which results are stored.
        create_if_missing: If True, create ``base_dir`` when it is absent.
        locks_dir: Where lock files live. ``None`` lets FileBackend choose a
            directory inside the system temp directory.
    """
    sync_backend = FileBackend(
        base_dir=base_dir,
        create_if_missing=create_if_missing,
        locks_dir=locks_dir,
    )
    self._backend = sync_backend

clear async

clear() -> None

Asynchronously clear all stored results.

Source code in src/results_manager/async_backends/file_backend.py
async def clear(self) -> None:
    """
    Remove every stored result without blocking the event loop.

    The wrapped backend's synchronous ``clear`` runs in a worker thread.
    """
    wipe = self._backend.clear
    await asyncio.to_thread(wipe)

delete async

delete(result_id: Union[str, List[str]]) -> bool

Asynchronously delete a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in src/results_manager/async_backends/file_backend.py
async def delete(self, result_id: Union[str, List[str]]) -> bool:
    """
    Remove a single result, running the blocking work in a worker thread.

    Args:
        result_id: Result identifier, either a string or a list of path parts.

    Returns:
        Whatever the wrapped backend's ``delete`` reports: True when the
        result was removed, False when nothing existed under that ID.
    """
    removed = await asyncio.to_thread(self._backend.delete, result_id=result_id)
    return removed

exists async

exists(result_id: Union[str, List[str]]) -> bool

Asynchronously check if a result exists for the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if the result exists, False otherwise

Source code in src/results_manager/async_backends/file_backend.py
async def exists(self, result_id: Union[str, List[str]]) -> bool:
    """
    Check for a stored result without blocking the event loop.

    Args:
        result_id: Result identifier, either a string or a list of path parts.

    Returns:
        The wrapped backend's answer: True if a result is stored under the
        ID, False otherwise.
    """
    probe = self._backend.exists
    found = await asyncio.to_thread(probe, result_id=result_id)
    return found

get async

get(
    result_id: Union[str, List[str]],
    model_class: Optional[Type[T]] = None,
    namespace: Optional[str] = None,
) -> T

Asynchronously retrieve a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
model_class Optional[Type[T]]

Optional model class to validate against

None
namespace Optional[str]

Optional namespace to look in

None

Returns:

Type Description
T

Pydantic model instance

Source code in src/results_manager/async_backends/file_backend.py
async def get(self, 
             result_id: Union[str, List[str]], 
             model_class: Optional[Type[T]] = None,
             namespace: Optional[str] = None) -> T:
    """
    Load and return a stored result without blocking the event loop.

    All arguments are forwarded by keyword to the wrapped backend's ``get``,
    which runs in a worker thread; any exception it raises propagates.

    Args:
        result_id: Result identifier, either a string or a list of path parts.
        model_class: Model class to validate with; None lets the backend
            resolve it from the stored data.
        namespace: Registry namespace to resolve the model type in.

    Returns:
        The Pydantic model instance produced by the wrapped backend.
    """
    call_kwargs = {
        "result_id": result_id,
        "model_class": model_class,
        "namespace": namespace,
    }
    return await asyncio.to_thread(self._backend.get, **call_kwargs)

list_ids async

list_ids(prefix: Union[str, List[str]] = None) -> List[str]

Asynchronously list all result IDs, optionally filtered by a prefix.

Parameters:

Name Type Description Default
prefix Union[str, List[str]]

Optional prefix path to filter results

None

Returns:

Type Description
List[str]

List of result IDs

Source code in src/results_manager/async_backends/file_backend.py
async def list_ids(self, prefix: Union[str, List[str]] = None) -> List[str]:
    """
    Enumerate stored result IDs without blocking the event loop.

    Args:
        prefix: Optional path prefix (string or list of parts) restricting
            the listing; ``None`` means no filtering.

    Returns:
        The list of IDs produced by the wrapped backend's ``list_ids``.
    """
    lister = self._backend.list_ids
    ids = await asyncio.to_thread(lister, prefix=prefix)
    return ids

set async

set(
    result_id: Union[str, List[str]],
    data: BaseModel,
    behavior: SetBehavior = RAISE_IF_EXISTS,
    namespace: Optional[str] = None,
    strict_namespace: bool = False,
) -> bool

Asynchronously store a result with the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
data BaseModel

Pydantic model instance to store

required
behavior SetBehavior

How to handle existing data with the same ID

RAISE_IF_EXISTS
namespace Optional[str]

Optional namespace to store the model in

None
strict_namespace bool

If True, raises an error if model is in multiple namespaces

False

Returns:

Type Description
bool

True if data was written, False if skipped

Source code in src/results_manager/async_backends/file_backend.py
async def set(self, 
             result_id: Union[str, List[str]], 
             data: BaseModel, 
             behavior: SetBehavior = SetBehavior.RAISE_IF_EXISTS,
             namespace: Optional[str] = None,
             strict_namespace: bool = False) -> bool:
    """
    Store ``data`` under ``result_id`` without blocking the event loop.

    Every argument is forwarded by keyword to the wrapped backend's ``set``,
    which runs in a worker thread; any exception it raises propagates.

    Args:
        result_id: Result identifier, either a string or a list of path parts.
        data: Pydantic model instance to persist.
        behavior: Policy for an ID that already holds data.
        namespace: Namespace to record the model under.
        strict_namespace: If True, an error is raised when the model is
            registered in more than one namespace.

    Returns:
        The wrapped backend's result: True if data was written, False if skipped.
    """
    forwarded = dict(
        result_id=result_id,
        data=data,
        behavior=behavior,
        namespace=namespace,
        strict_namespace=strict_namespace,
    )
    return await asyncio.to_thread(self._backend.set, **forwarded)

AsyncResultsBackend

Bases: Generic[T], ABC

Abstract base class for async results storage backends.

Implementations should provide asynchronous storage and retrieval of Pydantic models.

clear abstractmethod async

clear() -> None

Asynchronously clear all stored results.

Source code in src/results_manager/async_backends/base.py
@abstractmethod
async def clear(self) -> None:
    """
    Delete every result held by the backend.

    Concrete backends must override this coroutine.
    """
    ...

delete abstractmethod async

delete(result_id: Union[str, List[str]]) -> bool

Asynchronously delete a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in src/results_manager/async_backends/base.py
@abstractmethod
async def delete(self, result_id: Union[str, List[str]]) -> bool:
    """
    Remove the result stored under ``result_id``.

    Concrete backends must override this coroutine.

    Args:
        result_id: Result identifier, as a string or a list of path parts.

    Returns:
        True when a result was deleted, False when nothing was stored under the ID.
    """
    ...

exists abstractmethod async

exists(result_id: Union[str, List[str]]) -> bool

Asynchronously check if a result exists for the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if the result exists, False otherwise

Source code in src/results_manager/async_backends/base.py
@abstractmethod
async def exists(self, result_id: Union[str, List[str]]) -> bool:
    """
    Report whether a result is stored under ``result_id``.

    Concrete backends must override this coroutine.

    Args:
        result_id: Result identifier, as a string or a list of path parts.

    Returns:
        True if the backend holds a result for the ID, False otherwise.
    """
    ...

get abstractmethod async

get(
    result_id: Union[str, List[str]],
    model_class: Optional[Type[T]] = None,
    namespace: Optional[str] = None,
) -> T

Asynchronously retrieve a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
model_class Optional[Type[T]]

Optional model class to validate against

None
namespace Optional[str]

Optional namespace to look in

None

Returns:

Type Description
T

Pydantic model instance

Raises:

Type Description
FileNotFoundError

If the result doesn't exist

ValueError

If the model type is not registered

Source code in src/results_manager/async_backends/base.py
@abstractmethod
async def get(self, 
             result_id: Union[str, List[str]], 
             model_class: Optional[Type[T]] = None,
             namespace: Optional[str] = None) -> T:
    """
    Load the result stored under ``result_id`` as a Pydantic model.

    Concrete backends must override this coroutine.

    Args:
        result_id: Result identifier, as a string or a list of path parts.
        model_class: Model class to validate with, if the caller knows it.
        namespace: Registry namespace to resolve the stored model type in.

    Returns:
        The validated Pydantic model instance.

    Raises:
        FileNotFoundError: When no result is stored under the ID.
        ValueError: When the stored model type is not registered.
    """
    ...

list_ids abstractmethod async

list_ids(prefix: Union[str, List[str]] = None) -> List[str]

Asynchronously list all result IDs, optionally filtered by a prefix.

Parameters:

Name Type Description Default
prefix Union[str, List[str]]

Optional prefix path to filter results

None

Returns:

Type Description
List[str]

List of result IDs

Source code in src/results_manager/async_backends/base.py
@abstractmethod
async def list_ids(self, prefix: Union[str, List[str]] = None) -> List[str]:
    """
    Enumerate the IDs of stored results.

    Concrete backends must override this coroutine.

    Args:
        prefix: Optional path prefix restricting which IDs are returned.

    Returns:
        The matching result IDs.
    """
    ...

set abstractmethod async

set(
    result_id: Union[str, List[str]],
    data: BaseModel,
    behavior: SetBehavior = RAISE_IF_EXISTS,
    namespace: Optional[str] = None,
    strict_namespace: bool = False,
) -> bool

Asynchronously store a result with the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
data BaseModel

Pydantic model instance to store

required
behavior SetBehavior

How to handle existing data with the same ID

RAISE_IF_EXISTS
namespace Optional[str]

Optional namespace to store the model in

None
strict_namespace bool

If True, raises an error if model is in multiple namespaces

False

Returns:

Type Description
bool

True if data was written, False if skipped

Raises:

Type Description
FileExistsError

If data already exists (for RAISE_IF_EXISTS) or if different data exists (for RAISE_IF_DIFFERENT)

Source code in src/results_manager/async_backends/base.py
@abstractmethod
async def set(self, 
             result_id: Union[str, List[str]], 
             data: BaseModel, 
             behavior: SetBehavior = SetBehavior.RAISE_IF_EXISTS,
             namespace: Optional[str] = None,
             strict_namespace: bool = False) -> bool:
    """
    Persist ``data`` under ``result_id``.

    Concrete backends must override this coroutine.

    Args:
        result_id: Result identifier, as a string or a list of path parts.
        data: Pydantic model instance to store.
        behavior: Policy for an ID that already holds data.
        namespace: Namespace to record the model under.
        strict_namespace: If True, raise when the model is registered in
            more than one namespace.

    Returns:
        True if data was written, False if the write was skipped.

    Raises:
        FileExistsError: When data already exists (RAISE_IF_EXISTS) or
                         differing data exists (RAISE_IF_DIFFERENT).
    """
    ...

AsyncResultsManager

AsyncResultsManager(
    base_dir: Union[str, Path] = None,
    create_if_missing: bool = True,
    backend: Optional[AsyncResultsBackend] = None,
)

Bases: Generic[T]

Async version of ResultsManager for managing results from parallel processes.

Provides an asynchronous interface for storing and retrieving Pydantic models.

Initialize the AsyncResultsManager.

Parameters:

Name Type Description Default
base_dir Union[str, Path]

Base directory for file storage (used only if backend is None)

None
create_if_missing bool

Whether to create the directory if it doesn't exist

True
backend Optional[AsyncResultsBackend]

Optional custom async backend to use. If None, uses AsyncFileBackend.

None
Source code in src/results_manager/async_manager.py
def __init__(self, 
             base_dir: Optional[Union[str, Path]] = None, 
             create_if_missing: bool = True, 
             backend: Optional["AsyncResultsBackend"] = None,
             locks_dir: Optional[Union[str, Path]] = None):
    """
    Initialize the AsyncResultsManager.

    Args:
        base_dir: Base directory for file storage (used only if backend is None)
        create_if_missing: Whether to create the directory if it doesn't exist
            (used only if backend is None)
        backend: Optional custom async backend to use. If None, an
            AsyncFileBackend rooted at ``base_dir`` is created.
        locks_dir: Directory for lock files, forwarded to AsyncFileBackend
            (used only if backend is None). If None, the file backend uses
            its default lock directory.

    Raises:
        ValueError: If neither ``base_dir`` nor ``backend`` is provided.
    """
    if backend is not None:
        # A caller-supplied backend wins; the file-storage arguments are ignored.
        self.backend = backend
        return

    if base_dir is None:
        raise ValueError("Must provide either base_dir or backend")
    self.backend = AsyncFileBackend(base_dir, create_if_missing, locks_dir)

clear async

clear() -> None

Asynchronously clear all stored results.

Source code in src/results_manager/async_manager.py
async def clear(self) -> None:
    """
    Remove every stored result by delegating to the configured async backend.
    """
    backend = self.backend
    await backend.clear()

delete async

delete(result_id: Union[str, List[str]]) -> bool

Asynchronously delete a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in src/results_manager/async_manager.py
async def delete(self, result_id: Union[str, List[str]]) -> bool:
    """
    Remove the result stored under ``result_id`` via the configured backend.

    Args:
        result_id: Result identifier, either a string or a list of path parts.

    Returns:
        The backend's answer: True if a result was deleted, False if none existed.
    """
    deleted = await self.backend.delete(result_id)
    return deleted

exists async

exists(result_id: Union[str, List[str]]) -> bool

Asynchronously check if a result exists for the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if the result exists, False otherwise

Source code in src/results_manager/async_manager.py
async def exists(self, result_id: Union[str, List[str]]) -> bool:
    """
    Ask the configured backend whether a result is stored under ``result_id``.

    Args:
        result_id: Result identifier, either a string or a list of path parts.

    Returns:
        True if the backend holds a result for the ID, False otherwise.
    """
    backend = self.backend
    return await backend.exists(result_id)

get async

get(
    result_id: Union[str, List[str]],
    model_class: Optional[Type[T]] = None,
    namespace: Optional[str] = None,
) -> T

Asynchronously retrieve a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
model_class Optional[Type[T]]

Optional model class to validate against

None
namespace Optional[str]

Optional namespace to look in

None

Returns:

Type Description
T

Pydantic model instance

Source code in src/results_manager/async_manager.py
async def get(self, 
             result_id: Union[str, List[str]], 
             model_class: Optional[Type[T]] = None,
             namespace: Optional[str] = None) -> T:
    """
    Load a stored result through the configured async backend.

    Arguments are forwarded by keyword; any exception the backend raises
    propagates unchanged.

    Args:
        result_id: Result identifier, either a string or a list of path parts.
        model_class: Model class to validate with, if known.
        namespace: Registry namespace to resolve the stored model type in.

    Returns:
        The Pydantic model instance returned by the backend.
    """
    lookup = {
        "result_id": result_id,
        "model_class": model_class,
        "namespace": namespace,
    }
    return await self.backend.get(**lookup)

list_ids async

list_ids(prefix: Union[str, List[str]] = None) -> List[str]

Asynchronously list all result IDs, optionally filtered by a prefix.

Parameters:

Name Type Description Default
prefix Union[str, List[str]]

Optional prefix path to filter results

None

Returns:

Type Description
List[str]

List of result IDs

Source code in src/results_manager/async_manager.py
async def list_ids(self, prefix: Union[str, List[str]] = None) -> List[str]:
    """
    List stored result IDs through the configured async backend.

    Args:
        prefix: Optional path prefix restricting the listing; passed through
            positionally to the backend.

    Returns:
        The IDs reported by the backend.
    """
    ids = await self.backend.list_ids(prefix)
    return ids

set async

set(
    result_id: Union[str, List[str]],
    data: BaseModel,
    behavior: SetBehavior = RAISE_IF_EXISTS,
    namespace: Optional[str] = None,
    strict_namespace: bool = False,
) -> bool

Asynchronously store a result with the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
data BaseModel

Pydantic model instance to store

required
behavior SetBehavior

How to handle existing data with the same ID

RAISE_IF_EXISTS
namespace Optional[str]

Optional namespace to store the model in

None
strict_namespace bool

If True, raises an error if model is in multiple namespaces

False

Returns:

Type Description
bool

True if data was written, False if skipped

Source code in src/results_manager/async_manager.py
async def set(self, 
             result_id: Union[str, List[str]], 
             data: BaseModel, 
             behavior: SetBehavior = SetBehavior.RAISE_IF_EXISTS,
             namespace: Optional[str] = None,
             strict_namespace: bool = False) -> bool:
    """
    Store ``data`` under ``result_id`` through the configured async backend.

    Every argument is forwarded by keyword; exceptions from the backend
    propagate unchanged.

    Args:
        result_id: Result identifier, either a string or a list of path parts.
        data: Pydantic model instance to persist.
        behavior: Policy for an ID that already holds data.
        namespace: Namespace to record the model under.
        strict_namespace: If True, raise when the model is registered in
            more than one namespace.

    Returns:
        The backend's result: True if data was written, False if skipped.
    """
    payload = dict(
        result_id=result_id,
        data=data,
        behavior=behavior,
        namespace=namespace,
        strict_namespace=strict_namespace,
    )
    return await self.backend.set(**payload)

FileBackend

FileBackend(
    base_dir: Union[str, Path],
    create_if_missing: bool = True,
    locks_dir: Optional[Union[str, Path]] = None,
)

Bases: ResultsBackend[T]

File-based implementation of ResultsBackend.

Stores results as JSON files in a hierarchical directory structure.

Initialize the FileBackend.

Parameters:

Name Type Description Default
base_dir Union[str, Path]

Base directory to store results

required
create_if_missing bool

Whether to create the directory if it doesn't exist

True
locks_dir Optional[Union[str, Path]]

Directory to store lock files. If None, uses a system temp directory.

None
Source code in src/results_manager/backends/file_backend.py
def __init__(self, base_dir: Union[str, Path], create_if_missing: bool = True, locks_dir: Optional[Union[str, Path]] = None):
    """
    Initialize the FileBackend.

    Args:
        base_dir: Base directory to store results
        create_if_missing: Whether to create the directory if it doesn't exist
        locks_dir: Directory to store lock files. If None, uses a
            ``results_manager_locks`` directory inside the system temp directory.

    Raises:
        FileNotFoundError: If ``base_dir`` does not exist and
            ``create_if_missing`` is False.
    """
    self.base_dir = Path(base_dir)

    if not self.base_dir.exists():
        if not create_if_missing:
            raise FileNotFoundError(f"Base directory {self.base_dir} does not exist")
        # exist_ok=True: several processes may start at once and race to
        # create the same directory; losing that race is not an error.
        self.base_dir.mkdir(parents=True, exist_ok=True)

    # Set up the locks directory. The default is a fixed path, so it is
    # shared by every backend that does not pass locks_dir.
    if locks_dir is None:
        self.locks_dir = Path(tempfile.gettempdir()) / "results_manager_locks"
    else:
        self.locks_dir = Path(locks_dir)
    self.locks_dir.mkdir(parents=True, exist_ok=True)

clear

clear() -> None

Clear all stored results.

Source code in src/results_manager/backends/file_backend.py
def clear(self) -> None:
    """
    Clear all stored results.

    Deletes ``base_dir`` and everything under it, then recreates it empty.
    The operation runs under a single ``clear_all.lock`` file in
    ``locks_dir``, so concurrent ``clear()`` calls are serialized. Nothing
    is deleted if ``base_dir`` does not exist.
    """
    # Deleting and recreating the whole tree avoids having to lock every
    # individual result file.
    lock_path = self.locks_dir / "clear_all.lock"
    with FileLock(lock_path):
        # Check existence while holding the lock so the check and the
        # removal are atomic with respect to other clear() calls.
        if not self.base_dir.exists():
            return
        shutil.rmtree(self.base_dir)
        # exist_ok: writers that use per-result locks (not this one) may
        # already have recreated the directory after the rmtree.
        self.base_dir.mkdir(parents=True, exist_ok=True)

delete

delete(result_id: Union[str, List[str]]) -> bool

Delete a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in src/results_manager/backends/file_backend.py
def delete(self, result_id: Union[str, List[str]]) -> bool:
    """
    Delete a result by ID.

    After removing the result file, parent directories left empty are
    removed too, stopping at ``base_dir``.

    Args:
        result_id: Unique identifier or hierarchical path for the result

    Returns:
        True if deleted, False if not found
    """
    file_path = self._get_path_from_id(result_id)
    lock_path = self._get_lock_path(file_path)

    # Use file lock to ensure thread/process safety
    with FileLock(lock_path):
        if not file_path.exists():
            return False

        file_path.unlink()

        # Best-effort pruning of now-empty parent directories. The lock only
        # covers this result's file, so another writer may repopulate (or
        # remove) a directory concurrently; rmdir() refusing is treated as
        # "stop here" rather than failing a delete that already succeeded.
        # The parents check also keeps pruning from climbing above base_dir.
        current_dir = file_path.parent
        while self.base_dir in current_dir.parents:
            try:
                current_dir.rmdir()
            except OSError:
                break
            current_dir = current_dir.parent

        return True

exists

exists(result_id: Union[str, List[str]]) -> bool

Check if a result exists for the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if the result exists, False otherwise

Source code in src/results_manager/backends/file_backend.py
def exists(self, result_id: Union[str, List[str]]) -> bool:
    """
    Report whether a result file is present for ``result_id``.

    The check runs while holding the result's lock file, the same lock the
    other per-result operations of this backend acquire.

    Args:
        result_id: Result identifier, either a string or a list of path parts.

    Returns:
        True if the result file exists, False otherwise.
    """
    target = self._get_path_from_id(result_id)
    with FileLock(self._get_lock_path(target)):
        present = target.exists()
    return present

get

get(
    result_id: Union[str, List[str]],
    model_class: Optional[Type[T]] = None,
    namespace: Optional[str] = None,
) -> T

Retrieve a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
model_class Optional[Type[T]]

Optional model class to validate against. If not provided, the stored model type will be used.

None
namespace Optional[str]

Optional namespace override to look for the model in

None

Returns:

Type Description
T

Pydantic model instance

Raises:

Type Description
FileNotFoundError

If the result doesn't exist

ValueError

If the model type is not registered

ValidationError

If the data doesn't match the model schema

Source code in src/results_manager/backends/file_backend.py
def get(self, 
        result_id: Union[str, List[str]], 
        model_class: Optional[Type[T]] = None,
        namespace: Optional[str] = None) -> T:
    """
    Retrieve a result by ID.

    The model class is chosen in this order: the ``model_class`` argument;
    otherwise the registry entry for the stored ``model_type`` in
    ``namespace`` (or, when ``namespace`` is None, the namespace recorded in
    the file, falling back to DEFAULT_NAMESPACE); otherwise the first match
    returned by ``find_model_in_all_namespaces``.

    Args:
        result_id: Unique identifier or hierarchical path for the result
        model_class: Optional model class to validate against. If not provided,
                     the stored model type will be used.
        namespace: Optional namespace override to look for the model in

    Returns:
        Pydantic model instance

    Raises:
        FileNotFoundError: If the result doesn't exist
        ValueError: If the stored data has no model type, or the model type
            is not registered
        KeyError: If the stored file has no "data" entry
        ValidationError: If the data doesn't match the model schema
    """
    file_path = self._get_path_from_id(result_id)
    lock_path = self._get_lock_path(file_path)

    # Hold the per-result lock only while reading the file; registry lookup
    # and validation happen afterwards to minimize lock time.
    with FileLock(lock_path):
        if not file_path.exists():
            raise FileNotFoundError(f"No result found for ID: {result_id}")

        with open(file_path, 'r') as f:
            stored_data = json.load(f)

    # Require model_type even when model_class is provided, so malformed
    # files are always rejected.
    model_type_name = stored_data.get("model_type")
    if not model_type_name:
        raise ValueError("Stored data missing model type information")

    if model_class is None:
        # Use the stored namespace unless the caller overrides it.
        stored_namespace = stored_data.get("namespace", DEFAULT_NAMESPACE)
        lookup_namespace = namespace if namespace is not None else stored_namespace
        model_class = get_model_class(model_type_name, namespace=lookup_namespace)

        if not model_class:
            # Not in the expected namespace: search all namespaces and use the first match.
            model_matches = find_model_in_all_namespaces(model_type_name)
            if not model_matches:
                namespaces_tried = [lookup_namespace]
                if lookup_namespace != DEFAULT_NAMESPACE:
                    namespaces_tried.append(DEFAULT_NAMESPACE)

                raise ValueError(
                    f"Model type '{model_type_name}' is not registered in "
                    f"namespace '{lookup_namespace}' or any other namespace. "
                    f"Tried namespaces: {', '.join(namespaces_tried)}"
                )
            _, model_class = model_matches[0]

    return model_class.model_validate(stored_data["data"])

list_ids

list_ids(prefix: Union[str, List[str]] = None) -> List[str]

List all result IDs, optionally filtered by a prefix.

Parameters:

Name Type Description Default
prefix Union[str, List[str]]

Optional prefix path to filter results

None

Returns:

Type Description
List[str]

List of result IDs

Source code in src/results_manager/backends/file_backend.py
def list_ids(self, prefix: Optional[Union[str, List[str]]] = None) -> List[str]:
    """
    List all result IDs, optionally filtered by a prefix.

    Args:
        prefix: Optional prefix path to filter results, either a
            '/'-separated string or a list of path components. ``None``
            lists every result.

    Returns:
        Sorted list of result IDs. Each ID is the result file's path relative
        to ``base_dir`` with the ``.json`` suffix removed, joined with '/'
        on every platform (the same separator string prefixes are split on).
        An empty list is returned if the prefix directory does not exist.
    """
    if prefix is None:
        base_path = self.base_dir
    else:
        parts = prefix.split('/') if isinstance(prefix, str) else prefix
        base_path = self.base_dir.joinpath(*parts)

    if not base_path.exists():
        return []

    # No locking: this only reads the directory tree, so results written or
    # deleted concurrently may or may not be reflected.
    return sorted(
        path.relative_to(self.base_dir).with_suffix('').as_posix()
        for path in base_path.rglob("*.json")
        if path.is_file()  # skip directories whose names end in ".json"
    )

set

set(
    result_id: Union[str, List[str]],
    data: BaseModel,
    behavior: SetBehavior = RAISE_IF_EXISTS,
    namespace: Optional[str] = None,
    strict_namespace: bool = False,
) -> bool

Store a result with the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
data BaseModel

Pydantic model instance to store

required
behavior SetBehavior

How to handle existing data with the same ID

RAISE_IF_EXISTS
namespace Optional[str]

Optional namespace to store the model in

None
strict_namespace bool

If True, raises an error if model is in multiple namespaces

False

Returns:

Type Description
bool

True if data was written, False if skipped

Source code in src/results_manager/backends/file_backend.py
def set(self, 
        result_id: Union[str, List[str]], 
        data: BaseModel, 
        behavior: SetBehavior = SetBehavior.RAISE_IF_EXISTS,
        namespace: Optional[str] = None,
        strict_namespace: bool = False) -> bool:
    """
    Store a result with the given ID.

    The existence check and the write both happen while holding a file lock
    derived from the result's path (10 second timeout). The JSON document is
    first written to a sibling ``.tmp`` file and then renamed over the target,
    so on filesystems with atomic rename readers never see a partial file.

    Args:
        result_id: Unique identifier or hierarchical path for the result
        data: Pydantic model instance to store
        behavior: How to handle existing data with the same ID:
            RAISE_IF_EXISTS raises whenever a file is already stored;
            SKIP_IF_EXISTS returns False without writing if the stored file
            holds the same model type and data, and overwrites otherwise;
            RAISE_IF_DIFFERENT raises if the stored model type or data differ
            (or the stored file cannot be parsed) and rewrites identical data.
            Any other behavior overwrites the existing file.
        namespace: Optional namespace to store the model in. If None, it is
            looked up with find_model_namespace, falling back to
            DEFAULT_NAMESPACE when the model is not registered.
        strict_namespace: If True, raises an error if model is in multiple namespaces

    Returns:
        True if data was written, False if skipped

    Raises:
        FileExistsError: If data already exists (RAISE_IF_EXISTS), or if
            different or unparseable data exists (RAISE_IF_DIFFERENT).
        ValueError: If namespace is None and find_model_namespace refuses to
            pick one (strict_namespace=True with the model registered in
            several non-default namespaces).
    """
    file_path = self._get_path_from_id(result_id)
    lock_path = self._get_lock_path(file_path)

    # Serialize once, in JSON mode: the same value is compared against the
    # stored document and written to disk, and non-JSON-native field types
    # (datetime, UUID, Decimal, Enum, set, ...) become JSON-compatible values
    # instead of making json.dump raise. Tuples become lists here too, so they
    # compare equal to what a previous write stored.
    # NOTE(review): non-finite floats follow pydantic's JSON-mode rules
    # (model config ser_json_inf_nan) — confirm if such values are stored.
    model_type = data.__class__.__name__
    payload = data.model_dump(mode="json")

    # Use file lock with a timeout to avoid deadlocks
    with FileLock(lock_path, timeout=10):  # 10 second timeout
        # Handle existing data according to behavior
        if file_path.exists():
            if behavior == SetBehavior.RAISE_IF_EXISTS:
                raise FileExistsError(f"Data already exists for ID: {result_id}")

            elif behavior == SetBehavior.SKIP_IF_EXISTS:
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        stored_data = json.load(f)
                except (json.JSONDecodeError, UnicodeDecodeError, FileNotFoundError):
                    # Unreadable or vanished file: fall through and overwrite it
                    stored_data = None

                # Skip only when exactly this model type and data are stored
                if (isinstance(stored_data, dict)
                        and stored_data.get("model_type") == model_type
                        and stored_data.get("data") == payload):
                    return False

            elif behavior == SetBehavior.RAISE_IF_DIFFERENT:
                try:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        stored_data = json.load(f)
                except (json.JSONDecodeError, UnicodeDecodeError, FileNotFoundError) as e:
                    # If we can't load the file properly, treat as different
                    raise FileExistsError(f"Invalid data exists for ID: {result_id}") from e

                # Valid JSON that is not our envelope object is also invalid
                if not isinstance(stored_data, dict):
                    raise FileExistsError(f"Invalid data exists for ID: {result_id}")

                # A different model type is different data, even if the
                # field payloads happen to match.
                if (stored_data.get("model_type") != model_type
                        or stored_data.get("data") != payload):
                    raise FileExistsError(f"Different data already exists for ID: {result_id}")

        # Determine the namespace to use
        if namespace is None:
            # Try to find the namespace from the model class
            try:
                model_namespace = find_model_namespace(data.__class__, strict=strict_namespace)
                if model_namespace is not None:
                    namespace = model_namespace
                else:
                    namespace = DEFAULT_NAMESPACE
            except ValueError as e:
                # Re-raise the error about multiple namespaces
                raise ValueError(
                    f"Cannot automatically determine namespace for {data.__class__.__name__} "
                    f"when saving to '{result_id}': {str(e)}"
                ) from e

        # Ensure the directory exists
        file_path.parent.mkdir(parents=True, exist_ok=True)

        # Store the model type and namespace along with the data
        serialized_data = {
            "model_type": model_type,
            "namespace": namespace,
            "data": payload,
        }

        # Use atomic write pattern for extra safety
        temp_file = file_path.with_suffix('.tmp')
        try:
            with open(temp_file, 'w', encoding='utf-8') as f:
                json.dump(serialized_data, f, indent=2)

            # Rename is atomic on most filesystems
            temp_file.replace(file_path)
        except BaseException:
            # Don't leave a half-written temp file behind; re-raise the error
            temp_file.unlink(missing_ok=True)
            raise

        return True

ResultsBackend

Bases: Generic[T], ABC

Abstract base class for results storage backends.

Implementations should provide storage and retrieval of Pydantic models based on unique IDs.

clear abstractmethod

clear() -> None

Clear all stored results.

Source code in src/results_manager/backends/base.py
@abstractmethod
def clear(self) -> None:
    """
    Remove every result held by the backend.

    Concrete backends must override this method.
    """
    ...

delete abstractmethod

delete(result_id: Union[str, List[str]]) -> bool

Delete a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in src/results_manager/backends/base.py
@abstractmethod
def delete(self, result_id: Union[str, List[str]]) -> bool:
    """
    Remove the result stored under ``result_id``.

    Args:
        result_id: Unique identifier or hierarchical path for the result

    Returns:
        True when a result was removed, False when nothing was stored under that ID
    """
    ...

exists abstractmethod

exists(result_id: Union[str, List[str]]) -> bool

Check if a result exists for the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if the result exists, False otherwise

Source code in src/results_manager/backends/base.py
@abstractmethod
def exists(self, result_id: Union[str, List[str]]) -> bool:
    """
    Report whether a result is stored under ``result_id``.

    Args:
        result_id: Unique identifier or hierarchical path for the result

    Returns:
        True when a result is present for the ID, False otherwise
    """
    ...

get abstractmethod

get(
    result_id: Union[str, List[str]],
    model_class: Optional[Type[T]] = None,
    namespace: Optional[str] = None,
) -> T

Retrieve a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
model_class Optional[Type[T]]

Optional model class to validate against

None
namespace Optional[str]

Optional namespace to look in

None

Returns:

Type Description
T

Pydantic model instance

Raises:

Type Description
FileNotFoundError

If the result doesn't exist

ValueError

If the model type is not registered

Source code in src/results_manager/backends/base.py
@abstractmethod
def get(self, 
        result_id: Union[str, List[str]], 
        model_class: Optional[Type[T]] = None,
        namespace: Optional[str] = None) -> T:
    """
    Load the result stored under ``result_id`` and return it as a model.

    Args:
        result_id: Unique identifier or hierarchical path for the result
        model_class: Optional model class to validate the stored data against
        namespace: Optional namespace in which to look up the stored model type

    Returns:
        Pydantic model instance

    Raises:
        FileNotFoundError: If no result is stored under the ID
        ValueError: If the stored model type is not registered
    """
    ...

list_ids abstractmethod

list_ids(prefix: Union[str, List[str]] = None) -> List[str]

List all result IDs, optionally filtered by a prefix.

Parameters:

Name Type Description Default
prefix Union[str, List[str]]

Optional prefix path to filter results

None

Returns:

Type Description
List[str]

List of result IDs

Source code in src/results_manager/backends/base.py
@abstractmethod
def list_ids(self, prefix: Optional[Union[str, List[str]]] = None) -> List[str]:
    """
    List all result IDs, optionally filtered by a prefix.

    Args:
        prefix: Optional prefix path to filter results, given either as a
            '/'-separated string or as a list of path components. None (the
            default) lists every stored result.

    Returns:
        List of result IDs
    """
    pass

set abstractmethod

set(
    result_id: Union[str, List[str]],
    data: BaseModel,
    behavior: SetBehavior = RAISE_IF_EXISTS,
    namespace: Optional[str] = None,
    strict_namespace: bool = False,
) -> bool

Store a result with the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
data BaseModel

Pydantic model instance to store

required
behavior SetBehavior

How to handle existing data with the same ID

RAISE_IF_EXISTS
namespace Optional[str]

Optional namespace to store the model in

None
strict_namespace bool

If True, raises an error if model is in multiple namespaces

False

Returns:

Type Description
bool

True if data was written, False if skipped

Raises:

Type Description
FileExistsError

If data already exists (for RAISE_IF_EXISTS) or if different data exists (for RAISE_IF_DIFFERENT)

Source code in src/results_manager/backends/base.py
@abstractmethod
def set(self, 
        result_id: Union[str, List[str]], 
        data: BaseModel, 
        behavior: SetBehavior = SetBehavior.RAISE_IF_EXISTS,
        namespace: Optional[str] = None,
        strict_namespace: bool = False) -> bool:
    """
    Persist ``data`` under ``result_id``.

    Args:
        result_id: Unique identifier or hierarchical path for the result
        data: Pydantic model instance to persist
        behavior: Policy applied when data already exists for the same ID
        namespace: Optional namespace to record for the model
        strict_namespace: If True, raise when the model is registered in
                          several namespaces

    Returns:
        True when the data was written, False when the write was skipped

    Raises:
        FileExistsError: If data already exists (for RAISE_IF_EXISTS) or
                         if different data exists (for RAISE_IF_DIFFERENT)
    """
    ...

ResultsManager

ResultsManager(
    base_dir: Union[str, Path] = None,
    create_if_missing: bool = True,
    backend: Optional[ResultsBackend] = None,
)

Bases: Generic[T]

Manages results from parallel processes, storing and retrieving pydantic models.

This class provides a unified interface to different storage backends.

Initialize the ResultsManager.

Parameters:

Name Type Description Default
base_dir Union[str, Path]

Base directory for file storage (used only if backend is None)

None
create_if_missing bool

Whether to create the directory if it doesn't exist

True
backend Optional[ResultsBackend]

Optional custom backend to use. If None, uses FileBackend.

None
Source code in src/results_manager/manager.py
def __init__(self, 
             base_dir: Optional[Union[str, Path]] = None, 
             create_if_missing: bool = True, 
             backend: Optional[ResultsBackend] = None,
             locks_dir: Optional[Union[str, Path]] = None):
    """
    Initialize the ResultsManager.

    Args:
        base_dir: Base directory for file storage (used only if backend is None)
        create_if_missing: Whether to create the directory if it doesn't exist
            (used only if backend is None)
        backend: Optional custom backend to use. If None, a FileBackend rooted
            at base_dir is created.
        locks_dir: Directory for the FileBackend's lock files (used only if
            backend is None). None keeps FileBackend's default behavior.

    Raises:
        ValueError: If neither base_dir nor backend is provided.
    """
    if backend is not None:
        # A caller-supplied backend wins; the file-storage arguments
        # (base_dir, create_if_missing, locks_dir) are ignored in this case.
        self.backend = backend
        return

    if base_dir is None:
        raise ValueError("Must provide either base_dir or backend")

    if locks_dir is None:
        # Same call as before locks_dir existed, so FileBackend's own default applies
        self.backend = FileBackend(base_dir, create_if_missing)
    else:
        self.backend = FileBackend(base_dir, create_if_missing, locks_dir)

clear

clear() -> None

Clear all stored results.

Source code in src/results_manager/manager.py
def clear(self) -> None:
    """
    Remove every stored result by delegating to the configured backend.
    """
    backend = self.backend
    backend.clear()

delete

delete(result_id: Union[str, List[str]]) -> bool

Delete a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if deleted, False if not found

Source code in src/results_manager/manager.py
def delete(self, result_id: Union[str, List[str]]) -> bool:
    """
    Remove a stored result via the configured backend.

    Args:
        result_id: Unique identifier or hierarchical path for the result

    Returns:
        True when a result was removed, False when none was found
    """
    backend = self.backend
    removed = backend.delete(result_id)
    return removed

exists

exists(result_id: Union[str, List[str]]) -> bool

Check if a result exists for the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required

Returns:

Type Description
bool

True if the result exists, False otherwise

Source code in src/results_manager/manager.py
def exists(self, result_id: Union[str, List[str]]) -> bool:
    """
    Ask the configured backend whether a result is stored under ``result_id``.

    Args:
        result_id: Unique identifier or hierarchical path for the result

    Returns:
        True when a result is present, False otherwise
    """
    backend = self.backend
    present = backend.exists(result_id)
    return present

get

get(
    result_id: Union[str, List[str]],
    model_class: Optional[Type[T]] = None,
    namespace: Optional[str] = None,
) -> T

Retrieve a result by ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
model_class Optional[Type[T]]

Optional model class to validate against. If not provided, the stored model type will be used.

None
namespace Optional[str]

Optional namespace override to look for the model in

None

Returns:

Type Description
T

Pydantic model instance

Raises:

Type Description
FileNotFoundError

If the result doesn't exist

ValueError

If the model type is not registered

ValidationError

If the data doesn't match the model schema

Source code in src/results_manager/manager.py
def get(self, 
        result_id: Union[str, List[str]], 
        model_class: Optional[Type[T]] = None,
        namespace: Optional[str] = None) -> T:
    """
    Load a stored result through the configured backend.

    Args:
        result_id: Unique identifier or hierarchical path for the result
        model_class: Optional model class to validate against. When omitted,
                     the model type recorded with the data is used.
        namespace: Optional namespace override used to look up the model

    Returns:
        Pydantic model instance

    Raises:
        FileNotFoundError: If the result doesn't exist
        ValueError: If the model type is not registered
        ValidationError: If the data doesn't match the model schema
    """
    lookup = {
        "result_id": result_id,
        "model_class": model_class,
        "namespace": namespace,
    }
    return self.backend.get(**lookup)

list_ids

list_ids(prefix: Union[str, List[str]] = None) -> List[str]

List all result IDs, optionally filtered by a prefix.

Parameters:

Name Type Description Default
prefix Union[str, List[str]]

Optional prefix path to filter results

None

Returns:

Type Description
List[str]

List of result IDs

Source code in src/results_manager/manager.py
def list_ids(self, prefix: Optional[Union[str, List[str]]] = None) -> List[str]:
    """
    List all result IDs, optionally filtered by a prefix.

    Args:
        prefix: Optional prefix path to filter results, given either as a
            '/'-separated string or a list of path components. None lists
            every result.

    Returns:
        List of result IDs, as returned by the configured backend
    """
    return self.backend.list_ids(prefix)

set

set(
    result_id: Union[str, List[str]],
    data: BaseModel,
    behavior: SetBehavior = RAISE_IF_EXISTS,
    namespace: Optional[str] = None,
    strict_namespace: bool = False,
) -> bool

Store a result with the given ID.

Parameters:

Name Type Description Default
result_id Union[str, List[str]]

Unique identifier or hierarchical path for the result

required
data BaseModel

Pydantic model instance to store

required
behavior SetBehavior

How to handle existing data with the same ID

RAISE_IF_EXISTS
namespace Optional[str]

Optional namespace to store the model in. If None, will try to determine the namespace from the model class automatically.

None
strict_namespace bool

If True, raises an error if the model is registered in multiple non-default namespaces

False

Returns:

Type Description
bool

True if data was written, False if skipped (only for SKIP_IF_EXISTS)

Raises:

Type Description
FileExistsError

If data already exists (for RAISE_IF_EXISTS) or if different data exists (for RAISE_IF_DIFFERENT)

Source code in src/results_manager/manager.py
def set(self, 
        result_id: Union[str, List[str]], 
        data: BaseModel, 
        behavior: SetBehavior = SetBehavior.RAISE_IF_EXISTS,
        namespace: Optional[str] = None,
        strict_namespace: bool = False) -> bool:
    """
    Persist ``data`` under ``result_id`` via the configured backend.

    Args:
        result_id: Unique identifier or hierarchical path for the result
        data: Pydantic model instance to persist
        behavior: Policy applied when data already exists for the same ID
        namespace: Optional namespace to record for the model. If None, the
                  backend is asked to infer it from the model class.
        strict_namespace: If True, raise when the model is registered in
                         multiple non-default namespaces

    Returns:
        True if data was written, False if skipped (only for SKIP_IF_EXISTS)

    Raises:
        FileExistsError: If data already exists (for RAISE_IF_EXISTS) or
                         if different data exists (for RAISE_IF_DIFFERENT)
    """
    options = {
        "behavior": behavior,
        "namespace": namespace,
        "strict_namespace": strict_namespace,
    }
    return self.backend.set(result_id=result_id, data=data, **options)

SetBehavior

Bases: Enum

Defines behavior when setting data for an ID that already exists.

clear_registry

clear_registry(namespace: Optional[str] = None)

Clear the model registry, optionally only for a specific namespace.

Parameters:

Name Type Description Default
namespace Optional[str]

If provided, only clear this namespace. Otherwise, clear all.

None
Source code in src/results_manager/model_registry.py
def clear_registry(namespace: Optional[str] = None):
    """
    Empty the model registry, either entirely or for one namespace.

    Args:
        namespace: Namespace whose models should be removed. When None, every
            namespace is dropped. An unknown namespace is ignored; a known one
            is emptied but its (now empty) entry is kept.
    """
    if namespace is not None:
        if namespace in _MODEL_REGISTRY:
            _MODEL_REGISTRY[namespace].clear()
        return
    _MODEL_REGISTRY.clear()

find_model_in_all_namespaces

find_model_in_all_namespaces(model_name: str) -> List[Tuple[str, Type[BaseModel]]]

Find a model by name in all namespaces.

Parameters:

Name Type Description Default
model_name str

The name of the model class

required

Returns:

Type Description
List[Tuple[str, Type[BaseModel]]]

List of (namespace, model_class) tuples for all matches

Source code in src/results_manager/model_registry.py
def find_model_in_all_namespaces(model_name: str) -> List[Tuple[str, Type[BaseModel]]]:
    """
    Look up a model class name across every registered namespace.

    Args:
        model_name: The name of the model class

    Returns:
        One (namespace, model_class) pair per namespace that registers a model
        under ``model_name``, in registry iteration order
    """
    return [
        (ns, registry[model_name])
        for ns, registry in _MODEL_REGISTRY.items()
        if model_name in registry
    ]

find_model_namespace

find_model_namespace(model_class: Type[BaseModel], strict: bool = False) -> Optional[str]

Find the namespace for a model class.

If the model is registered in multiple namespaces, behavior depends on the `strict` parameter:

- If `strict=False` (default): non-default namespaces take priority, and the first one found is returned.
- If `strict=True`: a `ValueError` is raised if the model is found in more than one non-default namespace.

Parameters:

Name Type Description Default
model_class Type[BaseModel]

The model class to find the namespace for

required
strict bool

Whether to raise an error if the model is in multiple non-default namespaces

False

Returns:

Type Description
Optional[str]

The namespace name if found, None otherwise

Raises:

Type Description
ValueError

If strict=True and the model is registered in multiple non-default namespaces

Source code in src/results_manager/model_registry.py
def find_model_namespace(model_class: Type[BaseModel], strict: bool = False) -> Optional[str]:
    """
    Work out which namespace a model class is registered in.

    Only namespaces that map the class's name to this exact class object
    count. When several namespaces match, the ``strict`` flag decides:

    - strict=False (default): the first matching non-default namespace wins;
      the default namespace is returned only if it is the sole kind of match.
    - strict=True: more than one matching non-default namespace is an error.

    Args:
        model_class: The model class to find the namespace for
        strict: Whether to raise an error if the model is in multiple non-default namespaces

    Returns:
        The namespace name if found, None otherwise

    Raises:
        ValueError: If strict=True and the model is registered in multiple non-default namespaces
    """
    name = model_class.__name__

    hits = [
        ns for ns, registry in _MODEL_REGISTRY.items()
        if name in registry and registry[name] is model_class
    ]
    if not hits:
        return None

    preferred = [ns for ns in hits if ns != DEFAULT_NAMESPACE]

    if strict and len(preferred) > 1:
        raise ValueError(
            f"Model '{name}' is registered in multiple non-default namespaces: "
            f"{', '.join(preferred)}. Specify a namespace explicitly."
        )

    # Every hit that is not in `preferred` equals DEFAULT_NAMESPACE, so with
    # no preferred hit the default namespace is necessarily among the hits.
    return preferred[0] if preferred else DEFAULT_NAMESPACE

get_model_class

get_model_class(model_name: str, namespace: str = DEFAULT_NAMESPACE) -> Optional[Type[BaseModel]]

Retrieve a model class from the registry by name and namespace.

Parameters:

Name Type Description Default
model_name str

The name of the model class

required
namespace str

The namespace to look in

DEFAULT_NAMESPACE

Returns:

Type Description
Optional[Type[BaseModel]]

The model class if found, None otherwise

Source code in src/results_manager/model_registry.py
def get_model_class(model_name: str, namespace: str = DEFAULT_NAMESPACE) -> Optional[Type[BaseModel]]:
    """
    Fetch a registered model class by name from one namespace.

    Args:
        model_name: The name of the model class
        namespace: The namespace to search

    Returns:
        The registered class, or None if the namespace or name is unknown
    """
    return _MODEL_REGISTRY.get(namespace, {}).get(model_name)

get_models_in_namespace

get_models_in_namespace(namespace: str = DEFAULT_NAMESPACE) -> List[str]

Get a list of all model names in a namespace.

Parameters:

Name Type Description Default
namespace str

The namespace to get models from

DEFAULT_NAMESPACE

Returns:

Type Description
List[str]

List of model names

Source code in src/results_manager/model_registry.py
def get_models_in_namespace(namespace: str = DEFAULT_NAMESPACE) -> List[str]:
    """
    Return the names of the models registered in one namespace.

    Args:
        namespace: The namespace to inspect

    Returns:
        A new list of model names (empty if the namespace is unknown)
    """
    models = _MODEL_REGISTRY.get(namespace, {})
    return list(models)

get_namespaces

get_namespaces() -> List[str]

Get a list of all registered namespaces.

Returns:

Type Description
List[str]

List of namespace names

Source code in src/results_manager/model_registry.py
def get_namespaces() -> List[str]:
    """
    Return the names of all namespaces currently present in the registry.

    Returns:
        A new list of namespace names, in registry order
    """
    return [ns for ns in _MODEL_REGISTRY]

register_model

register_model(model_class_or_namespace: Any = None, *, namespace: str = DEFAULT_NAMESPACE)

Register a pydantic model class in the registry.

Can be used as a decorator with or without arguments:

```python
@register_model
class MyModel(BaseModel):
    ...

@register_model(namespace="custom")
class MyModel(BaseModel):
    ...
```

Or programmatically:

```python
register_model(MyModel, namespace="custom")
```

Parameters:

Name Type Description Default
model_class_or_namespace Any

The model class to register or a namespace string

None
namespace str

The namespace to register the model in (when used programmatically)

DEFAULT_NAMESPACE

Returns:

Type Description

The decorator function or the registered model class

Source code in src/results_manager/model_registry.py
def register_model(model_class_or_namespace: Any = None, *, namespace: str = DEFAULT_NAMESPACE):
    """
    Register a pydantic model class in the registry.

    Can be used as a decorator with or without arguments::

        @register_model
        class MyModel(BaseModel):
            ...

        @register_model(namespace="custom")
        class MyModel(BaseModel):
            ...

        @register_model("custom")
        class MyModel(BaseModel):
            ...

    Or programmatically::

        register_model(MyModel, namespace="custom")

    Args:
        model_class_or_namespace: The model class to register, a namespace
            string (which takes precedence over ``namespace``), or None
        namespace: The namespace to register the model in when no positional
            namespace string is given

    Returns:
        The registered model class when a class is passed directly; otherwise
        a decorator that registers and returns the decorated class

    Raises:
        TypeError: If the class being registered (directly or via the
            decorator) is not a BaseModel subclass, or if the first argument
            is neither a class, a namespace string, nor None
    """
    arg = model_class_or_namespace

    # Called with a class: bare-decorator or programmatic use. A class that is
    # not a BaseModel must be rejected here; otherwise the decorator function
    # would be returned and silently replace the decorated class.
    if isinstance(arg, type):
        if not issubclass(arg, BaseModel):
            raise TypeError("Registered model must be a subclass of BaseModel")
        return _register_model(arg, namespace)

    if arg is not None and not isinstance(arg, str):
        raise TypeError(
            "register_model() expects a BaseModel subclass, a namespace string, "
            f"or None; got {type(arg).__name__}"
        )

    # A positional namespace string overrides the keyword namespace
    ns = arg if isinstance(arg, str) else namespace

    # Decorator-with-arguments use: register the decorated class in `ns`
    def decorator(model_class):
        if not isinstance(model_class, type) or not issubclass(model_class, BaseModel):
            raise TypeError("Registered model must be a subclass of BaseModel")
        return _register_model(model_class, ns)

    return decorator