Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 13, 2024
1 parent 83bb1dc commit a3b6962
Showing 1 changed file with 4 additions and 5 deletions.
9 changes: 4 additions & 5 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,12 +361,15 @@ async def process_pdf(args, session: aiohttp.ClientSession, worker_id: int, pdf_
async def worker(args, queue, semaphore, worker_id):
while True:
[work_hash, pdfs] = await queue.get()
await tracker.clear_work(worker_id)

try:
await tracker.clear_work(worker_id)

# Wait until allowed to proceed
await semaphore.acquire()

# TODO: Double check that the work item has not been done already by looking at the s3 workspace

async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600),
connector=aiohttp.TCPConnector(limit=1000)) as session:
dolma_docs = await asyncio.gather(*[process_pdf(args, session, worker_id, pdf) for pdf in pdfs])
Expand Down Expand Up @@ -691,10 +694,6 @@ async def main():
if __name__ == "__main__":
asyncio.run(main())

# TODO
# If there is a beaker flag, then your job is to launch this script with N replicas on Beaker.
# If not, then your job is to do the actual work.

# TODO
# Possible future add-on: when running in Beaker, discover the other nodes in this same job
# and send them a message whenever you take a work item off the queue.

0 comments on commit a3b6962

Please sign in to comment.