Skip to content

Commit

Permalink
Fixing timeout situation
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 15, 2024
1 parent 65763de commit 80ba562
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 23 deletions.
50 changes: 28 additions & 22 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -510,42 +510,39 @@ async def sglang_server_task(args, semaphore):
"-m", "sglang.launch_server",
"--model-path", model_cache_dir,
"--chat-template", args.model_chat_template,

# TODO Had to comment this out, I thought it would be good to enforce a context limit on the server side, but it causes crashes
#"--context-length", str(args.model_max_context),

# "--context-length", str(args.model_max_context), # Commented out due to crashes
"--port", str(SGLANG_SERVER_PORT),
"--log-level-http", "warning",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
)

# Make sure we kill this subprocess on exit
# Ensure the subprocess is terminated on exit
def _kill_proc():
    """Terminate the server subprocess at interpreter exit if it is still running."""
    # returncode stays None until the process has exited; once it is set there
    # is nothing left to terminate.
    if proc.returncode is not None:
        return
    try:
        proc.terminate()
    except ProcessLookupError:
        # The process exited between the check above and the signal; calling
        # terminate() on an already-finished asyncio subprocess raises this.
        pass

atexit.register(_kill_proc)

last_running_req, last_queue_req = 0, 0 # To track transitions
can_release_automatically = False
# Shared variables between tasks
last_running_req, last_queue_req = 0, 0
can_release_automatically = True
last_semaphore_release = time.time()

async def process_line(line):
nonlocal last_running_req, last_queue_req, can_release_automatically, last_semaphore_release
sglang_logger.info(line)

match = re.search(r'#running-req: (\d+)', line)
if match:
last_running_req = int(match.group(1))

if last_running_req > 0:
can_release_automatically = True

# Parse the line and update semaphore if necessary

match = re.search(r'#queue-req: (\d+)', line)
if match:
queue_req = int(match.group(1))
logger.info(f"sglang running req: {last_running_req} queue req: {queue_req}")

if last_queue_req != 0 and queue_req == 0:
# Release the semaphore when queue_req transitions from non-zero to zero
if semaphore.locked():
Expand All @@ -555,13 +552,6 @@ async def process_line(line):

last_queue_req = queue_req

# And have a semaphore release automatically if there are no queued requests for > 30 seconds
if last_queue_req == 0 and can_release_automatically and time.time() - last_semaphore_release > 30 and semaphore.locked():
semaphore.release()
last_semaphore_release = time.time()
can_release_automatically = False
logger.info("Semaphore released due to timeout, allowing a worker to proceed.")

async def read_stream(stream):
while True:
line = await stream.readline()
Expand All @@ -570,13 +560,29 @@ async def read_stream(stream):
line = line.decode('utf-8').rstrip()
await process_line(line)

# Start tasks to read stdout and stderr
async def timeout_task():
nonlocal last_running_req, last_queue_req, can_release_automatically, last_semaphore_release
try:
while True:
await asyncio.sleep(1) # Check every second
logger.info(f"{last_queue_req}, {can_release_automatically}, {time.time() - last_semaphore_release}, {semaphore.locked()}")
if (last_queue_req == 0 and can_release_automatically and
time.time() - last_semaphore_release > 30 and semaphore.locked()):
semaphore.release()
last_semaphore_release = time.time()
can_release_automatically = False
logger.info("Semaphore released due to timeout, allowing a worker to proceed.")
except asyncio.CancelledError:
pass # Clean up if the task is cancelled

# Start tasks to read stdout, stderr, and handle timeout logic
stdout_task = asyncio.create_task(read_stream(proc.stdout))
stderr_task = asyncio.create_task(read_stream(proc.stderr))
timeout_task_future = asyncio.create_task(timeout_task())

await proc.wait()
await stdout_task
await stderr_task
timeout_task_future.cancel()
await asyncio.gather(stdout_task, stderr_task, timeout_task_future, return_exceptions=True)


async def sglang_server_host(args, semaphore):
Expand Down
2 changes: 1 addition & 1 deletion pdelfin/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
_MINOR = "1"
# On main and in a nightly release the patch should be one ahead of the last
# released build.
_PATCH = "19"
_PATCH = "20"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = ""
Expand Down

0 comments on commit 80ba562

Please sign in to comment.