Skip to content

Commit

Permalink
exit handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 7, 2024
1 parent 051a7b4 commit 923231e
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
import argparse
import boto3
import signal
import os
import sys
import time
import subprocess
import atexit

Expand Down Expand Up @@ -109,17 +112,18 @@

# Download the model from the best place available
model_cache_dir = os.path.join(os.path.expanduser('~'), '.cache', 'pdelfin', 'model')
download_directory(args.model, model_cache_dir)
#download_directory(args.model, model_cache_dir)

# Start up the sglang server
sglang_process = subprocess.Popen([
"python3", "-m", "sglang.launch_server",
"--model-path", model_cache_dir,
"--chat-template", args.mode_chat_template,
"--context-length", args.model_max_context
"--chat-template", args.model_chat_template,
"--context-length", str(args.model_max_context),
])

# Register atexit function to guarantee process termination

# Register atexit function and signal handlers to guarantee process termination
def terminate_processes():
print("Terminating child processes...")
sglang_process.terminate()
Expand All @@ -132,14 +136,34 @@ def terminate_processes():

atexit.register(terminate_processes)

def signal_handler(sig, frame):
    """Run terminate_processes() and then exit the interpreter with status 0.

    Takes the (signal number, stack frame) pair that the signal module
    passes to handlers; neither value is used.
    """
    terminate_processes()
    sys.exit(0)


# Install the same handler for both interrupt and termination signals.
for _signum in (signal.SIGINT, signal.SIGTERM):
    signal.signal(_signum, signal_handler)

# TODO
# Read in the work queue from s3
# Read in the done items from the s3 workspace

# TODO
# Spawn up to N workers to do:
# In a loop, take a random work item, read in the pdfs, queue in their requests
# Get results back, retry any failed pages
# Check periodically if that work is done in s3, if so, then abandon this work
# Save results back to s3 workspace output folder

# TODO
# Possible future addon, in beaker, discover other nodes on this same job
# Send them a message when you take a work item off the queue

# Keep the pipeline alive while the sglang server runs, checking once per
# second whether the server has exited.
try:
    while True:
        time.sleep(1)

        # Popen.returncode is only refreshed by poll()/wait()/communicate();
        # reading the attribute alone would stay None and a crashed server
        # would never be detected, so poll explicitly.
        exit_code = sglang_process.poll()
        if exit_code is not None:
            logger.error(f"Sglang server exited with code {exit_code} exiting.")
            # Actually stop, as the message says, instead of looping forever
            # and re-logging the same error every second.
            sys.exit(1)
except KeyboardInterrupt:
    logger.info("Got keyboard interrupt, exiting everything")
    sys.exit(1)

0 comments on commit 923231e

Please sign in to comment.