From 68543d40cef494af0c9246b08653cde726600477 Mon Sep 17 00:00:00 2001 From: Jake Poznanski Date: Mon, 18 Nov 2024 07:57:39 -0800 Subject: [PATCH] Adding stats --- pdelfin/beakerpipeline.py | 64 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/pdelfin/beakerpipeline.py b/pdelfin/beakerpipeline.py index 1e88c48..edc7bf2 100644 --- a/pdelfin/beakerpipeline.py +++ b/pdelfin/beakerpipeline.py @@ -625,7 +625,6 @@ async def metrics_reporter(queue): await asyncio.sleep(10) - def submit_beaker_job(args): from beaker import ( Beaker, @@ -702,6 +701,64 @@ def submit_beaker_job(args): print(f"Experiment URL: https://beaker.org/ex/{experiment_data.id}") +def print_stats(args): + import concurrent.futures + from tqdm import tqdm + + # Get total work items and completed items + index_file_s3_path = os.path.join(args.workspace, "pdf_index_list.csv.zstd") + output_glob = os.path.join(args.workspace, "dolma_documents", "*.jsonl") + + work_queue_lines = download_zstd_csv(workspace_s3, index_file_s3_path) + done_work_items = expand_s3_glob(workspace_s3, output_glob) + + total_items = len([line for line in work_queue_lines if line.strip()]) + completed_items = len(done_work_items) + + print(f"\nWork Items Status:") + print(f"Total work items: {total_items:,}") + print(f"Completed items: {completed_items:,}") + print(f"Remaining items: {total_items - completed_items:,}") + + def process_output_file(s3_path): + try: + data = get_s3_bytes(workspace_s3, s3_path) + doc_count = 0 + total_input_tokens = 0 + total_output_tokens = 0 + + for line in data.decode('utf-8').splitlines(): + if line.strip(): + doc = json.loads(line) + doc_count += 1 + total_input_tokens += doc["metadata"]["total-input-tokens"] + total_output_tokens += doc["metadata"]["total-output-tokens"] + + return doc_count, total_input_tokens, total_output_tokens + except Exception as e: + logger.warning(f"Error processing {s3_path}: {e}") + return 0, 0, 0 + + print("\nProcessing output files...") + docs_total = 0 + input_tokens_total = 0 + output_tokens_total = 0 + + with concurrent.futures.ThreadPoolExecutor() as executor: + futures = {executor.submit(process_output_file, item): item for item in done_work_items} + + for future in tqdm(concurrent.futures.as_completed(futures), total=len(futures)): + doc_count, input_tokens, output_tokens = future.result() + docs_total += doc_count + input_tokens_total += input_tokens + output_tokens_total += output_tokens + + print(f"\nResults:") + print(f"Total documents processed: {docs_total:,}") + print(f"Total input tokens: {input_tokens_total:,}") + print(f"Total output tokens: {output_tokens_total:,}") + print(f"Average input tokens per doc: {input_tokens_total/max(1,docs_total):,.1f}") + print(f"Average output tokens per doc: {output_tokens_total/max(1,docs_total):,.1f}") async def main(): parser = argparse.ArgumentParser(description='Manager for running millions of PDFs through a batch inference pipeline') @@ -711,6 +768,7 @@ async def main(): parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None) parser.add_argument('--pages_per_group', type=int, default=500, help='Aiming for this many pdf pages per work item group') parser.add_argument('--workers', type=int, default=8, help='Number of workers to run at a time') + parser.add_argument('--stats', action='store_true', help='Instead of running any job, reports some statistics about the current workspace') # Model parameters parser.add_argument('--model', help='List of paths where you can find the model to convert this pdf. You can specify several different paths here, and the script will try to use the one which is fastest to access', @@ -756,6 +814,10 @@ async def main(): logger.info("Got --pdfs argument, going to add to the work queue") await populate_pdf_work_queue(args) + if args.stats: + print_stats(args) + return + if args.beaker: submit_beaker_job(args) return