X-Vector Extraction with Provider and Runner
This page walks through a concrete example of the Provider / Runner pattern: computing x-vectors (speaker embeddings) for every utterance in the LibriTTS VITS recipe.
If you are not yet familiar with these two abstractions, read Provider and Runner first.
How the pieces fit together
The parallel: block of the config controls how the Provider and Runner interact. For example, with the local backend:

```yaml
parallel:
  env: local
  n_workers: 3  # ignored in local mode (1 shard)
  options: {}
runner:
  batch_size: null  # null: forward() receives one int idx per call
```

the runner calls provider.build_env_local() once on the driver, then processes all indices sequentially in a single process. n_workers is ignored.
Configuration reference
All x-vector settings live under the xvector: key in training_config.yaml. The example below uses toolkit: speechbrain, an ECAPA-TDNN encoder via SpeechBrain. No manual resampling is needed because AudioNormalizer handles it.
```yaml
xvector:
  toolkit: speechbrain
  pretrained_model: speechbrain/spkrec-ecapa-voxceleb
  device: cuda:0
  spk_embed_tag: spkrec-ecapa-voxceleb
  save_path: ${data_dir}/x_vectors
  batch_size: 20
  manifest_paths:
    train: ${data_dir}/manifest/train.tsv
    valid: ${data_dir}/manifest/valid.tsv
    test: ${data_dir}/manifest/test.tsv
  splits:
    - train
    - valid
    - test
```

XVectorProvider extends EnvironmentProvider. It builds the runtime environment (model, manifest, and output directory) once per process.
build_env_local() is called once on the driver. It builds the env dict and returns it immediately.
build_worker_setup_fn() returns a zero-argument closure. Each Dask worker calls that closure once at startup to build its own env.
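The difference between these two entry points can be illustrated with a small, framework-free simulation. ToyProvider and simulate_workers are hypothetical stand-ins written for this page. They are not espnet3 APIs, and the real scheduling (including Dask) is handled by BaseRunner. The point is the call pattern. build_env_local() hands back a dict on the driver. build_worker_setup_fn() hands back a function that each worker calls once.

```python
from typing import Callable, Dict, List


class ToyProvider:
    """Hypothetical stand-in for an EnvironmentProvider subclass."""

    def __init__(self, config: Dict, params: Dict):
        self.config = config
        self.params = params
        self.local_builds = 0  # how often build_env_local() ran

    def build_env_local(self) -> Dict:
        # Driver side: build the env and return the dict itself.
        self.local_builds += 1
        return {"model": f"model@{self.config['device']}",
                "output_dir": self.params["output_dir"]}

    def build_worker_setup_fn(self) -> Callable[[], Dict]:
        # Capture plain values so setup() never touches `self`.
        config, params = self.config, self.params

        def setup() -> Dict:
            # Worker side: each worker builds its own copy of the env.
            return {"model": f"model@{config['device']}",
                    "output_dir": params["output_dir"]}

        return setup  # the function, not the dict


def simulate_workers(provider: ToyProvider, n_workers: int) -> List[Dict]:
    # The driver only obtains the closure; each "worker" calls it once.
    setup = provider.build_worker_setup_fn()
    return [setup() for _ in range(n_workers)]
```

In this simulation, simulate_workers(provider, 3) yields three independent env dicts, while provider.local_builds stays 0. This mirrors why the model is loaded inside setup() rather than on the driver.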
```python
from pathlib import Path

from espnet3.parallel.env_provider import EnvironmentProvider
from speechbrain.inference.classifiers import EncoderClassifier


class XVectorProvider(EnvironmentProvider):
    def __init__(self, config, params=None):
        super().__init__(config)
        self.params = params or {}  # runtime args: manifest_path, output_dir

    def build_env_local(self):
        # Runs once on the driver; the returned dict is the env.
        xvec_cfg = self.config.xvector
        model = EncoderClassifier.from_hparams(
            source=xvec_cfg.get("pretrained_model",
                                "speechbrain/spkrec-ecapa-voxceleb"),
            run_opts={"device": xvec_cfg.get("device", "cpu")},
        )
        # _load_manifest is the recipe's manifest helper (not shown here)
        utterances, _ = self._load_manifest(self.params["manifest_path"])
        return {
            "model": model,
            "toolkit": "speechbrain",
            "device": xvec_cfg.get("device", "cpu"),
            "utterances": utterances,
            "output_dir": Path(self.params["output_dir"]),
        }

    def build_worker_setup_fn(self):
        # Capture plain values; the closure runs on a worker without `self`.
        config, params = self.config, self.params

        def setup():
            xvec_cfg = config.xvector
            model = EncoderClassifier.from_hparams(
                source=xvec_cfg.get("pretrained_model",
                                    "speechbrain/spkrec-ecapa-voxceleb"),
                run_opts={"device": xvec_cfg.get("device", "cpu")},
            )
            utterances, _ = XVectorProvider._load_manifest(
                params["manifest_path"])
            return {
                "model": model,
                "toolkit": "speechbrain",
                "device": xvec_cfg.get("device", "cpu"),
                "utterances": utterances,
                "output_dir": Path(params["output_dir"]),
            }

        return setup  # return the function, not the dict!
```

Manifest format
_load_manifest is a helper used by XVectorProvider in this LibriTTS recipe. It parses a tab-separated file with one utterance per line:
```text
utt_id \t wav_path \t text \t speaker_id
```

Example:

```text
1089-134686-0000 /data/LibriTTS/train-clean-360/1089/134686/1089-134686-0000.wav HE BEGAN 1089
1089-134686-0001 /data/LibriTTS/train-clean-360/1089/134686/1089-134686-0001.wav A LONG 1089
```

Its first return value, utterances, is a list[(utt_id, wav_path)] indexed by position. The provider stores it in the env dict so the runner can resolve utterances[idx].
When writing a Provider for a different task, replace _load_manifest with whatever data loading your forward() needs (a dataset class, a file list, a HuggingFace dataset, etc.). The pattern stays the same: load the data in the Provider, store it under a key, and declare that key as a parameter of forward().
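For reference, the manifest format described above can be parsed with a few lines of standard-library Python. This is a minimal sketch. read_manifest is a hypothetical name, not the recipe's _load_manifest. It reproduces only the utterances list, because the recipe's helper also returns a second value that this page does not describe. It assumes UTF-8 files with tab-separated fields.

```python
from pathlib import Path
from typing import List, Tuple


def read_manifest(path: str) -> List[Tuple[str, str]]:
    """Parse utt_id<TAB>wav_path<TAB>text<TAB>speaker_id lines.

    Returns (utt_id, wav_path) pairs in file order, so the i-th pair is
    what a runner would look up as utterances[i].
    """
    utterances: List[Tuple[str, str]] = []
    with Path(path).open(encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            line = line.rstrip("\n")
            if not line.strip():
                continue  # tolerate blank lines
            fields = line.split("\t")
            if len(fields) < 2:
                raise ValueError(f"{path}:{line_no}: expected tab-separated fields")
            utterances.append((fields[0], fields[1]))
    return utterances
```

Keeping the list in file order is what makes the positional idx handed to forward() meaningful.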
Output layout
After extraction the output directory looks like:
```text
{save_path}/
  {spk_embed_tag}/
    train/
      {utt_id}.pt    # one float32 tensor per utterance
    valid/
      ...
    test/
      ...
```

Adapting for a new task
To write your own Provider / Runner for a different task, follow these steps:
Step 1: decide what forward() needs. List everything the per-item function needs, such as a model, a dataset, a tokenizer, or an output path. Those become the keys of your env dict.
Step 2: implement the Provider. Return those keys from build_env_local(), and mirror the same logic inside the setup() closure returned by build_worker_setup_fn(). Inside setup(), use only variables captured from the outer scope. self is not available there, because the closure runs on a remote worker rather than on the driver.
Step 3: implement the Runner. Declare forward() as a @staticmethod whose parameter names match the dict keys. If you set batch_size, idx may be either a single int or an iterable of ints, so handle both cases.
Step 4: instantiate and call. Pass the provider to the runner and call runner(range(n)), where n is the number of items to process.
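Step 4 can be made concrete with a framework-free sketch of what runner(range(n)) does in local mode. run_local, chunk, and toy_forward are hypothetical helpers written for this page. BaseRunner's actual implementation (sharding, writer hooks, merging) is more involved. The sketch follows the behavior described for env: local. The env is built once. forward() is then called with a single int, or with a list of ints when batch_size is set, and the env entries are passed as keyword arguments.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional


def chunk(indices: List[int], size: int) -> List[List[int]]:
    # Split indices into consecutive batches of at most `size` items.
    return [indices[i:i + size] for i in range(0, len(indices), size)]


def run_local(build_env: Callable[[], Dict[str, Any]],
              forward: Callable[..., Any],
              indices: Iterable[int],
              batch_size: Optional[int] = None) -> List[Any]:
    env = build_env()  # built once, reused for every call
    indices = list(indices)
    if batch_size is None:
        # One int idx per call; env keys become keyword arguments.
        return [forward(i, **env) for i in indices]
    results: List[Any] = []
    for batch in chunk(indices, batch_size):
        results.extend(forward(batch, **env))  # one list[int] per call
    return results


def toy_forward(idx, model, dataset, **env):
    # Extra env keys (e.g. output_dir) are absorbed by **env.
    if isinstance(idx, int):
        return model(dataset[idx])
    return [model(dataset[i]) for i in idx]
```

With an env such as {"model": str.upper, "dataset": ["a", "b", "c"]}, both run_local(build_env, toy_forward, range(3)) and the batched call with batch_size=2 return the same results. Only the shape of idx seen by forward() differs.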
Minimal skeleton:
```python
from espnet3.parallel.env_provider import EnvironmentProvider
from espnet3.parallel.base_runner import BaseRunner


class MyProvider(EnvironmentProvider):
    def __init__(self, config, params=None):
        super().__init__(config)
        self.params = params or {}  # runtime args (paths, splits, ...)

    def build_env_local(self):
        # load_my_model / load_my_dataset are placeholders for your own loaders
        return {
            "model": load_my_model(self.config),
            "dataset": load_my_dataset(self.params["data_path"]),
        }

    def build_worker_setup_fn(self):
        config, params = self.config, self.params  # capture, not self

        def setup():
            # self is not available here; use the captured config and params
            return {
                "model": load_my_model(config),
                "dataset": load_my_dataset(params["data_path"]),
            }

        return setup  # a function, not a dict


class MyRunner(BaseRunner):
    @staticmethod
    def forward(idx, model, dataset, **env):  # names match Provider keys
        if isinstance(idx, int):
            return MyRunner._process_one(idx, model, dataset)
        return [MyRunner._process_one(i, model, dataset) for i in idx]

    @staticmethod
    def _process_one(idx, model, dataset):
        item = dataset[idx]
        result = model(item)
        # write `result` to disk here, or return it for BaseRunner to collect
        return {"idx": idx, "status": "ok"}


# Usage
provider = MyProvider(config, params={"data_path": "data/manifest.tsv"})
runner = MyRunner(provider)
runner(range(num_items))  # num_items: number of items in your dataset
```

Common mistakes
Key name mismatch between Provider and Runner
If the Provider returns {"mdl": model} but forward() declares model, nothing warns you that model was never injected. The call simply fails with TypeError: missing required argument. Always copy-paste key names from the dict directly into the parameter list.
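A mismatch like this can be caught before any work is dispatched by comparing the env keys with forward()'s signature. missing_env_keys is a hypothetical helper, not part of espnet3, built on the standard inspect module. It assumes idx is supplied by the runner, skips *args/**env catch-alls, and reports required parameters that the env does not provide.

```python
import inspect
from typing import Callable, Dict, Iterable, List


def missing_env_keys(forward: Callable, env: Dict,
                     skip: Iterable[str] = ("idx",)) -> List[str]:
    """Return forward() parameters with no default and no matching env key."""
    missing = []
    for name, param in inspect.signature(forward).parameters.items():
        if name in skip:
            continue  # supplied by the runner, not the env
        if param.kind in (inspect.Parameter.VAR_POSITIONAL,
                          inspect.Parameter.VAR_KEYWORD):
            continue  # *args / **env absorb leftovers; nothing to check
        if param.default is inspect.Parameter.empty and name not in env:
            missing.append(name)
    return missing


def forward(idx, model, dataset, **env):
    ...
```

For the example above, missing_env_keys(forward, {"mdl": object(), "dataset": []}) returns ["model"]. Running such a check once on the env returned by build_env_local() surfaces the typo before any worker starts.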
build_worker_setup_fn returns a dict instead of a function
```python
# Wrong: the env is built on the driver, not on the worker, so the model
# (and its GPU memory) ends up on the wrong machine
def build_worker_setup_fn(self):
    return {"model": load_model(self.config)}  # wrong


# Correct: setup() runs on each worker when it starts
def build_worker_setup_fn(self):
    config = self.config

    def setup():
        return {"model": load_model(config)}

    return setup
```

Loading the model inside forward()
```python
# Wrong: loads the model from disk for every single utterance (extremely slow)
@staticmethod
def forward(idx, config, **env):
    model = load_model(config)  # wrong: runs N times
    ...


# Correct: the model is loaded once in the Provider and injected into every forward() call
@staticmethod
def forward(idx, model, **env):  # model comes from the Provider env
    ...
```

Not handling both int and list[int] for idx
When batch_size is set, idx is a list[int]. If forward() always treats idx as an int, it ends up indexing with a list, which either crashes or silently produces wrong results.
```python
@staticmethod
def forward(idx, model, dataset, **env):
    if isinstance(idx, int):
        return process_one(idx, model, dataset)
    return [process_one(i, model, dataset) for i in idx]
```

See also
Provider and Runner
Read the base contracts for EnvironmentProvider and BaseRunner.
Data Preparation
See how providers and runners are used for dataset preparation stages.
Inference Provider
See the inference-stage provider pattern.
EnvironmentProvider API
Read the generated contract for local and worker env setup.
BaseRunner API
Read the generated contract for forward, writer hooks, and merge.
Parallel Config
Review local, local GPU, and cluster backend settings.
