Add fastapi support (#2383)
* Add fastapi support

* Add fastapi support

* fix bug

* Update Code for fastapi
rainyfly authored Feb 28, 2024
1 parent c18abc6 commit a843a3c
Showing 4 changed files with 81 additions and 16 deletions.
27 changes: 18 additions & 9 deletions llm/fastdeploy_llm/model.py
@@ -183,6 +183,9 @@ def __init__(self, config):
# stream_sender is a reserved value for serving frameworks such as triton
# to send generated tokens streamly
self.stream_sender = None
self.previous_start_inference_time = time.time()
self.current_start_inference_time = self.previous_start_inference_time
self.hang_detection_lock = threading.Lock()

def _pad_noencoder_batch_task(self, batch_tasks):
new_task_counter = 0
@@ -223,14 +226,17 @@ def async_predict(self, batch_tasks, stop_num=None):
if self._pad_noencoder_batch_task(batch_tasks):
stop_num += 1
inputs = self._prepare_inputs(batch_tasks.tasks, stop_num)

self._request_to_engine(inputs)
self._notify_engine()
return self.thread_executor.submit(self._update_task_results,
batch_tasks.tasks)

def _update_task_results(self, tasks):
step_index = 1
last_response_time = time.time()
last_response_time = None
with self.hang_detection_lock:
self.current_start_inference_time = time.time()
while True:
filepath = f"./real_time_save.temp_ids_rank_0_step_{step_index}"
if os.path.exists(filepath):
@@ -286,19 +292,20 @@ def _update_task_results(self, tasks):
else:
if not self._is_engine_busy():
break
if time.time() - last_response_time > self.config.inference_response_timeout:
error_type = ErrorType.Server
error_code = ErrorCode.S0003
error_info = "Inference engine output token timeout due to some unexpected exceptions."
error_msg = error_format.format(error_type.name, error_code.name, error_info)
warning_logger.error(error_msg)
if last_response_time is not None:
if time.time() - last_response_time > self.config.inference_response_timeout:
error_type = ErrorType.Server
error_code = ErrorCode.S0003
error_info = "Inference engine output token timeout due to some unexpected exceptions."
error_msg = error_format.format(error_type.name, error_code.name, error_info)
warning_logger.error(error_msg)
ret = self.engine_proc.poll()
if ret is not None:
logger.error(
"The inference engine is not alive, check log/workerlog for more details."
"The inference engine is not alive, check log/infer.log for more details."
)
raise Exception(
"The inference engine is not alive, check log/workerlog for more details."
"The inference engine is not alive, check log/infer.log for more details."
)

remove_files(".", "real_time_save.temp_ids_rank_*")
@@ -329,6 +336,8 @@ def _update_task_results(self, tasks):
tasks[i].status = TaskStatus.FINISHED
else:
tasks[i].status = TaskStatus.DECODING
with self.hang_detection_lock:
self.previous_start_inference_time = self.current_start_inference_time

def _get_engine_info(self):
output_size = bytes(self.shm_output_data.buf[:4])
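The timestamps added above implement a simple idle/busy convention: _update_task_results bumps current_start_inference_time when a decode loop begins, and copies it into previous_start_inference_time when the loop ends, so the pair is equal exactly while the engine is idle. Below is a minimal standalone sketch of that convention; the HangDetector class and its method names are hypothetical, not part of this commit:

import threading
import time

class HangDetector:
    # Mirrors the convention in model.py: the two timestamps are equal
    # while the engine is idle and differ while a request is in flight.
    def __init__(self):
        now = time.time()
        self.previous_start_inference_time = now
        self.current_start_inference_time = now
        self.lock = threading.Lock()

    def begin_inference(self):
        # Called when a decode loop starts; the pair now differs.
        with self.lock:
            self.current_start_inference_time = time.time()

    def end_inference(self):
        # Called when the decode loop finishes; the pair is equal again.
        with self.lock:
            self.previous_start_inference_time = self.current_start_inference_time

    def snapshot(self):
        # Read both values under the lock so a probe sees a consistent pair.
        with self.lock:
            return (self.previous_start_inference_time,
                    self.current_start_inference_time)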
63 changes: 60 additions & 3 deletions llm/fastdeploy_llm/server/api.py
@@ -258,10 +258,67 @@ def check_live(self):
API for detecting http app status.
"""
if self.model.model._is_engine_initialized() and (self.model.model.engine_proc.poll() is None):
logger.info("check_live: True")
return Response(status_code=200)
# The engine process has not exited
# Check whether inference is proceeding normally
# 1. current_start_inference_time is the start time of the current inference; previous_start_inference_time is the start time of the previous one
# By convention in this code, when there is no new request, or the current request has finished, current_start_inference_time and previous_start_inference_time are equal
# While a request is being inferred, current_start_inference_time and previous_start_inference_time differ
with self.model.model.hang_detection_lock:
previous_start_inference_time = self.model.model.previous_start_inference_time
current_start_inference_time = self.model.model.current_start_inference_time
if previous_start_inference_time == current_start_inference_time:
# (1) No new request, or the last request has finished: wait 10s; if the two are still equal, judge the engine healthy
time.sleep(10)
with self.model.model.hang_detection_lock:
new_previous_start_inference_time = self.model.model.previous_start_inference_time
new_current_start_inference_time = self.model.model.current_start_inference_time
if new_previous_start_inference_time == new_current_start_inference_time:
logger.info("check_live: True")
return Response(status_code=200)
else:
# The two differ, so an inference phase has begun
# new_current_start_inference_time necessarily differs from current_start_inference_time
# Wait another 20s; if healthy, the engine will be handling a new request by then, or the old one will have finished
current_start_inference_time = new_current_start_inference_time
time.sleep(20)
with self.model.model.hang_detection_lock:
new_current_start_inference_time = self.model.model.current_start_inference_time
new_previous_start_inference_time = self.model.model.previous_start_inference_time
# Inference finished, and no new request arrived
if new_previous_start_inference_time == new_current_start_inference_time:
logger.info("check_live: True")
return Response(status_code=200)
else:
# They differ: either the current inference has not finished, or it finished and a new one began. The first case is judged hung, the second healthy
if new_current_start_inference_time == current_start_inference_time:
warning_logger.error("check_live: False")
return Response(status_code=500)
else:
logger.info("check_live: True")
return Response(status_code=200)

else:
# The two differ, so an inference phase has begun
# Wait another 20s; if healthy, the engine will be handling a new request by then, or the old one will have finished
time.sleep(20)
with self.model.model.hang_detection_lock:
new_current_start_inference_time = self.model.model.current_start_inference_time
new_previous_start_inference_time = self.model.model.previous_start_inference_time
# Inference finished, and no new request arrived
if new_previous_start_inference_time == new_current_start_inference_time:
logger.info("check_live: True")
return Response(status_code=200)
else:
# They differ: either the current inference has not finished, or it finished and a new one began. The first case is judged hung, the second healthy
if new_current_start_inference_time == current_start_inference_time:
warning_logger.error("check_live: False")
return Response(status_code=500)
else:
logger.info("check_live: True")
return Response(status_code=200)

else:
logger.info("check_live: False")
warning_logger.error("check_live: False")
return Response(status_code=500)


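Stripped of its duplicated branches, the probe above reduces to: sample the timestamp pair, wait, sample again. A condensed sketch of the same decision procedure follows, assuming the hypothetical HangDetector helper sketched earlier; the 10s and 20s waits match the handler above:

import time

def is_engine_alive(detector, settle_seconds=10, hang_seconds=20):
    prev, cur = detector.snapshot()
    if prev == cur:
        # Looks idle: if still equal after a short wait, judge healthy.
        time.sleep(settle_seconds)
        prev, new_cur = detector.snapshot()
        if prev == new_cur:
            return True
        cur = new_cur  # An inference started while we waited.
    # A request is in flight: give it time to finish or be replaced.
    time.sleep(hang_seconds)
    new_prev, new_cur = detector.snapshot()
    if new_prev == new_cur:
        return True  # Finished, and no new request arrived.
    # Still unequal: a different start time means a new request began
    # (healthy); the same start time for the whole wait means a hang.
    return new_cur != cur

check_live maps the boolean onto HTTP status codes, 200 for healthy and 500 for hung, which is what an external liveness prober consumes.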
4 changes: 2 additions & 2 deletions llm/fastdeploy_llm/server/cmd.py
@@ -27,8 +27,8 @@ def main():
if os.path.exists(pgid_file_path):
with open(pgid_file_path, 'r') as f:
pgid = f.read().strip()
# Send SIGTERM to stop the service
os.killpg(int(pgid), signal.SIGTERM)
# Send SIGINT to stop the service
os.killpg(int(pgid), signal.SIGINT)
return
if os.getenv("MODEL_DIR", None) is None:
raise ValueError("Environment variable MODEL_DIR must be set")
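The cmd.py change swaps the group-wide stop signal from SIGTERM to SIGINT. CPython delivers SIGINT as KeyboardInterrupt, so Python children in the group can unwind through except/finally cleanup, while an unhandled SIGTERM terminates them outright. A sketch of the stop path under that reading; stop_service and its argument are illustrative names, not from the diff:

import os
import signal

def stop_service(pgid_file_path):
    # Read the process-group id recorded when the service started.
    with open(pgid_file_path, 'r') as f:
        pgid = int(f.read().strip())
    # SIGINT surfaces as KeyboardInterrupt in Python processes, letting
    # their except/finally cleanup run; an unhandled SIGTERM would kill
    # them without that unwinding.
    os.killpg(pgid, signal.SIGINT)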
3 changes: 1 addition & 2 deletions llm/fastdeploy_llm/utils/launch_infer.py
@@ -52,6 +52,5 @@ def launch(device_ids, **kwargs: dict):
pd_cmd,
shell=True,
stdout=infer_logger,
stderr=infer_logger,
preexec_fn=os.setsid)
stderr=infer_logger)
return p
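Dropping preexec_fn=os.setsid keeps the engine subprocess in the server's process group rather than starting it in a session of its own, which is presumably what lets the group-wide SIGINT from cmd.py reach the engine. A small POSIX-only check of that behavior; illustrative, not from the commit:

import os
import subprocess

# Without preexec_fn=os.setsid the child inherits the parent's process
# group, so signals sent to the group via os.killpg reach it as well.
p = subprocess.Popen("sleep 30", shell=True)
assert os.getpgid(p.pid) == os.getpgid(0)  # same group as this process
p.terminate()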
