Skip to content

Commit

Permalink
Prepping work script
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 7, 2024
1 parent a65e12b commit 051a7b4
Showing 1 changed file with 22 additions and 0 deletions.
22 changes: 22 additions & 0 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import argparse
import boto3
import os
import subprocess
import atexit

from tqdm import tqdm

Expand Down Expand Up @@ -76,6 +78,7 @@
logger.info(f"{len(new_pdfs):,} new pdf paths to add to the workspace")

# Group the new PDFs into chunks of group_size
# TODO: Figure out the group size automatically by sampling a few pdfs, and taking the mean/median number of pages, etc.
new_groups = []
current_group = []
for pdf in sorted(new_pdfs): # Sort for consistency
Expand Down Expand Up @@ -109,6 +112,25 @@
download_directory(args.model, model_cache_dir)

# Start up the sglang server
sglang_process = subprocess.Popen([
"python3", "-m", "sglang.launch_server",
"--model-path", model_cache_dir,
"--chat-template", args.mode_chat_template,
"--context-length", args.model_max_context
])

# Register atexit function to guarantee process termination
def terminate_processes():
print("Terminating child processes...")
sglang_process.terminate()
try:
sglang_process.wait(timeout=30)
except subprocess.TimeoutExpired:
print("Forcing termination of child processes.")
sglang_process.kill()
print("Child processes terminated.")

atexit.register(terminate_processes)

# Read in the work queue from s3
# Read in the done items from the s3 workspace
Expand Down

0 comments on commit 051a7b4

Please sign in to comment.