Skip to content

Commit

Permalink
some cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 8, 2024
1 parent 6590164 commit 9d51935
Showing 1 changed file with 16 additions and 21 deletions.
37 changes: 16 additions & 21 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,37 +187,34 @@ async def load_pdf_work_queue(args) -> asyncio.Queue:
return queue


async def process_page(session, pdf_path, page_num, args) -> "PageResponse | None":
    """Request a completion for one PDF page and parse it into a PageResponse.

    Builds the page query with ``build_page_query``, POSTs it as JSON to the
    local completion endpoint, and decodes the text of the first output as
    JSON to construct a ``PageResponse``.

    Args:
        session: HTTP client whose ``post(url, json=...)`` is an async context
            manager yielding a response with ``status`` and ``json()``
            (presumably an aiohttp ClientSession -- confirm against caller).
        pdf_path: Path of the PDF the page belongs to; forwarded to
            ``build_page_query`` and used in log messages.
        page_num: Page number forwarded to ``build_page_query``.
        args: Options object; ``target_longest_image_dim`` and
            ``target_anchor_text_len`` are read from it.

    Returns:
        The parsed ``PageResponse``, or ``None`` when the request raises,
        the server answers with a non-200 status, or the body cannot be
        parsed.
    """
    COMPLETION_URL = "http://localhost:30000/v1/chat/completions"

    query = await build_page_query(
        pdf_path,
        page_num,
        args.target_longest_image_dim,
        args.target_anchor_text_len,
    )

    try:
        async with session.post(COMPLETION_URL, json=query) as response:
            if response.status != 200:
                logger.warning(f"Request failed with status {response.status} for page {page_num}")
                return None

            try:
                base_response_data = await response.json()
                # NOTE(review): OpenAI-style chat completion payloads usually
                # carry the text under choices[0].message.content; confirm the
                # server really answers with an "outputs" list here.
                model_response_json = orjson.loads(base_response_data["outputs"][0]["text"])
                # Return the parsed page so a successful request does not
                # fall through to an implicit None.
                return PageResponse(**model_response_json)
            except Exception as e:
                # Best-effort: an unparsable model answer is reported and
                # treated like a failed page rather than aborting the caller.
                logger.warning(f"Could not parse response for {pdf_path}-{page_num}: {e}")
                return None
    except Exception as e:
        logger.error(f"Exception while processing page {page_num}: {e}")
        return None


async def process_pdf(args, pdf_s3_path):
URL = "http://localhost:30000/v1/chat/completions"


with tempfile.NamedTemporaryFile("wb+", suffix=".pdf") as tf:
# TODO Switch to aioboto3 or something
data = await asyncio.to_thread(lambda: get_s3_bytes(pdf_s3, pdf_s3_path))
Expand Down Expand Up @@ -317,16 +314,14 @@ async def sglang_server_task(args):
"--context-length", str(args.model_max_context),
)

def _cleanup_sglang_handler():
proc.kill()

atexit.register(_cleanup_sglang_handler)
# Make really sure we kill this subprocess on exit
atexit.register(lambda: proc.kill())

await proc.wait()


async def sglang_server_ready():
max_attempts = 60
max_attempts = 300
delay_sec = 1
url = 'http://localhost:30000/v1/models'

Expand Down

0 comments on commit 9d51935

Please sign in to comment.