ResultsManager: Pydantic-Validated Data Storage for Parallel Processing¶
ResultsManager is a flexible storage system for Pydantic models, designed for keeping track of results from parallel processing tasks. It provides a simple yet powerful interface to store, retrieve, and manage structured data with built-in validation.
Key Features¶
- Pydantic Integration: First-class support for storing and retrieving Pydantic models with automatic validation
- Hierarchical Organization: Store data using intuitive path-like IDs (e.g., "projects/project1/results/analysis")
- Multiple Storage Backends: Seamlessly switch between file-based, SQLite, or custom backends
- Namespace Management: Organize models by namespace to prevent naming conflicts
- Concurrency Support: Thread and process safe with proper locking mechanisms
- Async Support: Full async API for use with asyncio-based applications
- Type Safety: Comprehensive type hints and runtime type validation
When to Use ResultsManager¶
ResultsManager is ideal for:
- Data Processing Pipelines: Store intermediate and final results from data transformations
- Machine Learning Workflows: Save model artifacts, parameters, and evaluation metrics
- Parallel Task Processing: Manage results from distributed or concurrent processing
- API Result Caching: Store validated results from API calls for reuse
- ETL Processes: Capture extraction, transformation, and loading outputs
Getting Started¶
Installation¶
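The distribution name below is an assumption based on the import name (results_manager); check the project's packaging metadata if it differs:
pip install results-manager  # assumed PyPI name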
Basic Usage¶
from pydantic import BaseModel
from typing import List, Optional
from results_manager import ResultsManager, register_model
# Define your data models
@register_model
class Person(BaseModel):
    name: str
    age: int
    email: Optional[str] = None
@register_model
class Team(BaseModel):
    name: str
    members: List[Person]
# Create a manager
results = ResultsManager("./data")
# Store some data
person = Person(name="John Doe", age=30, email="john@example.com")
results.set("people/john", person)
team = Team(
    name="Engineering",
    members=[
        Person(name="John Doe", age=30),
        Person(name="Jane Smith", age=28)
    ]
)
results.set("teams/engineering", team)
# Retrieve data later
john = results.get("people/john")
print(f"Retrieved: {john.name}, {john.age}")
# List available results
all_ids = results.list_ids()
print(f"Available results: {all_ids}")
# Find results with a prefix
team_ids = results.list_ids("teams")
print(f"Teams: {team_ids}")
# Check if data exists
if results.exists("people/jane"):
    print("Jane's data exists")
else:
    print("Jane's data not found")
# Delete data when no longer needed
results.delete("people/john")
Storage Backends¶
ResultsManager offers multiple backends for different use cases:
File Backend (Default)¶
The FileBackend stores each result as a separate JSON file in a directory structure that mirrors your ID hierarchy:
from results_manager import ResultsManager, FileBackend
# Default uses FileBackend
results = ResultsManager("./data")
# Or explicitly specify it
file_backend = FileBackend("./data")
results = ResultsManager(backend=file_backend)
Best for:
- Development and testing
- Simple applications
- Small to medium datasets
- Local processing
SQLite Backend¶
The SqliteBackend stores results in a SQLite database for better query performance and atomicity:
from results_manager import ResultsManager
from results_manager.backends.sqlite_backend import SqliteBackend
sqlite_backend = SqliteBackend("./results.db")
results = ResultsManager(backend=sqlite_backend)
Best for:
- Larger datasets
- More frequent updates
- Applications needing transactional consistency
- Situations where you need to query across many results
Custom Backends¶
You can implement custom backends by inheriting from ResultsBackend:
from results_manager import ResultsManager, ResultsBackend
class MyCustomBackend(ResultsBackend):
    # Implement the required storage methods here
    ...

results = ResultsManager(backend=MyCustomBackend())
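The exact set of required methods is defined by the ResultsBackend base class. As a rough sketch only, a minimal in-memory backend might look like the following; the method names set, get, exists, delete, and list_ids are assumptions inferred from the ResultsManager operations shown above, not a confirmed interface:
from results_manager import ResultsBackend

class InMemoryBackend(ResultsBackend):
    """Hypothetical in-memory backend; method names are assumed,
    not taken from the actual ResultsBackend interface."""

    def __init__(self):
        self._store = {}

    def set(self, result_id, data):
        # Store the value under its hierarchical ID
        self._store[result_id] = data

    def get(self, result_id):
        return self._store[result_id]

    def exists(self, result_id):
        return result_id in self._store

    def delete(self, result_id):
        self._store.pop(result_id, None)

    def list_ids(self, prefix=""):
        # Match the prefix-filtering behavior shown in Basic Usage
        return [rid for rid in self._store if rid.startswith(prefix)]
An in-memory backend like this can be handy for unit tests, where persistence between runs is unnecessary.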
Switching Backends¶
One of ResultsManager's most powerful features is the ability to switch backends without changing your application code:
# Start with file storage during development
results = ResultsManager("./dev_data")
# Later switch to SQLite for production
sqlite_backend = SqliteBackend("./prod.db")
results.backend = sqlite_backend
# Your application code remains unchanged
results.set("key", data)
retrieved = results.get("key")
This makes it easy to scale up as your needs grow.
Async Support¶
For asyncio-based applications, ResultsManager provides a full async API:
import asyncio
from results_manager import AsyncResultsManager
async def process_data():
    results = AsyncResultsManager("./data")
    data = Person(name="John Doe", age=30)  # reusing the Person model from Basic Usage

    # All operations are async
    await results.set("key", data)
    retrieved = await results.get("key")

    # Run operations concurrently
    tasks = [
        results.set(f"item/{i}", data)
        for i in range(10)
    ]
    await asyncio.gather(*tasks)

asyncio.run(process_data())
Namespace Management¶
ResultsManager uses a model registry with namespace support to avoid naming conflicts:
from pydantic import BaseModel
from results_manager import register_model, get_model_class

# Register in default namespace
@register_model
class User(BaseModel):
    name: str

# Register in custom namespace
@register_model(namespace="analytics")
class User(BaseModel):  # Same name, different model
    user_id: str
    visit_count: int
# Get the right model by namespace
user_model = get_model_class("User") # Default namespace
analytics_user = get_model_class("User", namespace="analytics")
Scaling Your Workflows¶
ResultsManager is designed to grow with your application needs:
From Single Process to Distributed Execution¶
import concurrent.futures
from results_manager import ResultsManager, SetBehavior
def process_item(item_id):
    # Each process creates its own manager instance
    results = ResultsManager("./results")

    # Process the item
    output = compute_result(item_id)

    # Store with SKIP_IF_EXISTS to handle cases where another process
    # already completed this item
    results.set(f"items/{item_id}", output, behavior=SetBehavior.SKIP_IF_EXISTS)
    return item_id

# Process items in parallel
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [executor.submit(process_item, i) for i in range(100)]
    for future in concurrent.futures.as_completed(futures):
        print(f"Completed item {future.result()}")
From Small to Large Datasets¶
As your data grows, you can easily switch to a more scalable backend:
# During development with small data
results = ResultsManager("./dev_data")
# For production with larger data
from results_manager.backends.sqlite_backend import SqliteBackend
results = ResultsManager(backend=SqliteBackend("./prod.db"))
# Future expansion to other backends
# from results_manager.backends.postgres_backend import PostgresBackend
# results.backend = PostgresBackend(connection_string)
Why ResultsManager?¶
Compared to Simple File Storage¶
- Type Safety: Automatic validation of data structures
- Organization: Hierarchical IDs vs. flat files
- Concurrency: Built-in locking for safe concurrent access
- Flexibility: Multiple backend options
Compared to Databases¶
- Simplified Interface: No SQL or ORM knowledge required
- Schema Flexibility: Models can evolve without migrations
- Type Validation: Automatic through Pydantic
- Python-Native: Works directly with Python objects
Compared to Key-Value Stores¶
- Rich Data Models: Full Pydantic model support vs. simple values
- Hierarchical Structure: Natural organization vs. flat namespaces
- Type Safety: Strongly typed vs. schema-less
Real-World Use Cases¶
Machine Learning Experiment Tracking¶
from typing import Any, Dict

from pydantic import BaseModel
from results_manager import ResultsManager, register_model

@register_model
class ModelMetrics(BaseModel):
    model_name: str
    accuracy: float
    precision: float
    recall: float
    training_time: float
    parameters: Dict[str, Any]
results = ResultsManager("./experiments")
# Track experiment results
metrics = ModelMetrics(
    model_name="RandomForest",
    accuracy=0.92,
    precision=0.89,
    recall=0.94,
    training_time=45.2,
    parameters={"n_estimators": 100, "max_depth": 10}
)
results.set("models/random_forest/run_1", metrics)
# Later, analyze all experiments
for result_id in results.list_ids("models/random_forest"):
    metrics = results.get(result_id, ModelMetrics)
    print(f"{result_id}: Accuracy={metrics.accuracy}, Time={metrics.training_time}s")
Data Processing Pipeline¶
from pydantic import BaseModel
from results_manager import ResultsManager, register_model

@register_model
class RawData(BaseModel):
    # Raw data schema
    ...

@register_model
class ProcessedData(BaseModel):
    # Processed data schema
    ...

@register_model
class AnalysisResult(BaseModel):
    # Analysis results schema
    ...
results = ResultsManager("./pipeline_data")
# Stage 1: Extract data
raw_data = extract_data()
results.set("pipeline/extraction", raw_data)
# Stage 2: Process data
raw = results.get("pipeline/extraction", RawData)
processed = process_data(raw)
results.set("pipeline/processing", processed)
# Stage 3: Analyze data
processed = results.get("pipeline/processing", ProcessedData)
analysis = analyze_data(processed)
results.set("pipeline/analysis", analysis)
# Get final results any time later
final_results = results.get("pipeline/analysis", AnalysisResult)
Conclusion¶
ResultsManager provides a robust solution for managing structured data in Python applications. Its combination of type safety, flexible storage options, and intuitive interface makes it an ideal choice for data processing, machine learning workflows, and parallel task management.
Whether you're working on a small personal project or a large-scale data processing pipeline, ResultsManager adapts to your needs and grows with your application.