Skip to content

Commit

Permalink
Stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Oct 9, 2024
1 parent 991b213 commit 954b19a
Showing 1 changed file with 4 additions and 13 deletions.
17 changes: 4 additions & 13 deletions pdelfin/data/runpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def fetch_s3_file(s3_url: str, local_path: str) -> str:
s3.download_file(bucket_name, key, local_path)
return local_path

def process_pdf(pdf_path: str, first_n_pages: int, max_sample_pages: int, no_filter: bool) -> Generator[dict, None, None]:
def process_pdf(pdf_path: str, no_filter: bool) -> Generator[dict, None, None]:
if pdf_path.startswith("s3://"):
local_pdf_path = os.path.join("/tmp", os.path.basename(pdf_path))
fetch_s3_file(pdf_path, local_pdf_path)
Expand Down Expand Up @@ -119,7 +119,7 @@ def main():
# Rest of the code remains the same
cur_file_num = 0
output_dir = args.output
max_file_size = 99 * 1024 * 1024 # 99MB in bytes
max_file_size = args.max_size_mb * 1024 * 1024
cur_file_size = 0
cur_file_path = os.path.join(output_dir, f"output_{cur_file_num}.jsonl")

Expand All @@ -136,12 +136,11 @@ def main():
with ProcessPoolExecutor() as executor:
futures = []

with tqdm(desc="Processing PDFs", leave=False, total=args.num_sample_docs) as pb:
with tqdm(desc="Processing PDFs", leave=False, total=len(pdf_paths)) as pb:
for pdf_path in pdf_paths:
futures.append(executor.submit(process_pdf, pdf_path, args.first_n_pages, args.max_sample_pages, args.no_filter))
futures.append(executor.submit(process_pdf, pdf_path, args.no_filter))

for future in as_completed(futures):
has_output = False # Track if the current PDF produces at least one request
try:
request_results = future.result() # Get the result from the thread

Expand All @@ -163,16 +162,8 @@ def main():
cur_file.write("\n")
cur_file_size += request_size

has_output = True # At least one request object was generated

if has_output:
pdfs_with_output += 1
pb.update(1)

if pdfs_with_output >= args.num_sample_docs:
executor.shutdown(cancel_futures=True)
break

except Exception as e:
print(f"Error processing {pdf_path}: {str(e)}")

Expand Down

0 comments on commit 954b19a

Please sign in to comment.