-
Notifications
You must be signed in to change notification settings - Fork 468
/
Copy pathutils_vad.py
488 lines (385 loc) · 18.3 KB
/
utils_vad.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
import torch
import torchaudio
from typing import Callable, List
import warnings
languages = ['ru', 'en', 'de', 'es']
class OnnxWrapper():
def __init__(self, path, force_onnx_cpu=False):
import numpy as np
global np
import onnxruntime
opts = onnxruntime.SessionOptions()
opts.inter_op_num_threads = 1
opts.intra_op_num_threads = 1
if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
self.session = onnxruntime.InferenceSession(path, providers=['CPUExecutionProvider'], sess_options=opts)
else:
self.session = onnxruntime.InferenceSession(path, sess_options=opts)
self.reset_states()
self.sample_rates = [8000, 16000]
def _validate_input(self, x, sr: int):
if x.dim() == 1:
x = x.unsqueeze(0)
if x.dim() > 2:
raise ValueError(f"Too many dimensions for input audio chunk {x.dim()}")
if sr != 16000 and (sr % 16000 == 0):
step = sr // 16000
x = x[:,::step]
sr = 16000
if sr not in self.sample_rates:
raise ValueError(f"Supported sampling rates: {self.sample_rates} (or multiply of 16000)")
if sr / x.shape[1] > 31.25:
raise ValueError("Input audio chunk is too short")
return x, sr
def reset_states(self, batch_size=1):
self._state = torch.zeros((2, batch_size, 128)).float()
self._context = torch.zeros(0)
self._last_sr = 0
self._last_batch_size = 0
def __call__(self, x, sr: int):
x, sr = self._validate_input(x, sr)
num_samples = 512 if sr == 16000 else 256
if x.shape[-1] != num_samples:
raise ValueError(f"Provided number of samples is {x.shape[-1]} (Supported values: 256 for 8000 sample rate, 512 for 16000)")
batch_size = x.shape[0]
context_size = 64 if sr == 16000 else 32
if not self._last_batch_size:
self.reset_states(batch_size)
if (self._last_sr) and (self._last_sr != sr):
self.reset_states(batch_size)
if (self._last_batch_size) and (self._last_batch_size != batch_size):
self.reset_states(batch_size)
if not len(self._context):
self._context = torch.zeros(batch_size, context_size)
x = torch.cat([self._context, x], dim=1)
if sr in [8000, 16000]:
ort_inputs = {'input': x.numpy(), 'state': self._state.numpy(), 'sr': np.array(sr, dtype='int64')}
ort_outs = self.session.run(None, ort_inputs)
out, state = ort_outs
self._state = torch.from_numpy(state)
else:
raise ValueError()
self._context = x[..., -context_size:]
self._last_sr = sr
self._last_batch_size = batch_size
out = torch.from_numpy(out)
return out
def audio_forward(self, x, sr: int):
outs = []
x, sr = self._validate_input(x, sr)
self.reset_states()
num_samples = 512 if sr == 16000 else 256
if x.shape[1] % num_samples:
pad_num = num_samples - (x.shape[1] % num_samples)
x = torch.nn.functional.pad(x, (0, pad_num), 'constant', value=0.0)
for i in range(0, x.shape[1], num_samples):
wavs_batch = x[:, i:i+num_samples]
out_chunk = self.__call__(wavs_batch, sr)
outs.append(out_chunk)
stacked = torch.cat(outs, dim=1)
return stacked.cpu()
class Validator():
def __init__(self, url, force_onnx_cpu):
self.onnx = True if url.endswith('.onnx') else False
torch.hub.download_url_to_file(url, 'inf.model')
if self.onnx:
import onnxruntime
if force_onnx_cpu and 'CPUExecutionProvider' in onnxruntime.get_available_providers():
self.model = onnxruntime.InferenceSession('inf.model', providers=['CPUExecutionProvider'])
else:
self.model = onnxruntime.InferenceSession('inf.model')
else:
self.model = init_jit_model(model_path='inf.model')
def __call__(self, inputs: torch.Tensor):
with torch.no_grad():
if self.onnx:
ort_inputs = {'input': inputs.cpu().numpy()}
outs = self.model.run(None, ort_inputs)
outs = [torch.Tensor(x) for x in outs]
else:
outs = self.model(inputs)
return outs
def read_audio(path: str,
sampling_rate: int = 16000):
sox_backends = set(['sox', 'sox_io'])
audio_backends = torchaudio.list_audio_backends()
if len(sox_backends.intersection(audio_backends)) > 0:
effects = [
['channels', '1'],
['rate', str(sampling_rate)]
]
wav, sr = torchaudio.sox_effects.apply_effects_file(path, effects=effects)
else:
wav, sr = torchaudio.load(path)
if wav.size(0) > 1:
wav = wav.mean(dim=0, keepdim=True)
if sr != sampling_rate:
transform = torchaudio.transforms.Resample(orig_freq=sr,
new_freq=sampling_rate)
wav = transform(wav)
sr = sampling_rate
assert sr == sampling_rate
return wav.squeeze(0)
def save_audio(path: str,
tensor: torch.Tensor,
sampling_rate: int = 16000):
torchaudio.save(path, tensor.unsqueeze(0), sampling_rate, bits_per_sample=16)
def init_jit_model(model_path: str,
device=torch.device('cpu')):
model = torch.jit.load(model_path, map_location=device)
model.eval()
return model
def make_visualization(probs, step):
import pandas as pd
pd.DataFrame({'probs': probs},
index=[x * step for x in range(len(probs))]).plot(figsize=(16, 8),
kind='area', ylim=[0, 1.05], xlim=[0, len(probs) * step],
xlabel='seconds',
ylabel='speech probability',
colormap='tab20')
@torch.no_grad()
def get_speech_timestamps(audio: torch.Tensor,
model,
threshold: float = 0.5,
sampling_rate: int = 16000,
min_speech_duration_ms: int = 250,
max_speech_duration_s: float = float('inf'),
min_silence_duration_ms: int = 100,
speech_pad_ms: int = 30,
return_seconds: bool = False,
visualize_probs: bool = False,
progress_tracking_callback: Callable[[float], None] = None,
window_size_samples: int = 512,):
"""
This method is used for splitting long audios into speech chunks using silero VAD
Parameters
----------
audio: torch.Tensor, one dimensional
One dimensional float torch.Tensor, other types are casted to torch if possible
model: preloaded .jit/.onnx silero VAD model
threshold: float (default - 0.5)
Speech threshold. Silero VAD outputs speech probabilities for each audio chunk, probabilities ABOVE this value are considered as SPEECH.
It is better to tune this parameter for each dataset separately, but "lazy" 0.5 is pretty good for most datasets.
sampling_rate: int (default - 16000)
Currently silero VAD models support 8000 and 16000 (or multiply of 16000) sample rates
min_speech_duration_ms: int (default - 250 milliseconds)
Final speech chunks shorter min_speech_duration_ms are thrown out
max_speech_duration_s: int (default - inf)
Maximum duration of speech chunks in seconds
Chunks longer than max_speech_duration_s will be split at the timestamp of the last silence that lasts more than 100ms (if any), to prevent agressive cutting.
Otherwise, they will be split aggressively just before max_speech_duration_s.
min_silence_duration_ms: int (default - 100 milliseconds)
In the end of each speech chunk wait for min_silence_duration_ms before separating it
speech_pad_ms: int (default - 30 milliseconds)
Final speech chunks are padded by speech_pad_ms each side
return_seconds: bool (default - False)
whether return timestamps in seconds (default - samples)
visualize_probs: bool (default - False)
whether draw prob hist or not
progress_tracking_callback: Callable[[float], None] (default - None)
callback function taking progress in percents as an argument
window_size_samples: int (default - 512 samples)
!!! DEPRECATED, DOES NOTHING !!!
Returns
----------
speeches: list of dicts
list containing ends and beginnings of speech chunks (samples or seconds based on return_seconds)
"""
if not torch.is_tensor(audio):
try:
audio = torch.Tensor(audio)
except:
raise TypeError("Audio cannot be casted to tensor. Cast it manually")
if len(audio.shape) > 1:
for i in range(len(audio.shape)): # trying to squeeze empty dimensions
audio = audio.squeeze(0)
if len(audio.shape) > 1:
raise ValueError("More than one dimension in audio. Are you trying to process audio with 2 channels?")
if sampling_rate > 16000 and (sampling_rate % 16000 == 0):
step = sampling_rate // 16000
sampling_rate = 16000
audio = audio[::step]
warnings.warn('Sampling rate is a multiply of 16000, casting to 16000 manually!')
else:
step = 1
if sampling_rate not in [8000, 16000]:
raise ValueError("Currently silero VAD models support 8000 and 16000 (or multiply of 16000) sample rates")
window_size_samples = 512 if sampling_rate == 16000 else 256
model.reset_states()
min_speech_samples = sampling_rate * min_speech_duration_ms / 1000
speech_pad_samples = sampling_rate * speech_pad_ms / 1000
max_speech_samples = sampling_rate * max_speech_duration_s - window_size_samples - 2 * speech_pad_samples
min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
min_silence_samples_at_max_speech = sampling_rate * 98 / 1000
audio_length_samples = len(audio)
speech_probs = []
for current_start_sample in range(0, audio_length_samples, window_size_samples):
chunk = audio[current_start_sample: current_start_sample + window_size_samples]
if len(chunk) < window_size_samples:
chunk = torch.nn.functional.pad(chunk, (0, int(window_size_samples - len(chunk))))
speech_prob = model(chunk, sampling_rate).item()
speech_probs.append(speech_prob)
# caculate progress and seng it to callback function
progress = current_start_sample + window_size_samples
if progress > audio_length_samples:
progress = audio_length_samples
progress_percent = (progress / audio_length_samples) * 100
if progress_tracking_callback:
progress_tracking_callback(progress_percent)
triggered = False
speeches = []
current_speech = {}
neg_threshold = threshold - 0.15
temp_end = 0 # to save potential segment end (and tolerate some silence)
prev_end = next_start = 0 # to save potential segment limits in case of maximum segment size reached
for i, speech_prob in enumerate(speech_probs):
if (speech_prob >= threshold) and temp_end:
temp_end = 0
if next_start < prev_end:
next_start = window_size_samples * i
if (speech_prob >= threshold) and not triggered:
triggered = True
current_speech['start'] = window_size_samples * i
continue
if triggered and (window_size_samples * i) - current_speech['start'] > max_speech_samples:
if prev_end:
current_speech['end'] = prev_end
speeches.append(current_speech)
current_speech = {}
if next_start < prev_end: # previously reached silence (< neg_thres) and is still not speech (< thres)
triggered = False
else:
current_speech['start'] = next_start
prev_end = next_start = temp_end = 0
else:
current_speech['end'] = window_size_samples * i
speeches.append(current_speech)
current_speech = {}
prev_end = next_start = temp_end = 0
triggered = False
continue
if (speech_prob < neg_threshold) and triggered:
if not temp_end:
temp_end = window_size_samples * i
if ((window_size_samples * i) - temp_end) > min_silence_samples_at_max_speech : # condition to avoid cutting in very short silence
prev_end = temp_end
if (window_size_samples * i) - temp_end < min_silence_samples:
continue
else:
current_speech['end'] = temp_end
if (current_speech['end'] - current_speech['start']) > min_speech_samples:
speeches.append(current_speech)
current_speech = {}
prev_end = next_start = temp_end = 0
triggered = False
continue
if current_speech and (audio_length_samples - current_speech['start']) > min_speech_samples:
current_speech['end'] = audio_length_samples
speeches.append(current_speech)
for i, speech in enumerate(speeches):
if i == 0:
speech['start'] = int(max(0, speech['start'] - speech_pad_samples))
if i != len(speeches) - 1:
silence_duration = speeches[i+1]['start'] - speech['end']
if silence_duration < 2 * speech_pad_samples:
speech['end'] += int(silence_duration // 2)
speeches[i+1]['start'] = int(max(0, speeches[i+1]['start'] - silence_duration // 2))
else:
speech['end'] = int(min(audio_length_samples, speech['end'] + speech_pad_samples))
speeches[i+1]['start'] = int(max(0, speeches[i+1]['start'] - speech_pad_samples))
else:
speech['end'] = int(min(audio_length_samples, speech['end'] + speech_pad_samples))
if return_seconds:
for speech_dict in speeches:
speech_dict['start'] = round(speech_dict['start'] / sampling_rate, 1)
speech_dict['end'] = round(speech_dict['end'] / sampling_rate, 1)
elif step > 1:
for speech_dict in speeches:
speech_dict['start'] *= step
speech_dict['end'] *= step
if visualize_probs:
make_visualization(speech_probs, window_size_samples / sampling_rate)
return speeches
class VADIterator:
def __init__(self,
model,
threshold: float = 0.5,
sampling_rate: int = 16000,
min_silence_duration_ms: int = 100,
speech_pad_ms: int = 30
):
"""
Class for stream imitation
Parameters
----------
model: preloaded .jit/.onnx silero VAD model
threshold: float (default - 0.5)
Speech threshold. Silero VAD outputs speech probabilities for each audio chunk, probabilities ABOVE this value are considered as SPEECH.
It is better to tune this parameter for each dataset separately, but "lazy" 0.5 is pretty good for most datasets.
sampling_rate: int (default - 16000)
Currently silero VAD models support 8000 and 16000 sample rates
min_silence_duration_ms: int (default - 100 milliseconds)
In the end of each speech chunk wait for min_silence_duration_ms before separating it
speech_pad_ms: int (default - 30 milliseconds)
Final speech chunks are padded by speech_pad_ms each side
"""
self.model = model
self.threshold = threshold
self.sampling_rate = sampling_rate
if sampling_rate not in [8000, 16000]:
raise ValueError('VADIterator does not support sampling rates other than [8000, 16000]')
self.min_silence_samples = sampling_rate * min_silence_duration_ms / 1000
self.speech_pad_samples = sampling_rate * speech_pad_ms / 1000
self.reset_states()
def reset_states(self):
self.model.reset_states()
self.triggered = False
self.temp_end = 0
self.current_sample = 0
@torch.no_grad()
def __call__(self, x, return_seconds=False):
"""
x: torch.Tensor
audio chunk (see examples in repo)
return_seconds: bool (default - False)
whether return timestamps in seconds (default - samples)
"""
if not torch.is_tensor(x):
try:
x = torch.Tensor(x)
except:
raise TypeError("Audio cannot be casted to tensor. Cast it manually")
window_size_samples = len(x[0]) if x.dim() == 2 else len(x)
self.current_sample += window_size_samples
speech_prob = self.model(x, self.sampling_rate).item()
if (speech_prob >= self.threshold) and self.temp_end:
self.temp_end = 0
if (speech_prob >= self.threshold) and not self.triggered:
self.triggered = True
speech_start = self.current_sample - self.speech_pad_samples - window_size_samples
return {'start': int(speech_start) if not return_seconds else round(speech_start / self.sampling_rate, 1)}
if (speech_prob < self.threshold - 0.15) and self.triggered:
if not self.temp_end:
self.temp_end = self.current_sample
if self.current_sample - self.temp_end < self.min_silence_samples:
return None
else:
speech_end = self.temp_end + self.speech_pad_samples - window_size_samples
self.temp_end = 0
self.triggered = False
return {'end': int(speech_end) if not return_seconds else round(speech_end / self.sampling_rate, 1)}
return None
def collect_chunks(tss: List[dict],
wav: torch.Tensor):
chunks = []
for i in tss:
chunks.append(wav[i['start']: i['end']])
return torch.cat(chunks)
def drop_chunks(tss: List[dict],
wav: torch.Tensor):
chunks = []
cur_start = 0
for i in tss:
chunks.append((wav[cur_start: i['start']]))
cur_start = i['end']
return torch.cat(chunks)