Skip to content

Commit

Permalink
Creds and other things
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 13, 2024
1 parent a3b6962 commit fe3c9a2
Showing 1 changed file with 20 additions and 1 deletion.
21 changes: 20 additions & 1 deletion pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,20 @@ async def load_pdf_work_queue(args) -> asyncio.Queue:

return queue

async def work_item_completed(args, work_hash: str) -> bool:
# Check if work item has already been completed
output_s3_path = os.path.join(args.workspace, 'dolma_documents', f'output_{work_hash}.jsonl')
bucket, key = parse_s3_path(output_s3_path)

try:
# Check if the output file already exists
await asyncio.to_thread(workspace_s3.head_object, Bucket=bucket, Key=key)
return True
except workspace_s3.exceptions.ClientError as e:
pass

return False


async def process_page(args, session: aiohttp.ClientSession, worker_id: int, pdf_s3_path: str, pdf_local_path: str, page_num: int) -> PageResult:
COMPLETION_URL = "http://localhost:30000/v1/chat/completions"
Expand Down Expand Up @@ -368,7 +382,9 @@ async def worker(args, queue, semaphore, worker_id):
# Wait until allowed to proceed
await semaphore.acquire()

# TODO: Double check that the work item has not been done already by looking at the s3 workspace
if await work_item_completed(args, work_hash):
logger.info(f"Work {work_hash} was already completed, skipping")
continue

async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600),
connector=aiohttp.TCPConnector(limit=1000)) as session:
Expand Down Expand Up @@ -629,6 +645,9 @@ async def main():
os.makedirs(os.path.dirname(cred_path), exist_ok=True)
with open(cred_path, "w") as f:
f.write(os.environ.get("AWS_CREDENTIALS_FILE"))
global workspace_s3, pdf_s3
workspace_s3 = workspace_session.client("s3")
pdf_s3 = pdf_session.client("s3")

if args.workspace_profile:
global workspace_s3
Expand Down

0 comments on commit fe3c9a2

Please sign in to comment.