Debugging timeout errors and other things
jakep-allenai committed Nov 15, 2024
1 parent fd17652 commit bfe4211
Showing 1 changed file with 16 additions and 11 deletions.
pdelfin/beakerpipeline.py: 27 changes (16 additions & 11 deletions)
@@ -69,7 +69,7 @@
# Process pool for offloading cpu bound work, like calculating anchor texts
process_pool = ProcessPoolExecutor()

-SGLANG_SERVER_PORT = 30002
+SGLANG_SERVER_PORT = 30024

@dataclass(frozen=True)
class PageResult:
@@ -244,8 +244,8 @@ async def load_pdf_work_queue(args) -> asyncio.Queue:
}

# Determine remaining work
-remaining_work_hashes = set(work_queue) - done_work_hashes
-#remaining_work_hashes = set(["0e779f21fbb75d38ed4242c7e5fe57fa9a636bac"])
+#remaining_work_hashes = set(work_queue) - done_work_hashes
+remaining_work_hashes = set(["0e779f21fbb75d38ed4242c7e5fe57fa9a636bac"])
remaining_work_queue = {
hash_: work_queue[hash_]
for hash_ in remaining_work_hashes
@@ -318,7 +318,7 @@ async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf
input_tokens=base_response_data["usage"].get("prompt_tokens", 0),
output_tokens=base_response_data["usage"].get("completion_tokens", 0)
)
-except aiohttp.ClientError as e:
+except (aiohttp.ClientError, asyncio.TimeoutError) as e:
logger.warning(f"Client error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")

# Now we want to do exponential backoff, and not count this as an actual page retry
@@ -339,7 +339,7 @@ async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf
logger.warning(f"ValueError on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
attempt += 1
except Exception as e:
logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
logger.exception(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
attempt += 1

logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
@@ -401,6 +401,7 @@ def build_dolma_document(pdf_s3_path, page_results):
pdf_page_spans.append([start_pos, current_char_pos, page_result.page_num])

if not document_text:
logger.info(f"No document text for {pdf_s3_path}")
return None # Return None if the document text is empty

# Build the Dolma document
@@ -442,10 +443,15 @@ async def worker(args, queue, semaphore, worker_id):
else:
logger.info(f"Proceeding with {work_hash} on worker {worker_id}")

-async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600),
+async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=60),
connector=aiohttp.TCPConnector(limit=1000)) as session:
async with asyncio.TaskGroup() as tg:
dolma_tasks = [tg.create_task(process_pdf(args, session, worker_id, pdf)) for pdf in pdfs]
logger.info(f"Created all tasks for {work_hash}")

logger.info(f"Finished TaskGroup for worker on {work_hash}")

logger.info(f"Closed ClientSession for {work_hash}")

dolma_docs = []
for task in dolma_tasks:
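
The hunk above tightens the session-wide timeout from 3600 s to 60 s and adds progress logging around the TaskGroup so a stuck work item is visible in the logs. A rough sketch of that session-plus-TaskGroup arrangement, assuming hypothetical helpers (run_worker, fetch_json) and plain URLs in place of the real per-PDF work:

# Illustrative sketch: a timeout set on the ClientSession becomes the default
# for every request made through it, so total=60 bounds each request at about
# a minute instead of letting it hang for up to an hour.
import asyncio
import aiohttp

async def fetch_json(session: aiohttp.ClientSession, url: str) -> dict:
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()

async def run_worker(urls: list[str]) -> list[dict]:
    timeout = aiohttp.ClientTimeout(total=60)
    connector = aiohttp.TCPConnector(limit=1000)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        async with asyncio.TaskGroup() as tg:  # Python 3.11+
            tasks = [tg.create_task(fetch_json(session, url)) for url in urls]
        # Exiting the TaskGroup block means every task has finished (or the
        # group has raised), so results can be collected safely here.
        return [t.result() for t in tasks]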
@@ -457,6 +463,8 @@

if result is not None:
dolma_docs.append(result)

logger.info(f"Got {len(dolma_docs)} docs for {work_hash}")

# Write the Dolma documents to a local temporary file in JSONL format
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as tf:
Expand Down Expand Up @@ -779,14 +787,11 @@ async def main():

# Wait for server to stop
sglang_server.cancel()
-await sglang_server

metrics_task.cancel()
-await metrics_task

logger.info("Work done")

if __name__ == "__main__":
-asyncio.run(main())
+asyncio.run(main(), debug=True)

# TODO
# Possible future addon, in beaker, discover other nodes on this same job
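
One further change worth a note: the entry point now runs with asyncio debug mode enabled, which makes the event loop warn about callbacks that block it for too long and about coroutines that were never awaited, both common causes of silent hangs. A minimal illustration (the main below is a placeholder, not the pipeline's real entry point):

import asyncio

async def main() -> None:
    await asyncio.sleep(0)  # placeholder for the real pipeline work

if __name__ == "__main__":
    # debug=True enables slow-callback warnings and tracking of never-awaited
    # coroutines; the same effect is available via PYTHONASYNCIODEBUG=1.
    asyncio.run(main(), debug=True)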
