Skip to content

Commit

Permalink
Fixing bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 14, 2024
1 parent b67d8e7 commit 4eab90f
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
17 changes: 10 additions & 7 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@
# Process pool for offloading cpu bound work, like calculating anchor texts
process_pool = ProcessPoolExecutor()

# Port for the locally spawned sglang inference server: passed as --port when
# launching the server subprocess, and used to build the /v1/chat/completions
# and /v1/models health-check URLs so client and server always agree.
SGLANG_SERVER_PORT = 30002

@dataclass(frozen=True)
class PageResult:
s3_path: str
Expand Down Expand Up @@ -250,7 +252,7 @@ async def work_item_completed(args, work_hash: str) -> bool:


async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf_s3_path: str, pdf_local_path: str, page_num: int) -> PageResult:
COMPLETION_URL = "http://localhost:30000/v1/chat/completions"
COMPLETION_URL = f"http://localhost:{SGLANG_SERVER_PORT}/v1/chat/completions"
MAX_RETRIES = 3

exponential_backoffs = 0
Expand Down Expand Up @@ -303,7 +305,7 @@ async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf
await tracker.track_work(worker_id, f"{pdf_s3_path}-{page_num}", "cancelled")
raise
except Exception as e:
logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {e}")
logger.warning(f"Unexpected error on attempt {attempt} for {pdf_s3_path}-{page_num}: {type(e)} - {e}")
attempt += 1

if attempt >= MAX_RETRIES:
Expand Down Expand Up @@ -404,12 +406,12 @@ async def worker(args, queue, semaphore, worker_id):
logger.info(f"Work {work_hash} was already completed, skipping")
continue

async with asyncio.TaskGroup() as tg, \
aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600),
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600),
connector=aiohttp.TCPConnector(limit=1000)) as session:
dolma_docs = [tg.create_task(process_pdf(args, session, worker_id, pdf)) for pdf in pdfs]
async with asyncio.TaskGroup() as tg:
dolma_tasks = [tg.create_task(process_pdf(args, session, worker_id, pdf)) for pdf in pdfs]

dolma_docs = [task.result() for doc in dolma_docs if task.result() is not None]
dolma_docs = [task.result() for task in dolma_tasks if task.result() is not None]

# Write the Dolma documents to a local temporary file in JSONL format
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as tf:
Expand Down Expand Up @@ -457,6 +459,7 @@ async def sglang_server_task(args, semaphore):
"--model-path", model_cache_dir,
"--chat-template", args.model_chat_template,
"--context-length", str(args.model_max_context),
"--port", str(SGLANG_SERVER_PORT),
"--log-level-http", "warning",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
Expand Down Expand Up @@ -530,7 +533,7 @@ async def sglang_server_host(args, semaphore):
async def sglang_server_ready():
max_attempts = 300
delay_sec = 1
url = 'http://localhost:30000/v1/models'
url = f'http://localhost:{SGLANG_SERVER_PORT}/v1/models'

for attempt in range(1, max_attempts + 1):
try:
Expand Down
2 changes: 1 addition & 1 deletion pdelfin/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
_MINOR = "1"
# On main and in a nightly release the patch should be one ahead of the last
# released build.
_PATCH = "10"
_PATCH = "12"
# This is mainly for nightly builds which have the suffix ".dev$DATE". See
# https://semver.org/#is-v123-a-semantic-version for the semantics.
_SUFFIX = ""
Expand Down

0 comments on commit 4eab90f

Please sign in to comment.