Create and execute a single workflow process.
Source code in src/process_manager/examples/simple/v4.py
| def execute_workflow_process(args: Tuple[int, Optional[int], Path]) -> Dict[str, Any]:
"""Create and execute a single workflow process."""
index, seed, output_dir = args
workflow = create_workflow(
max_threads=1,
process_id=f"workflow_process_{index}"
)
generator = RandomNumberGenerator(index=index, seed=seed)
writer = FileWriter(index=index, output_dir=output_dir)
workflow.add_node(WorkflowNode(
process=generator,
dependencies=[],
required=True
))
workflow.add_node(WorkflowNode(
process=writer,
dependencies=[generator.config.process_id],
required=True
))
# Run the coroutine in a new event loop
loop = asyncio.new_event_loop()
try:
asyncio.set_event_loop(loop)
return loop.run_until_complete(workflow.execute())
finally:
loop.close()
|