Skip to content

Commit

Permalink
More convservative filtering
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 21, 2024
1 parent cb800d6 commit 212d391
Showing 1 changed file with 53 additions and 40 deletions.
93 changes: 53 additions & 40 deletions pdelfin/filter/filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,16 @@ def filter_out_pdf(self, local_pdf_path: str) -> bool:

base_text = pdftotext_result.stdout.decode("utf-8")

alpha_count = sum(c.isalpha() for c in base_text)

if len(base_text) < 200:
logger.info(f"Keeping {local_pdf_path} on the safe side because not enough text exists in it to analyze")
return False # keep the pdf

if alpha_count/len(base_text) < 0.50:
logger.info(f"Keeping {local_pdf_path} on the safe side because it's text does not contain many letters so it might be OCRed badly")
return False # keep the pdf

# Language check
language = self.language_detector.detect_language_of(base_text)
if language not in self.languages_to_keep:
Expand All @@ -116,7 +126,7 @@ def filter_out_pdf(self, local_pdf_path: str) -> bool:
import tempfile
import boto3
from pdelfin.s3_utils import parse_s3_path
from concurrent.futures import ProcessPoolExecutor, as_completed
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_COMPLETED
from tqdm import tqdm

# Quiet logs from pypdf
Expand Down Expand Up @@ -154,45 +164,48 @@ def process_pdf(s3_path):
keep_path = "/home/ubuntu/s2pdf_paths_filter_keep.txt"
remove_path = "/home/ubuntu/s2pdf_paths_filter_remove.txt"

# Max number of concurrent futures
max_concurrent_futures = 1000
max_pending = 20 # Limit on the number of concurrent futures
total_pdfs = len(s3_work_paths)
pdf_iter = iter(s3_work_paths) # Iterator for PDFs

# Process the PDFs in parallel with limited concurrent futures
# Process the PDFs with limited concurrent futures
with open(keep_path, "w") as fkeep, open(remove_path, "w") as fremove:
with ProcessPoolExecutor(max_workers=max_concurrent_futures) as executor:
futures = {}
with tqdm(total=len(s3_work_paths), desc="Processing PDFs") as pbar:
for s3_path in s3_work_paths:
# Submit a new future if we haven't reached the max_concurrent_futures
with ProcessPoolExecutor(max_workers=max_pending) as executor:
pending_futures = {}

with tqdm(total=total_pdfs, desc="Processing PDFs") as pbar:
# Submit initial batch of futures
for _ in range(min(max_pending, total_pdfs)):
s3_path = next(pdf_iter)
future = executor.submit(process_pdf, s3_path)
futures[future] = s3_path

# Monitor completed futures
try:
while len(futures) >= max_concurrent_futures:
for completed_future in as_completed(futures, timeout=0.1):
s3_path = futures.pop(completed_future)
try:
s3_path, result = completed_future.result()
if result == "keep":
fkeep.write(s3_path + "\n")
elif result == "remove":
fremove.write(s3_path + "\n")
except Exception as e:
print(f"Error processing {s3_path}: {e}")
pbar.update(1)
except TimeoutError:
pass

# Process remaining futures after all tasks have been submitted
for completed_future in as_completed(futures):
s3_path = futures.pop(completed_future)
try:
s3_path, result = completed_future.result()
if result == "keep":
fkeep.write(s3_path + "\n")
elif result == "remove":
fremove.write(s3_path + "\n")
except Exception as e:
print(f"Error processing {s3_path}: {e}")
pbar.update(1)
pending_futures[future] = s3_path

while pending_futures:
# Wait for the next future to complete
done, _ = wait(
pending_futures.keys(),
timeout=0.1,
return_when=FIRST_COMPLETED,
)

for future in done:
s3_path = pending_futures.pop(future)
try:
s3_path, result = future.result()
if result == "keep":
fkeep.write(s3_path + "\n")
elif result == "remove":
fremove.write(s3_path + "\n")
except Exception as e:
print(f"Error processing {s3_path}: {e}")

# Update the progress bar
pbar.update(1)

# Submit a new future if there are more PDFs
try:
s3_path = next(pdf_iter)
future = executor.submit(process_pdf, s3_path)
pending_futures[future] = s3_path
except StopIteration:
pass # No more PDFs to process

0 comments on commit 212d391

Please sign in to comment.