-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.py
executable file
·107 lines (86 loc) · 2.99 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# uvicorn server:app --reload
import os
import hashlib
import json
import uuid
from typing import Optional
from fastapi import FastAPI, UploadFile, Form, File, HTTPException
from fastapi.responses import FileResponse
from starlette.staticfiles import StaticFiles
from dotenv import load_dotenv
from speech_to_text import *
VIDEO_DIR = "static/media"
os.makedirs(VIDEO_DIR, exist_ok=True)
load_dotenv()
app = FastAPI()
app.mount("/static", StaticFiles(directory="static"), name="static")
if not speech_to_text_initialize():
print("Failed to init speech to text API")
allowed_extensions = ["mp4", "webm"]
@app.get("/")
async def server_root():
return FileResponse("static/index.html")
@app.post("/beach_to_text/")
async def beach_to_text(
audio: UploadFile = File(...),
n_seconds: Optional[int] = Form(float('inf'))
):
hash_md5 = hashlib.md5(audio.filename.encode()).hexdigest()
save_path = os.path.join("uploads", f"{hash_md5}.wav")
content = await audio.read()
# os.makedirs(os.path.dirname(save_path), exist_ok=True)
# with open(save_path, "wb") as buffer:
# buffer.write(await audio.read())
if not is_connected_to_gcloud():
return {"data": [], "status": "no gcloud connection"}
try:
texts = to_text(content)
except GCloudException as e:
return {"data": [], "status": str(e)}
result = []
for e in texts:
#print(e)
if len(e.words) == 0:
print("Got no words")
#TODO: error
break
if n_seconds < to_seconds(e.words[-1].end_time):
for f in cut_subs(e, n_seconds):
result.append(f)
else:
result.append({
"text": e.transcript,
"start_time": to_seconds(e.words[0].start_time),
"end_time": to_seconds(e.words[-1].end_time),
})
res = {"data": result, "status": "ok"}
#print("res", res)
return res
@app.get("/video_list")
async def server_root():
files = [
{
"name": f,
"path": os.path.join(VIDEO_DIR, f)
}
for f in os.listdir(VIDEO_DIR)
if os.path.isfile(os.path.join(VIDEO_DIR, f))
and f.split('.')[-1].lower() in allowed_extensions
]
return files
@app.post("/upload_video/")
async def beach_to_text(
file: UploadFile = File(...),
):
file_path = os.path.join(VIDEO_DIR, file.filename)
base, extension = os.path.splitext(file_path)
extension=extension.lower().lstrip(".")
if not extension in allowed_extensions:
raise HTTPException(status_code=400, detail=f"Invalid file type. Allowed: {','.join(allowed_extensions)}")
new_filename = file_path
while os.path.exists(new_filename):
random_string = uuid.uuid4().hex[:8]
new_filename = f"{base}_{random_string}.{extension}"
with open(new_filename, "wb") as buffer:
buffer.write(await file.read())
return {"filename": file.filename, "file_path": new_filename}