Add fastapi support #2383

Merged: 5 commits, Feb 28, 2024
27 changes: 18 additions & 9 deletions llm/fastdeploy_llm/model.py
@@ -183,6 +183,9 @@ def __init__(self, config):
# stream_sender is a reserved value for serving frameworks such as triton
# to send generated tokens streamly
self.stream_sender = None
self.previous_start_inference_time = time.time()
self.current_start_inference_time = self.previous_start_inference_time
self.hang_detection_lock = threading.Lock()

def _pad_noencoder_batch_task(self, batch_tasks):
new_task_counter = 0
@@ -223,14 +226,17 @@ def async_predict(self, batch_tasks, stop_num=None):
if self._pad_noencoder_batch_task(batch_tasks):
stop_num += 1
inputs = self._prepare_inputs(batch_tasks.tasks, stop_num)

self._request_to_engine(inputs)
self._notify_engine()
return self.thread_executor.submit(self._update_task_results,
batch_tasks.tasks)

def _update_task_results(self, tasks):
step_index = 1
last_response_time = time.time()
last_response_time = None
with self.hang_detection_lock:
self.current_start_inference_time = time.time()
while True:
filepath = f"./real_time_save.temp_ids_rank_0_step_{step_index}"
if os.path.exists(filepath):
@@ -286,19 +292,20 @@ def _update_task_results(self, tasks):
else:
if not self._is_engine_busy():
break
if time.time() - last_response_time > self.config.inference_response_timeout:
error_type = ErrorType.Server
error_code = ErrorCode.S0003
error_info = "Inference engine output token timeout due to some unexpectable exceptions."
error_msg = error_format.format(error_type.name, error_code.name, error_info)
warning_logger.error(error_msg)
if last_response_time is not None:
if time.time() - last_response_time > self.config.inference_response_timeout:
error_type = ErrorType.Server
error_code = ErrorCode.S0003
error_info = "Inference engine output token timeout due to some unexpectable exceptions."
error_msg = error_format.format(error_type.name, error_code.name, error_info)
warning_logger.error(error_msg)
ret = self.engine_proc.poll()
if ret is not None:
logger.error(
"The inference engine is not alive, check log/workerlog for more details."
"The inference engine is not alive, check log/infer.log for more details."
)
raise Exception(
"The inference engine is not alive, check log/workerlog for more details."
"The inference engine is not alive, check log/infer.log for more details."
)

remove_files(".", "real_time_save.temp_ids_rank_*")
@@ -329,6 +336,8 @@ def _update_task_results(self, tasks):
tasks[i].status = TaskStatus.FINISHED
else:
tasks[i].status = TaskStatus.DECODING
with self.hang_detection_lock:
self.previous_start_inference_time = self.current_start_inference_time

def _get_engine_info(self):
output_size = bytes(self.shm_output_data.buf[:4])
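The hang detection added above rests on a simple convention between the engine loop and the liveness probe. Below is a minimal sketch of that convention in isolation, assuming only the two timestamps and the lock introduced in this PR; the HangDetector class and its method names are illustrative and not part of the change:

import threading
import time

class HangDetector:
    """Illustrative only: equal timestamps mean idle/finished, unequal means in flight."""

    def __init__(self):
        self.previous_start_inference_time = time.time()
        self.current_start_inference_time = self.previous_start_inference_time
        self.hang_detection_lock = threading.Lock()

    def mark_inference_started(self):
        # Called when a batch enters the engine loop (see _update_task_results above).
        with self.hang_detection_lock:
            self.current_start_inference_time = time.time()

    def mark_inference_finished(self):
        # Called once the batch has produced all of its results.
        with self.hang_detection_lock:
            self.previous_start_inference_time = self.current_start_inference_time

    def read_times(self):
        # Consistent snapshot for the liveness probe.
        with self.hang_detection_lock:
            return (self.previous_start_inference_time,
                    self.current_start_inference_time)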
63 changes: 60 additions & 3 deletions llm/fastdeploy_llm/server/api.py
@@ -258,10 +258,67 @@ def check_live(self):
API for detecting http app status.
"""
if self.model.model._is_engine_initialized() and (self.model.model.engine_proc.poll() is None):
logger.info("check_live: True")
return Response(status_code=200)
# The engine process has not exited.
# Determine whether inference is still making normal progress.
# 1. current_start_inference_time is the start time of the current inference; previous_start_inference_time is the start time of the previous inference.
# By convention, when there is no new request, or the current request has finished inference, current_start_inference_time and previous_start_inference_time are equal.
# While a request is being inferred, current_start_inference_time and previous_start_inference_time differ.
with self.model.model.hang_detection_lock:
previous_start_inference_time = self.model.model.previous_start_inference_time
current_start_inference_time = self.model.model.current_start_inference_time
if previous_start_inference_time == current_start_inference_time:
# (1) No new request, or the new request has finished inference: wait 10s; if the two are still equal, the service is judged healthy
time.sleep(10)
with self.model.model.hang_detection_lock:
new_previous_start_inference_time = self.model.model.previous_start_inference_time
new_current_start_inference_time = self.model.model.current_start_inference_time
if new_previous_start_inference_time == new_current_start_inference_time:
logger.info("check_live: True")
return Response(status_code=200)
else:
# The two differ, which means inference has started.
# new_current_start_inference_time is necessarily different from current_start_inference_time.
# Wait another 20s; if everything is healthy, the engine must either be handling a new request or the old request has already finished.
current_start_inference_time = new_current_start_inference_time
time.sleep(20)
with self.model.model.hang_detection_lock:
new_current_start_inference_time = self.model.model.current_start_inference_time
new_previous_start_inference_time = self.model.model.previous_start_inference_time
# Inference has finished and there is no new request
if new_previous_start_inference_time == new_current_start_inference_time:
logger.info("check_live: True")
return Response(status_code=200)
else:
# They differ: either the current inference has not finished, or it finished and a new inference started. The first case is judged a hang, the second is healthy.
if new_current_start_inference_time == current_start_inference_time:
warning_logger.error("check_live: False")
return Response(status_code=500)
else:
logger.info("check_live: True")
return Response(status_code=200)

else:
# The two differ, which means inference has started.
# Wait another 20s; if everything is healthy, the engine must either be handling a new request or the old request has already finished.
time.sleep(20)
with self.model.model.hang_detection_lock:
new_current_start_inference_time = self.model.model.current_start_inference_time
new_previous_start_inference_time = self.model.model.previous_start_inference_time
# Inference has finished and there is no new request
if new_previous_start_inference_time == new_current_start_inference_time:
logger.info("check_live: True")
return Response(status_code=200)
else:
# They differ: either the current inference has not finished, or it finished and a new inference started. The first case is judged a hang, the second is healthy.
if new_current_start_inference_time == current_start_inference_time:
warning_logger.error("check_live: False")
return Response(status_code=500)
else:
logger.info("check_live: True")
return Response(status_code=200)

else:
logger.info("check_live: False")
warning_logger.error("check_live: False")
return Response(status_code=500)


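The nested branches in check_live above repeat one wait-and-compare decision. A condensed sketch of that decision as a standalone function follows; read_times is a hypothetical callable returning (previous_start_inference_time, current_start_inference_time) under the lock, and the 10s/20s waits mirror the values used above:

import time

def is_engine_alive(read_times, short_wait=10, long_wait=20):
    previous, current = read_times()
    if previous == current:
        # Idle or finished: if nothing changes within the short wait, report healthy.
        time.sleep(short_wait)
        previous, current = read_times()
        if previous == current:
            return True
    # An inference is in flight; remember its start time and give it a grace period.
    observed_start = current
    time.sleep(long_wait)
    previous, current = read_times()
    if previous == current:
        return True  # the inference finished and no new request arrived
    # Still unequal: a new start time means progress, the same start time means a hang.
    return current != observed_start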
4 changes: 2 additions & 2 deletions llm/fastdeploy_llm/server/cmd.py
@@ -27,8 +27,8 @@ def main():
if os.path.exists(pgid_file_path):
with open(pgid_file_path, 'r') as f:
pgid = f.read().strip()
# Send SIGTERM to stop the service
os.killpg(int(pgid), signal.SIGTERM)
# Send SIGINT to stop the service
os.killpg(int(pgid), signal.SIGINT)
return
if os.getenv("MODEL_DIR", None) is None:
raise ValueError("Environment variable MODEL_DIR must be set")
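For context on the SIGTERM-to-SIGINT switch, here is a hedged sketch of the stop path as a standalone helper; the function name is illustrative, and the assumption is that processes in the group exit through their normal interrupt handling when they receive SIGINT:

import os
import signal

def stop_service(pgid_file_path):
    # Read the process-group id the server recorded at startup and interrupt the group.
    with open(pgid_file_path, 'r') as f:
        pgid = int(f.read().strip())
    # SIGINT lets every process in the group exit through its usual interrupt path,
    # mirroring a Ctrl-C, instead of the harder SIGTERM used before this change.
    os.killpg(pgid, signal.SIGINT)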
3 changes: 1 addition & 2 deletions llm/fastdeploy_llm/utils/launch_infer.py
@@ -52,6 +52,5 @@ def launch(device_ids, **kwargs: dict):
pd_cmd,
shell=True,
stdout=infer_logger,
stderr=infer_logger,
preexec_fn=os.setsid)
stderr=infer_logger)
return p
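Dropping preexec_fn=os.setsid presumably keeps the engine subprocess in the launcher's process group, so the group-wide SIGINT sent by cmd.py also reaches the engine. The sketch below, which is not part of the PR, contrasts the two launch modes:

import os
import subprocess

def launch_in_parent_group(cmd, log_file):
    # As in the updated launch(): the engine shares the launcher's process group,
    # so os.killpg on that group also signals the engine.
    return subprocess.Popen(cmd, shell=True, stdout=log_file, stderr=log_file)

def launch_in_new_group(cmd, log_file):
    # The previous behaviour: os.setsid makes the child its own session/group leader,
    # detaching it from group-wide signals sent to the launcher's group.
    return subprocess.Popen(cmd, shell=True, stdout=log_file, stderr=log_file,
                            preexec_fn=os.setsid)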