ESPnet3 Parallel
The parallel execution layer for provider/runner workflows: shard planning, local and Dask-backed dispatch, resume, and merge. For YAML settings only, see Parallel config.
01 / overview
Four roles, one pipeline
ESPnet3 parallel processing is built around four components, each with a clear responsibility.
BaseRunner
The orchestrator (Driver)
Splits data into shards, dispatches them to workers, and merges results when all workers finish. It never processes data itself.
forward()
The work unit (Worker)
A pure function that processes one sample or batch. It must be a @staticmethod (no self) so that Dask can serialize it and send it to remote workers.
EnvironmentProvider
The initialization blueprint
Defines how and where to build the dataset and model: locally on the driver, or once per worker process. Two methods, two different timings.
Cluster / Client
The execution environment
Set once with set_parallel(config). Changing env switches from running everything locally to submitting jobs to a SLURM cluster, with no other code changes needed.
You don't need to know Dask. ESPnet3 uses Dask internally, but you only interact with set_parallel(config). The BaseRunner handles everything else.
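The idea that one config value selects the backend while the calling code stays the same can be sketched as follows. This is a toy stand-in, not ESPnet3's implementation: `ToyRunner`, `run_all`, and the `"threads"` environment are hypothetical names, and a thread pool plays the part of a remote cluster.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyRunner:
    """Hypothetical runner; only illustrates the static forward() rule."""

    @staticmethod
    def forward(idx):
        # No self: the function depends only on its arguments,
        # so it can be handed to any worker.
        return idx * idx


def run_all(func, indices, config):
    """Toy dispatcher: apply func to indices according to config["env"]."""
    env = config.get("env", "local")
    if env == "local":
        # Everything runs sequentially in the calling process.
        return [func(i) for i in indices]
    if env == "threads":
        # Stand-in for a cluster backend: same call, different executor.
        with ThreadPoolExecutor(max_workers=config.get("n_workers", 1)) as pool:
            return list(pool.map(func, indices))
    raise ValueError(f"unknown env: {env!r}")


local_out = run_all(ToyRunner.forward, range(5), {"env": "local"})
pooled_out = run_all(ToyRunner.forward, range(5), {"env": "threads", "n_workers": 2})
```

Only the config dictionary differs between the two calls; the work function and the call site are identical, which is the property the guide attributes to set_parallel(config).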
02 / execution modes
Execution modes: who runs where
A single value in parallel_config.env fundamentally changes where execution happens.
[Diagram, local mode: your machine (driver) and the filesystem, where shards split.0/, split.1/, ... are marked done; progress is shown with tqdm.]
# local mode (config.yaml)
parallel:
  env: local
  n_workers: 1 # not used in local mode
  options: {}
03 / provider
Provider: two methods, two timings
The biggest source of confusion with EnvironmentProvider is that the two methods are called at completely different times and places.
Local mode:
runner(indices) ... _run_local() calls provider.build_env_local() directly.
The returned dict (dataset, model, ...) is used immediately.
env = provider.build_env_local()
# dict is returned immediately
forward(idx, **env)
Distributed mode:
provider.build_worker_setup_fn() returns a function.
The function is wrapped in DictReturnWorkerPlugin and sent to workers.
When a worker starts, setup() is called and the dataset/model are built on the worker.
setup_fn = provider.build_worker_setup_fn()
# nothing is initialized yet
# setup_fn is a closure
plugin = DictReturnWorkerPlugin(setup_fn)
# when a worker starts:
env = setup_fn() # runs on the worker
Why return a function instead of the env directly?
SLURM workers run on different machines. If the driver initialized the model and tried to send it, the full model weights would need to be serialized and transferred over the network. Instead, only the lightweight initialization recipe (a closure) is sent, and each worker builds its own copy locally, which significantly reduces transfer overhead.
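The difference in timing between the two methods can be illustrated with a small, self-contained sketch. `ToyProvider` and `build_env` are hypothetical names; the dataset and model are plain Python objects standing in for expensive resources, and no Dask is involved.

```python
def build_env(config):
    """Stand-in for expensive dataset/model construction (hypothetical)."""
    dataset = list(range(config["n_samples"]))
    model = {"scale": config["scale"]}
    return {"dataset": dataset, "model": model}


class ToyProvider:
    """Hypothetical provider mirroring the two methods described above."""

    def __init__(self, config):
        self.config = config  # small and cheap to serialize

    def build_env_local(self):
        # Local mode: build right now, in the calling process.
        return build_env(self.config)

    def build_worker_setup_fn(self):
        config = self.config  # capture only the recipe, not a built model

        def setup_fn():
            # Runs later, in whichever process calls it.
            return build_env(config)

        return setup_fn


provider = ToyProvider({"n_samples": 3, "scale": 2})
local_env = provider.build_env_local()        # built immediately
setup_fn = provider.build_worker_setup_fn()   # nothing built yet
worker_env = setup_fn()                       # built when the "worker" calls it
```

The closure carries only the small config dictionary, so each call to `setup_fn()` produces an independent copy of the environment rather than sharing one built on the driver.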
Automatic env injection: wrap_func_with_worker_env
When calling forward() on a worker, you don't pass dataset or model explicitly. wrap_func_with_worker_env inspects the function signature and matches argument names against the worker's env keys, injecting them automatically.
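Name-based injection of this kind can be sketched with the standard library's `inspect.signature`. The helper below, `wrap_with_env`, is a hypothetical illustration of the technique and is not the actual wrap_func_with_worker_env; in particular, letting explicit keyword arguments override env values is an assumption.

```python
import functools
import inspect


def wrap_with_env(func, env):
    """Hypothetical wrapper: fill func's named parameters from env by key name."""
    params = inspect.signature(func).parameters
    accepts_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )

    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        injected = {}
        for key, value in env.items():
            if key in kwargs:
                continue  # explicit arguments take priority
            if key in params or accepts_kwargs:
                injected[key] = value
        return func(*args, **injected, **kwargs)

    return wrapped


def forward(idx, dataset, model):
    # dataset and model come from env; idx comes from the caller.
    return dataset[idx] * model["scale"]


env = {"dataset": [10, 20, 30], "model": {"scale": 2}, "device": "cpu"}
call = wrap_with_env(forward, env)
result = call(1)
```

Because this `forward` neither names `device` nor accepts `**kwargs`, the `device` entry is simply not passed; a signature ending in `**env` would receive it.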
@staticmethod
def forward(idx, dataset, model, device, **env):
    # dataset, model, device: injected from the worker env
    # idx: passed by client.map()
    ...
# Worker env (returned by setup_fn()):
env = { "dataset": ds, "model": md, "device": dev, ... }
# Key names match forward's argument names, so they are auto-injected.
Implementation example: subclassing InferenceProvider
from abc import ABC, abstractmethod
from typing import Callable

class EnvironmentProvider(ABC):
    """Abstract base: implement both methods."""

    @abstractmethod
    def build_env_local(self) -> dict:
        """Local execution: called on the driver immediately."""
        ...

    @abstractmethod
    def build_worker_setup_fn(self) -> Callable:
        """Distributed execution: the returned function is called on each worker."""
        ...
04 / shard lifecycle
Shard lifecycle
What happens internally between calling runner(range(200)) and getting the result back.
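As a rough orientation, the overall flow (plan shards, skip finished ones on resume, process, merge) might look like the sketch below. It is not BaseRunner's code: the round-robin split, the `done` marker file, and the `results.jsonl` file name are assumptions chosen for illustration; only the `split.N/` directory naming is taken from this guide.

```python
import json
import tempfile
from pathlib import Path


def forward(idx):
    # Toy work unit: one record per index.
    return {"idx": idx, "value": idx * 2}


def plan_shards(indices, n_shards):
    """Round-robin split of indices into n_shards lists (assumed strategy)."""
    return [indices[i::n_shards] for i in range(n_shards)]


def run_shards(indices, output_dir, n_shards=4, resume=True):
    """Process each shard and return the ids of shards processed in this call."""
    output_dir = Path(output_dir)
    processed = []
    for shard_id, shard in enumerate(plan_shards(list(indices), n_shards)):
        shard_dir = output_dir / f"split.{shard_id}"
        done_marker = shard_dir / "done"  # hypothetical marker file name
        if resume and done_marker.exists():
            continue  # shard finished in an earlier run
        shard_dir.mkdir(parents=True, exist_ok=True)
        with open(shard_dir / "results.jsonl", "w") as f:
            for idx in shard:
                f.write(json.dumps(forward(idx)) + "\n")
        done_marker.touch()
        processed.append(shard_id)
    return processed


def merge(output_dir):
    """Collect all shard records and restore the original index order."""
    records = []
    for path in sorted(Path(output_dir).glob("split.*/results.jsonl")):
        with open(path) as f:
            records.extend(json.loads(line) for line in f)
    return sorted(records, key=lambda r: r["idx"])


with tempfile.TemporaryDirectory() as tmp:
    first_run = run_shards(range(10), tmp)
    second_run = run_shards(range(10), tmp)  # resume: nothing left to do
    merged = merge(tmp)
```

Writing the marker only after a shard's output is complete is what makes the second call a no-op: an interrupted shard has no marker and is redone in full.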
Calling runner(range(200)) triggers BaseRunner.__call__(). All indices are converted to a list. If batch_size is set, they are grouped into batches here.
Where is output_dir set? It is passed as a constructor argument to the Runner. All shard files, manifest.json, and final outputs are written under this directory.
runner = MyRunner(
    provider,
    output_dir="exp/decode",  # set here
    shard_subdir="train",     # shards go under output_dir/train/
    resume=True,              # skip already-done shards
)
# When using collect_stats(), output_dir is passed as a function
# argument and forwarded to the Runner automatically.

# First thing BaseRunner.__call__ does
indices = list(indices)
if self.batch_size is not None:
    indices = [
        list(indices[i : i + self.batch_size])
        for i in range(0, len(indices), self.batch_size)
    ]
05 / config builder
Config builder
Select an execution environment and fill in its values. A local configuration looks like this:
parallel:
  env: local
  n_workers: 4
  options: {}
Other cluster types (PBS, SGE, LSF, HTCondor, ...): Change env to "pbs", "sge", "lsf", or "htcondor". The available options vary by scheduler; see the dask-jobqueue documentation.
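One plausible reading of how env and options relate to dask-jobqueue is sketched below. The class names are real dask-jobqueue classes, but the mapping, the `"slurm"` key, the function name `describe_backend`, and the idea that options are forwarded as keyword arguments are all assumptions for illustration. The sketch returns class names as strings so it runs without dask-jobqueue installed.

```python
# Real dask-jobqueue class names; the env-to-class mapping is an assumption.
JOBQUEUE_CLASS_BY_ENV = {
    "slurm": "SLURMCluster",
    "pbs": "PBSCluster",
    "sge": "SGECluster",
    "lsf": "LSFCluster",
    "htcondor": "HTCondorCluster",
}


def describe_backend(parallel_config):
    """Return (backend name, keyword options) for a parallel config dict."""
    env = parallel_config["env"]
    options = parallel_config.get("options") or {}
    if env == "local":
        return ("local", {})
    try:
        cluster_class = JOBQUEUE_CLASS_BY_ENV[env]
    except KeyError:
        raise ValueError(f"unsupported env: {env!r}") from None
    return (cluster_class, dict(options))


backend = describe_backend(
    {"env": "pbs", "n_workers": 8, "options": {"queue": "gpu", "walltime": "02:00:00"}}
)
```

Here `queue` and `walltime` are parameters that dask-jobqueue cluster classes accept; which keys are valid for a given scheduler should be checked against its documentation.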
Related pages
Provider & Runner API
Full subclass contract: EnvironmentProvider, BaseRunner, open_writers, write_record, merge, and the static forward() rule.
InferenceProvider
The stage-facing provider used by the base inference path. Subclass this when adding a new inference stage.
Data preparation pattern
How the same provider/runner pattern is applied for dataset preparation and collect_stats.
Parallel config reference
All YAML keys for parallel backends (env, n_workers, options), with backend-specific notes.
