Commit fe0574c

Cleanup code, s3 retries
jakep-allenai committed Nov 14, 2024
1 parent 2c7686f · commit fe0574c
Showing 2 changed files with 21 additions and 10 deletions.
pdelfin/beakerpipeline.py: 19 changes (9 additions & 10 deletions)
@@ -26,7 +26,7 @@
 from typing import Optional
 from concurrent.futures import ProcessPoolExecutor

-from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
+from pdelfin.s3_utils import expand_s3_glob, get_s3_bytes, get_s3_bytes_with_backoff, parse_s3_path, download_zstd_csv, upload_zstd_csv, download_directory
 from pdelfin.data.renderpdf import render_pdf_to_base64png
 from pdelfin.prompts import build_finetuning_prompt, PageResponse
 from pdelfin.prompts.anchor import get_anchor_text
@@ -69,7 +69,7 @@
 # Process pool for offloading cpu bound work, like calculating anchor texts
 process_pool = ProcessPoolExecutor()

-SGLANG_SERVER_PORT = 30002
+SGLANG_SERVER_PORT = 30003

 @dataclass(frozen=True)
 class PageResult:
@@ -321,27 +321,26 @@ async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf
             exponential_backoffs += 1
             logger.info(f"Sleeping for {sleep_delay} seconds on {pdf_s3_path}-{page_num} to allow server restart")
             await asyncio.sleep(sleep_delay)
-        except json.JSONDecodeError as e:
-            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
-            attempt += 1
         except asyncio.CancelledError:
             logger.info(f"Process page {pdf_s3_path}-{page_num} cancelled")
             await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "cancelled")
             raise
+        except json.JSONDecodeError as e:
+            logger.warning(f"JSON decode error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
+            attempt += 1
         except Exception as e:
             logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
             attempt += 1

-    if attempt >= MAX_RETRIES:
-        logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
-        await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "errored")
-        raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")
+    logger.error(f"Failed to process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts.")
+    await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "errored")
+    raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")


 async def process_pdf(args, session: aiohttp.ClientSession, worker_id: int, pdf_s3_path: str):
     with tempfile.NamedTemporaryFile("wb+", suffix=".pdf") as tf:
         # TODO Switch to aioboto3 or something
-        data = await asyncio.to_thread(lambda: get_s3_bytes(pdf_s3, pdf_s3_path))
+        data = await asyncio.to_thread(lambda: get_s3_bytes_with_backoff(pdf_s3, pdf_s3_path))
         tf.write(data)
         tf.flush()
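Note on the last hunk: the dedent drops a redundant guard, since this error block sits directly after the retry loop in process_page and the loop only falls through once attempt has reached MAX_RETRIES (successful pages return from inside it). Moving the json.JSONDecodeError handler below asyncio.CancelledError is a readability change only; the two exception types are unrelated, so catch order between them does not affect behavior. A minimal sketch of the loop shape, with the request and parsing logic elided and the while condition assumed from the dropped guard:

import asyncio
import json

MAX_RETRIES = 8  # assumed value; the real constant is defined elsewhere in beakerpipeline.py


async def process_page_sketch(pdf_s3_path: str, page_num: int) -> None:
    attempt = 0
    while attempt < MAX_RETRIES:
        try:
            ...  # build the query, call the sglang server, json-decode the response
            return  # on success the real function returns a PageResult here
        except asyncio.CancelledError:
            raise  # propagate cancellation immediately; never retry
        except json.JSONDecodeError:
            attempt += 1  # malformed response body: count the attempt and retry
        except Exception:
            attempt += 1  # any other failure: count the attempt and retry
    # Only reachable once attempt == MAX_RETRIES, so no extra `if attempt >= MAX_RETRIES` guard is needed.
    raise ValueError(f"Could not process {pdf_s3_path}-{page_num} after {MAX_RETRIES} attempts")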
pdelfin/s3_utils.py: 12 changes (12 additions & 0 deletions)
@@ -79,6 +79,18 @@ def get_s3_bytes(s3_client, s3_path: str, start_index: Optional[int] = None, end

     return obj['Body'].read()

+def get_s3_bytes_with_backoff(s3_client, pdf_s3_path, max_retries: int=8, backoff_factor: int=2):
+    attempt = 0
+    while attempt < max_retries:
+        try:
+            return get_s3_bytes(s3_client, pdf_s3_path)
+        except Exception as e:
+            wait_time = backoff_factor ** attempt
+            logger.warning(f"Attempt {attempt+1} failed to get_s3_bytes for {pdf_s3_path}: {e}. Retrying in {wait_time} seconds...")
+            time.sleep(wait_time)
+            attempt += 1
+    logger.error(f"Failed to get_s3_bytes for {pdf_s3_path} after {max_retries} retries.")
+    raise Exception("Failed to get_s3_bytes after retries")

 def put_s3_bytes(s3_client, s3_path: str, data: bytes):
     bucket, key = parse_s3_path(s3_path)
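For reference, a hedged usage sketch of the new helper from synchronous code; the bucket and key below are placeholders, and constructing a plain boto3 client here is an assumption rather than how the pipeline sets up its own clients:

import boto3

from pdelfin.s3_utils import get_s3_bytes_with_backoff

# Placeholder path; any s3://bucket/key accepted by get_s3_bytes should work.
pdf_s3_path = "s3://example-bucket/documents/sample.pdf"

s3_client = boto3.client("s3")  # assumption: default credentials and region from the environment

# Waits grow as backoff_factor ** attempt (1, 2, 4, ... seconds), so 8 failed
# attempts with factor 2 sleep for about 255 seconds in total before raising.
data = get_s3_bytes_with_backoff(s3_client, pdf_s3_path, max_retries=8, backoff_factor=2)
print(f"Fetched {len(data)} bytes")

In beakerpipeline.py the call is wrapped in asyncio.to_thread (see the hunk above), so the time.sleep backoff runs in a worker thread rather than blocking the event loop.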
