Commit

Fixing some reliability issues with the pipeline script
jakep-allenai committed Oct 28, 2024
1 parent 45269fa commit 7678f31
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions pdelfin/birrpipeline.py
@@ -527,12 +527,16 @@ def build_dolma_doc(s3_workspace: str, pdf: DatabaseManager.PDFRecord) -> Option
     last_page_start_index = 0
     pdf_page_spans = []
 
+    # Error out quickly if this document cannot be assembled
     for target_page_num in range(1, pdf.num_pages + 1):
         usable_pages = [page for page in existing_pages if page.is_usable() and page.page_num == target_page_num]
 
         if len(usable_pages) == 0:
             return None
 
+
+    for target_page_num in range(1, pdf.num_pages + 1):
+        usable_pages = [page for page in existing_pages if page.is_usable() and page.page_num == target_page_num]
+
         usable_page_data = [get_s3_bytes(workspace_s3, page.inference_s3_path,
                                          start_index=page.start_index,
                                          end_index=page.start_index + page.length - 1) for page in usable_pages]
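This hunk splits validation out of the assembly loop: before any page bytes are fetched with ranged S3 reads, a first pass confirms that every page of the PDF has at least one usable inference result, so documents that cannot be assembled return None immediately instead of after paying for downloads. A minimal sketch of the same fail-fast pattern, with hypothetical PageResult/assemble names standing in for the repo's types:

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class PageResult:
    page_num: int
    usable: bool
    payload: bytes

def assemble(pages: list[PageResult], num_pages: int) -> Optional[bytes]:
    # Pass 1: cheap validation only -- bail out before any expensive work.
    for target in range(1, num_pages + 1):
        if not any(p.usable and p.page_num == target for p in pages):
            return None

    # Pass 2: every page is now known to exist, so the costly step
    # (here just concatenation; in the pipeline, ranged S3 reads) is never wasted.
    out = b""
    for target in range(1, num_pages + 1):
        page = next(p for p in pages if p.usable and p.page_num == target)
        out += page.payload
    return out
```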
@@ -619,6 +623,7 @@ def get_current_round(s3_workspace: str) -> int:
 parser.add_argument('--workspace_profile', help='S3 configuration profile for accessing the workspace', default=None)
 parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None)
 parser.add_argument('--max_size_mb', type=int, default=250, help='Max file size in MB')
+parser.add_argument('--workers', type=int, help='Number of workers to run in the processpool')
 parser.add_argument('--reindex', action='store_true', default=False, help='Reindex all of the page_results')
 parser.add_argument('--skip_build_queries', action='store_true', default=False, help='Skip generation of new pdf page queries for batch inferencing')
 args = parser.parse_args()
@@ -642,7 +647,7 @@ def get_current_round(s3_workspace: str) -> int:
 logger.info(f"Current round is {current_round}")
 
 # One shared executor to rule them all
-executor = ProcessPoolExecutor()
+executor = ProcessPoolExecutor(max_workers=args.workers)
 
 # If you have new PDFs, step one is to add them to the list
 if args.add_pdfs:
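The new --workers flag feeds straight into the shared executor. Because the argument has no default, args.workers is None when the flag is omitted, and ProcessPoolExecutor(max_workers=None) falls back to os.cpu_count(), so existing invocations behave exactly as before. A runnable sketch of the wiring (square is a hypothetical task function, not from the repo):

```python
import argparse
from concurrent.futures import ProcessPoolExecutor

def square(n: int) -> int:
    return n * n

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    # No default: omitting the flag leaves args.workers as None, and
    # ProcessPoolExecutor treats max_workers=None as os.cpu_count().
    parser.add_argument('--workers', type=int, help='Number of workers to run in the processpool')
    args = parser.parse_args()

    with ProcessPoolExecutor(max_workers=args.workers) as executor:
        print(list(executor.map(square, range(8))))
```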
@@ -682,7 +687,7 @@ def get_current_round(s3_workspace: str) -> int:
 future_to_path = {executor.submit(process_jsonl_content, s3_path): (s3_path, etag) for s3_path, etag in inference_output_paths.items()}
 
 for future in tqdm(as_completed(future_to_path), total=len(future_to_path), desc="Indexing Inference Results"):
-    s3_path, etag = future_to_path[future]
+    s3_path, etag = future_to_path.pop(future)
     try:
         inference_records = future.result()
 
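Swapping the dict lookup for pop() likely helps long runs: as_completed snapshots the futures into a set up front, so mutating future_to_path inside the loop is safe, and dropping each entry as it completes lets finished Future objects and their results be garbage-collected once nothing else references them, instead of accumulating until the loop ends. A self-contained sketch of the pattern (load_chunk is a hypothetical stand-in for process_jsonl_content):

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def load_chunk(path: str) -> int:
    return len(path) * 1000  # pretend this parses a JSONL shard

if __name__ == "__main__":
    paths = [f"chunk-{i:04d}.jsonl" for i in range(100)]
    with ProcessPoolExecutor() as executor:
        future_to_path = {executor.submit(load_chunk, p): p for p in paths}
        for future in as_completed(future_to_path):
            # pop() releases this entry immediately; with [], every completed
            # Future (and its result) stays referenced for the whole loop.
            path = future_to_path.pop(future)
            print(path, future.result())
```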
@@ -765,7 +770,7 @@ def get_current_round(s3_workspace: str) -> int:
 new_output_writer = BatchWriter(f"{args.workspace}/output", args.max_size_mb, after_flush=partial(mark_pdfs_done, args.workspace))
 
 for future in tqdm(as_completed(future_to_path), total=len(future_to_path), desc="Assembling Dolma Docs"):
-    pdf = future_to_path[future]
+    pdf = future_to_path.pop(future)
     dolma_doc = future.result()
 
     if dolma_doc is not None:
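Context for the unchanged BatchWriter line above: functools.partial(mark_pdfs_done, args.workspace) pre-binds the workspace argument, so at flush time the writer only needs to supply whatever it just flushed. A toy illustration of that callback shape (TinyWriter and mark_done are hypothetical simplifications, not the repo's BatchWriter/mark_pdfs_done):

```python
from functools import partial

def mark_done(workspace: str, flushed_items: list) -> None:
    print(f"{workspace}: marked {len(flushed_items)} items done")

class TinyWriter:
    def __init__(self, after_flush):
        self.after_flush = after_flush
        self.buffer = []

    def write(self, item):
        self.buffer.append(item)

    def flush(self):
        items, self.buffer = self.buffer, []
        # partial(mark_done, workspace) bound the first argument already,
        # so the writer only supplies the flush-time data.
        self.after_flush(items)

writer = TinyWriter(after_flush=partial(mark_done, "s3://bucket/workspace"))
writer.write("doc-1")
writer.flush()
```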