process ¶
Base process implementation.
BaseProcess ¶
Bases: ABC
Abstract base class for all process implementations.
The BaseProcess class provides a framework for implementing custom processes that can be executed in different contexts (async, threaded, or multiprocess). It handles common functionality like execution tracking, error handling, and workflow integration.
Key Features
- Flexible execution strategies (async, thread, process)
- Built-in timeout handling
- Automatic execution time tracking
- Process state management
- Error handling and reporting
Attributes:
Name | Type | Description |
---|---|---|
config | ProcessConfig | Configuration settings for the process |
metadata | ProcessMetadata | Runtime metadata and state tracking |
_workflow | Optional[Workflow] | Reference to parent workflow |
Example
Initialize the process with configuration settings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config | ProcessConfig | Process configuration, including: process_type (execution strategy: ASYNC, THREAD, or PROCESS); process_id (unique identifier for the process); timeout (optional timeout duration in seconds) | required |
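To make the three documented settings concrete, here is a minimal sketch of a configuration object. `SketchProcessType` and `SketchConfig` are hypothetical stand-ins written for illustration, not the library's `ProcessConfig`; the field types and the positive-timeout check are assumptions.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class SketchProcessType(Enum):
    """Hypothetical enum mirroring the three documented strategies."""

    ASYNC = "async"
    THREAD = "thread"
    PROCESS = "process"


@dataclass(frozen=True)
class SketchConfig:
    """Hypothetical stand-in for ProcessConfig with the three documented fields."""

    process_type: SketchProcessType
    process_id: str
    timeout: Optional[float] = None  # seconds; None means no timeout

    def __post_init__(self) -> None:
        # Assumption: a timeout, when given, must be a positive number.
        if self.timeout is not None and self.timeout <= 0:
            raise ValueError("timeout must be positive when given")


config = SketchConfig(
    process_type=SketchProcessType.THREAD,
    process_id="resize-images",
    timeout=30.0,
)
```

The real `ProcessConfig` may carry more fields or validate differently; check the source file before relying on any of these names.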
Source code in src/process_manager/workflow/process.py
execute async ¶
execute(input_data: Any) -> ProcessResult
Execute the process using the appropriate execution strategy.
Uses the parent workflow's pool manager if the process is attached to a workflow, otherwise creates a standalone pool manager for independent execution.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | Any | Input data for the process | required |
Returns:
Type | Description |
---|---|
ProcessResult | ProcessResult containing the execution result and metadata |
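The pool-manager fallback described above can be sketched as follows. Every class here (`SketchPoolManager`, `SketchWorkflow`, `SketchProcess`) is a hypothetical stand-in, and the `pool_manager` attribute name is an assumption; only the "use the workflow's pool if attached, otherwise create a standalone one" decision comes from the description.

```python
import asyncio
from typing import Any, Optional


class SketchPoolManager:
    """Hypothetical pool manager; it only records who owns it."""

    def __init__(self, owner: str) -> None:
        self.owner = owner


class SketchWorkflow:
    """Hypothetical workflow that shares one pool manager with its processes."""

    def __init__(self) -> None:
        self.pool_manager = SketchPoolManager(owner="workflow")


class SketchProcess:
    """Hypothetical process showing only the pool-selection logic."""

    def __init__(self) -> None:
        self._workflow: Optional[SketchWorkflow] = None

    def set_workflow(self, workflow: SketchWorkflow) -> None:
        # Called by the workflow when the process is added.
        self._workflow = workflow

    async def execute(self, input_data: Any) -> dict:
        if self._workflow is not None:
            # Attached: reuse the parent workflow's shared pools.
            pool = self._workflow.pool_manager
        else:
            # Detached: create a pool manager just for this call.
            pool = SketchPoolManager(owner="standalone")
        return {"data": input_data, "pool_owner": pool.owner}


detached = asyncio.run(SketchProcess().execute("payload"))

attached_process = SketchProcess()
attached_process.set_workflow(SketchWorkflow())
attached = asyncio.run(attached_process.execute("payload"))
```

Sharing the workflow's pools avoids starting a new thread or process pool per call; the standalone branch keeps a process usable outside any workflow.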
get_process_result classmethod ¶
Helper method to consistently extract data from process results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | Dict[str, Any] | Dictionary containing process results | required |
process_id | ProcessId | ProcessId of the process whose result we want | required |
Returns:
Type | Description |
---|---|
Any | The data from the specified process result |
Raises:
Type | Description |
---|---|
ValueError | If the process result is not found or invalid |
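A lookup with this contract might look roughly like the sketch below. `SketchResult` and `extract_result` are hypothetical names, and the assumptions that results are keyed by a string ID and expose a `data` attribute are not confirmed by the documentation above.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class SketchResult:
    """Hypothetical result object with only the fields this sketch needs."""

    success: bool
    data: Any


def extract_result(input_data: Dict[str, Any], process_id: str) -> Any:
    """Return the data of one process result, or raise ValueError."""
    if process_id not in input_data:
        raise ValueError(f"No result found for process {process_id!r}")
    result = input_data[process_id]
    if not isinstance(result, SketchResult):
        # Assumption: anything that is not a result object counts as invalid.
        raise ValueError(f"Invalid result for process {process_id!r}")
    return result.data


upstream = {"load": SketchResult(success=True, data=[1, 2, 3])}
loaded = extract_result(upstream, "load")
```

Raising `ValueError` for both the missing and the malformed case matches the single exception listed in the table.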
process ¶
Main processing logic to be implemented by subclasses.
This is the primary method that users should override. It contains just the core processing logic without worrying about execution details.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | Any | The input data to process | required |
Returns:
Type | Description |
---|---|
Any | The processed result |
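The override pattern can be illustrated with a small self-contained sketch. `SketchBaseProcess` is a hypothetical stand-in for `BaseProcess` (the real class also takes a `config` argument, omitted here), and `WordCount` is an invented example subclass.

```python
from abc import ABC, abstractmethod
from typing import Any


class SketchBaseProcess(ABC):
    """Hypothetical stand-in for BaseProcess; not the library's class."""

    @abstractmethod
    def process(self, input_data: Any) -> Any:
        """Core logic that every subclass must override."""


class WordCount(SketchBaseProcess):
    """Example subclass: counts whitespace-separated words."""

    def process(self, input_data: Any) -> Any:
        # Only the core logic lives here; no timing, pools, or error packaging.
        return len(str(input_data).split())


word_count = WordCount().process("flexible execution strategies")
```

Because `process` is abstract, instantiating the base class directly raises `TypeError`, which catches a forgotten override early.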
register_to ¶
register_to(
workflow: Workflow, dependencies: Optional[List[str | ProcessId]] = None, required: bool = True
) -> BaseProcess
Register this process as a WorkflowNode to the given workflow.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow | Workflow | The workflow to register this process to | required |
dependencies | Optional[List[str \| ProcessId]] | Optional list of process IDs that this process depends on | None |
required | bool | Whether this process is required for workflow completion | True |
Returns:
Type | Description |
---|---|
Self | Reference for method chaining |
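Returning the process itself lets registration and assignment happen in one expression. The sketch below shows that chaining pattern with hypothetical stand-ins (`SketchWorkflow`, `SketchProcess`); how the real workflow stores its nodes is an assumption.

```python
from typing import Dict, List, Optional


class SketchWorkflow:
    """Hypothetical workflow that stores one node record per process ID."""

    def __init__(self) -> None:
        self.nodes: Dict[str, dict] = {}


class SketchProcess:
    """Hypothetical process exposing a chainable register_to."""

    def __init__(self, process_id: str) -> None:
        self.process_id = process_id

    def register_to(
        self,
        workflow: SketchWorkflow,
        dependencies: Optional[List[str]] = None,
        required: bool = True,
    ) -> "SketchProcess":
        # Record this process as a node, then return self for chaining.
        workflow.nodes[self.process_id] = {
            "dependencies": list(dependencies or []),
            "required": required,
        }
        return self


workflow = SketchWorkflow()
load = SketchProcess("load").register_to(workflow)
report = SketchProcess("report").register_to(
    workflow, dependencies=["load"], required=False
)
```

Here `report` depends on `load` and is marked optional, so in this sketch a workflow could complete even if `report` fails.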
run async ¶
run(input_data: Any) -> ProcessResult
Run the process with the configured execution strategy.
This method handles:
1. Process state management
2. Execution time tracking
3. Error handling
4. Result packaging
The execution strategy is determined by the process_type setting in the configuration (ASYNC, THREAD, or PROCESS).
Parameters:
Name | Type | Description | Default |
---|---|---|---|
input_data | Any | Input data for the process | required |
Returns:
Name | Type | Description |
---|---|---|
ProcessResult | ProcessResult | Object containing: success (whether execution completed successfully); data (process output data); execution_time (time taken in seconds); start_time (execution start timestamp); end_time (execution end timestamp); error (error information if execution failed) |
Raises:
Type | Description |
---|---|
ProcessError | If execution fails for any reason |
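The timing, timeout, and result-packaging responsibilities can be sketched with the standard library alone. `SketchResult` and `run_sketch` are hypothetical; the sketch covers only async and thread execution (the process strategy is left out) and packages failures into the result instead of raising `ProcessError`, so it is not a description of how the library itself behaves.

```python
import asyncio
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Optional


@dataclass
class SketchResult:
    """Hypothetical result with the fields listed in the Returns table."""

    success: bool
    data: Any
    execution_time: float
    start_time: datetime
    end_time: datetime
    error: Optional[str] = None


async def run_sketch(
    func: Callable[[Any], Any],
    input_data: Any,
    strategy: str = "async",
    timeout: Optional[float] = None,
) -> SketchResult:
    start_time = datetime.now()
    started = time.perf_counter()
    data, error = None, None
    try:
        if strategy == "thread":
            # Run a blocking callable off the event loop (Python 3.9+).
            pending = asyncio.to_thread(func, input_data)
        else:
            # Assumption: for "async", func is a coroutine function.
            pending = func(input_data)
        data = await asyncio.wait_for(pending, timeout=timeout)
    except asyncio.TimeoutError:
        error = f"timed out after {timeout} seconds"
    except Exception as exc:
        error = f"{type(exc).__name__}: {exc}"
    return SketchResult(
        success=error is None,
        data=data,
        execution_time=time.perf_counter() - started,
        start_time=start_time,
        end_time=datetime.now(),
        error=error,
    )


async def double(value: int) -> int:
    await asyncio.sleep(0)
    return value * 2


def fail(value: int) -> int:
    raise RuntimeError("boom")


async def slow(value: int) -> int:
    await asyncio.sleep(1)
    return value


ok = asyncio.run(run_sketch(double, 21))
failed = asyncio.run(run_sketch(fail, 1, strategy="thread"))
late = asyncio.run(run_sketch(slow, 1, timeout=0.01))
```

Note that `asyncio.wait_for` can cancel a coroutine but cannot interrupt a function already running in a thread; a real implementation has to decide how to handle that case.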
set_workflow ¶
set_workflow(workflow: Workflow) -> None
Set reference to parent workflow for resource access.
This method is called by the workflow when the process is added. The workflow reference provides access to shared resources like thread and process pools.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
workflow | Workflow | Parent workflow instance | required |