espnet3.parallel.parallel.parallel_for
About 1 min
espnet3.parallel.parallel.parallel_for
espnet3.parallel.parallel.parallel_for(func: Callable, args: Iterable, client: Client | None = None, setup_fn: Callable[[], dict] | None = None, **kwargs: Any) β Generator
Dispatch tasks to Dask and iterate over results as they complete.
This helper: : - Creates (or reuses) a Dask client based on the global/explicit config.
- Optionally registers a per-worker environment via setup_fn and makes its returned dict available to func automatically.
- Performs a client-side conflict check so that keyword arguments explicitly passed via kwargs donβt collide with environment-provided arguments (pre-submission).
- Wraps func with wrap_func_with_worker_env so any missing keyword parameters can be injected from the worker environment.
Iteration order: : Results are yielded in completion order (using as_completed), not in the original order of args.
- Parameters:
- func β The function to run on each element of args. It may accept positional and/or keyword parameters.
- args β Iterable of inputs to func.
- client (optional) β Existing Dask Client. If None, a temporary client is created via get_client and shut down automatically when iteration ends.
- setup_fn (optional) β A callable executed once per worker that returns a dict of environment variables. Keys that match parameter names of func (or any keys if func accepts **kwargs) are auto-injected unless explicitly provided in kwargs.
- **kwargs β Extra keyword arguments forwarded to every call of func.
- Yields: Each taskβs result as soon as it finishes.
- Raises:
- ValueError β If at least one key in kwargs conflicts with keys provided by the worker environment (pre-submit check).
- Exception β Any exception raised by func is re-raised at iteration time when accessing future.result() for the failing task.
Notes
- This generator will close the internally created client once iteration finishes or the generator is exhausted.
- Worker-side injection and an additional conflict check are also enforced by wrap_func_with_worker_env.
Example
>>> def setup_fn():
... return {"bias": 3}
>>> def add_bias(x, bias):
... return x + bias
>>> # Stream results as tasks complete (completion order):
>>> for y in parallel_for(add_bias, [1, 2, 3], setup_fn=setup_fn):
... print(y)
4
5
6