-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathslice.py
104 lines (84 loc) · 2.69 KB
/
slice.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import argparse
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
import soundfile as sf
from pydub import AudioSegment
from pydub.silence import split_on_silence
from utils import is_audio_file, logger
def process_file(file: Path, output_dir: Path, threshold: int):
    """Slice one audio file and write every resulting chunk to ``output_dir``.

    Each chunk returned by ``split`` is converted to a single channel at
    44100 Hz and exported as ``<input stem>_<index>.wav`` using the
    ``pcm_s16le`` codec. An empty file with the same name but a ``.flag``
    suffix is created next to every exported chunk.

    Args:
        file: Audio file to slice.
        output_dir: Directory that receives the ``.wav`` and ``.flag`` files.
        threshold: Silence threshold forwarded unchanged to ``split``.
    """
    for index, segment in enumerate(split(file, threshold)):
        # Normalise the layout first: one channel, 44.1 kHz.
        normalised = segment.set_channels(1).set_frame_rate(44100)

        wav_path = output_dir / f"{file.stem}_{index}.wav"
        normalised.export(wav_path, format="wav", codec="pcm_s16le")

        # Companion marker file written right after the export.
        wav_path.with_suffix(".flag").touch()
def split(input_file: Path, threshold: int) -> list[AudioSegment]:
    """Load an audio file and cut it into chunks at silent passages.

    The file is decoded with soundfile, re-written to a scratch copy, and
    that copy is loaded with pydub before being split on silence.

    Args:
        input_file: Audio file to slice.
        threshold: Silence threshold magnitude. It is negated and handed to
            pydub as ``silence_thresh``, so ``40`` means ``-40``.

    Returns:
        The chunks whose length is strictly between 500 and 30000 (pydub
        measures ``len()`` of a segment in milliseconds). An empty list is
        returned when the file cannot be loaded, or when the whole recording
        is shorter than 500 or longer than 30000.
    """
    import tempfile  # local import keeps this block self-contained

    try:
        samples, sample_rate = sf.read(input_file)
        # Write the scratch copy into a private temporary directory instead of
        # the current working directory. Previously the copy was created at
        # ``input_file.name`` relative to the CWD, which overwrote and then
        # deleted the input itself when it lived in the CWD, could collide
        # between parallel workers, and leaked the copy if loading failed.
        # The copy keeps the input's file name so the container format is
        # still inferred from the same extension.
        with tempfile.TemporaryDirectory() as scratch_dir:
            scratch_file = str(Path(scratch_dir) / input_file.name)
            sf.write(scratch_file, samples, sample_rate)
            audio = AudioSegment.from_file(scratch_file)
    except Exception as e:
        # Deliberate best-effort: an unreadable file is reported and skipped
        # so that one bad input does not abort the whole batch.
        print(f"Skipping {input_file}: {e}")
        return []

    min_silence_len = 250
    silence_thresh = -threshold
    keep_silence = 200
    min_chunk_len = 500
    max_chunk_len = 30_000

    # NOTE(review): a recording longer than max_chunk_len is dropped entirely
    # rather than sliced - behavior kept from the original; confirm intended.
    if len(audio) < min_chunk_len or len(audio) > max_chunk_len:
        return []

    chunks = split_on_silence(
        audio,
        min_silence_len=min_silence_len,
        silence_thresh=silence_thresh,
        keep_silence=keep_silence,
    )

    # Both bounds are exclusive: chunks of exactly 500 or 30000 are discarded.
    return [chunk for chunk in chunks if min_chunk_len < len(chunk) < max_chunk_len]
# Also send log records to a file; this runs whenever the module is imported.
logger.add("logs/split.log")

if __name__ == "__main__":
    # Command-line entry point: slice every audio file found directly inside
    # --input_dir and write the chunks to --output_dir using a process pool.
    logger.info("Starting slicing")

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_dir",
        "-i",
        type=str,
        default="inputs",
        help="Directory of input wav files",
    )
    parser.add_argument(
        "--output_dir",
        "-o",
        type=str,
        # There is no sensible default, and omitting the option used to crash
        # later with a TypeError from Path(None); let argparse report it.
        required=True,
        help="Directory that receives the sliced wav files",
    )
    parser.add_argument(
        "--num_workers",
        "-w",
        type=int,
        default=10,
        help="Maximum number of worker processes",
    )
    parser.add_argument(
        "--threshold",
        "-t",
        type=int,
        default=40,
        help="Silence threshold magnitude; it is negated before use",
    )
    args = parser.parse_args()
    logger.info(f"Slicing args: {args}")

    threshold: int = args.threshold
    input_dir = Path(args.input_dir)
    output_dir = Path(args.output_dir)
    output_dir.mkdir(exist_ok=True, parents=True)

    # Non-recursive scan: only direct children of input_dir are considered.
    audio_files = [x for x in input_dir.glob("*") if is_audio_file(x)]
    logger.info(f"Found {len(audio_files)} audio files in {input_dir}")

    with ProcessPoolExecutor(max_workers=args.num_workers) as executor:
        futures = [
            executor.submit(process_file, file, output_dir, threshold)
            for file in audio_files
        ]
        # result() re-raises any exception raised inside a worker, so a
        # failure in process_file surfaces here instead of being lost.
        for future in futures:
            future.result()

    logger.success(f"Slice done for {input_dir}")