
Commit 68543d4

Adding stats

jakep-allenai committed Nov 18, 2024
1 parent b4ca563

Showing 1 changed file with 63 additions and 1 deletion: pdelfin/beakerpipeline.py
@@ -625,7 +625,6 @@ async def metrics_reporter(queue):
        await asyncio.sleep(10)



def submit_beaker_job(args):
    from beaker import (
        Beaker,
@@ -702,6 +701,64 @@ def submit_beaker_job(args):

print(f"Experiment URL: https://beaker.org/ex/{experiment_data.id}")

def print_stats(args):
    import concurrent.futures
    from tqdm import tqdm

    # Get total work items and completed items
    index_file_s3_path = os.path.join(args.workspace, "pdf_index_list.csv.zstd")
    output_glob = os.path.join(args.workspace, "dolma_documents", "*.jsonl")

    work_queue_lines = download_zstd_csv(workspace_s3, index_file_s3_path)
    done_work_items = expand_s3_glob(workspace_s3, output_glob)

    total_items = len([line for line in work_queue_lines if line.strip()])
    completed_items = len(done_work_items)

    print("\nWork Items Status:")
    print(f"Total work items: {total_items:,}")
    print(f"Completed items: {completed_items:,}")
    print(f"Remaining items: {total_items - completed_items:,}")

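    # Tally documents and token counts from one JSONL output file.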
    def process_output_file(s3_path):
        try:
            data = get_s3_bytes(workspace_s3, s3_path)
            doc_count = 0
            total_input_tokens = 0
            total_output_tokens = 0

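            # Each non-empty line is one dolma-style JSON document.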
            for line in data.decode('utf-8').splitlines():
                if line.strip():
                    doc = json.loads(line)
                    doc_count += 1
                    total_input_tokens += doc["metadata"]["total-input-tokens"]
                    total_output_tokens += doc["metadata"]["total-output-tokens"]

            return doc_count, total_input_tokens, total_output_tokens
        except Exception as e:
            logger.warning(f"Error processing {s3_path}: {e}")
            return 0, 0, 0

    print("\nProcessing output files...")
    docs_total = 0
    input_tokens_total = 0
    output_tokens_total = 0

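    # Fan out over the completed output files in parallel; each worker returns its file's totals.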
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(process_output_file, item): item for item in done_work_items}

        for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)):
            doc_count, input_tokens, output_tokens = future.result()
            docs_total += doc_count
            input_tokens_total += input_tokens
            output_tokens_total += output_tokens

    print("\nResults:")
    print(f"Total documents processed: {docs_total:,}")
    print(f"Total input tokens: {input_tokens_total:,}")
    print(f"Total output tokens: {output_tokens_total:,}")
    print(f"Average input tokens per doc: {input_tokens_total/max(1,docs_total):,.1f}")
    print(f"Average output tokens per doc: {output_tokens_total/max(1,docs_total):,.1f}")

async def main():
    parser = argparse.ArgumentParser(description='Manager for running millions of PDFs through a batch inference pipeline')
@@ -711,6 +768,7 @@ async def main():
    parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None)
    parser.add_argument('--pages_per_group', type=int, default=500, help='Aiming for this many pdf pages per work item group')
    parser.add_argument('--workers', type=int, default=8, help='Number of workers to run at a time')
    parser.add_argument('--stats', action='store_true', help='Instead of running any jobs, report statistics about the current workspace')

    # Model parameters
    parser.add_argument('--model', help='List of paths where the model for converting these pdfs can be found. You can specify several paths, and the script will use whichever is fastest to access',
@@ -756,6 +814,10 @@ async def main():
logger.info("Got --pdfs argument, going to add to the work queue")
await populate_pdf_work_queue(args)

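    # With --stats, just report on the workspace and exit without submitting any work.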
    if args.stats:
        print_stats(args)
        return

    if args.beaker:
        submit_beaker_job(args)
        return
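A note on the S3 helpers this commit leans on: download_zstd_csv, expand_s3_glob, and get_s3_bytes are defined earlier in beakerpipeline.py and are not part of this diff. For orientation only, here is a minimal sketch of what they plausibly look like, assuming a boto3 S3 client and the zstandard package; the real implementations may differ:

import fnmatch
import io

import zstandard  # assumed dependency; the real helper may decompress differently

def expand_s3_glob(s3_client, glob_path):
    # List keys under the glob's fixed prefix and keep those matching the pattern.
    bucket, _, key_glob = glob_path.removeprefix("s3://").partition("/")
    prefix = key_glob.split("*", 1)[0]
    paginator = s3_client.get_paginator("list_objects_v2")
    matches = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            if fnmatch.fnmatch(obj["Key"], key_glob):
                matches.append(f"s3://{bucket}/{obj['Key']}")
    return matches

def get_s3_bytes(s3_client, s3_path):
    # Fetch a whole object into memory.
    bucket, _, key = s3_path.removeprefix("s3://").partition("/")
    return s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()

def download_zstd_csv(s3_client, s3_path):
    # Fetch a zstd-compressed CSV and return its decoded lines.
    compressed = io.BytesIO(get_s3_bytes(s3_client, s3_path))
    with zstandard.ZstdDecompressor().stream_reader(compressed) as reader:
        return reader.read().decode("utf-8").splitlines()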

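For reference, process_output_file assumes each output line is a dolma-style JSON document whose metadata carries the two token counters being summed. A minimal illustration of that shape (only the two metadata keys come from the code above; everything else is a placeholder):

import json

# Illustrative only: real documents in the workspace carry more fields.
example_line = json.dumps({
    "text": "Extracted PDF text goes here...",
    "metadata": {
        "total-input-tokens": 1024,
        "total-output-tokens": 512,
    },
})

With the flag wired into main() as above, an invocation along the lines of python -m pdelfin.beakerpipeline <workspace> --stats (assuming the workspace is the positional argument, which this diff does not show) would print the report and exit without submitting any work.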