Skip to content

Commit

Permalink
Page calc
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 14, 2024
1 parent 4eab90f commit 8217e49
Showing 1 changed file with 28 additions and 4 deletions.
32 changes: 28 additions & 4 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,37 @@ async def populate_pdf_work_queue(args):
new_pdfs = all_pdfs - existing_pdf_set
logger.info(f"{len(new_pdfs):,} new pdf paths to add to the workspace")

# Group the new PDFs into chunks of group_size
# TODO: Figure out the group size automatically by sampling a few pdfs, and taking the mean/median number of pages, etc.
sample_size = min(100, len(new_pdfs))
sampled_pdfs = random.sample(list(new_pdfs), sample_size)

page_counts = []

for pdf in tqdm(sampled_pdfs, desc="Sampling PDFs to calculate optimial length"):
try:
# Download the PDF to a temp file
with tempfile.NamedTemporaryFile(suffix=".pdf") as tmp_file:
s3_bucket, s3_key = parse_s3_path(pdf)
pdf_s3.download_fileobj(s3_bucket, s3_key, tmp_file)
tmp_file.flush()
reader = PdfReader(tmp_file.name)
page_counts.append(len(reader.pages))
except Exception as e:
logger.warning(f"Failed to read {pdf}: {e}")

if page_counts:
avg_pages_per_pdf = sum(page_counts) / len(page_counts)
else:
logger.warning("Could not read any PDFs to estimate average page count.")
avg_pages_per_pdf = 10 # Default to 10 pages per PDF if sampling fails

group_size = max(1, int(args.pages_per_group / avg_pages_per_pdf))
logger.info(f"Calculated group_size: {group_size} based on average pages per PDF: {avg_pages_per_pdf:.2f}")

new_groups = []
current_group = []
for pdf in sorted(new_pdfs): # Sort for consistency
current_group.append(pdf)
if len(current_group) == args.group_size:
if len(current_group) == group_size:
group_hash = compute_workgroup_sha1(current_group)
new_groups.append((group_hash, current_group))
current_group = []
Expand Down Expand Up @@ -645,7 +669,7 @@ async def main():
parser.add_argument('--pdfs', help='Path to add pdfs stored in s3 to the workspace, can be a glob path s3://bucket/prefix/*.pdf or path to file containing list of pdf paths', default=None)
parser.add_argument('--workspace_profile', help='S3 configuration profile for accessing the workspace', default=None)
parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None)
parser.add_argument('--group_size', type=int, default=20, help='Number of pdfs that will be part of each work item in the work queue.')
parser.add_argument('--pages_per_group', type=int, default=500, help='Aiming for this many pdf pages per work item group')
parser.add_argument('--workers', type=int, default=8, help='Number of workers to run at a time')

# Model parameters
Expand Down

0 comments on commit 8217e49

Please sign in to comment.