Skip to content

v4

v4

Example of nested workflows for parallel random number generation.

execute_workflow_process

execute_workflow_process(args: Tuple[int, Optional[int], Path]) -> Dict[str, Any]

Create and execute a single workflow process.

Source code in src/process_manager/examples/simple/v4.py
def execute_workflow_process(args: Tuple[int, Optional[int], Path]) -> Dict[str, Any]:
    """Create and execute a single workflow process."""
    index, seed, output_dir = args

    workflow = create_workflow(
        max_threads=1,
        process_id=f"workflow_process_{index}"
    )

    generator = RandomNumberGenerator(index=index, seed=seed)
    writer = FileWriter(index=index, output_dir=output_dir)

    workflow.add_node(WorkflowNode(
        process=generator,
        dependencies=[],
        required=True
    ))

    workflow.add_node(WorkflowNode(
        process=writer,
        dependencies=[generator.config.process_id],
        required=True
    ))

    # Run the coroutine in a new event loop
    loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(workflow.execute())
    finally:
        loop.close()