Some errors dealt with
jakep-allenai committed Nov 11, 2024
1 parent 24a9d23 commit 732300a
Showing 1 changed file with 15 additions and 9 deletions.
pdelfin/beakerpipeline.py
@@ -28,10 +28,16 @@
 from pdelfin.prompts.anchor import get_anchor_text
 from pdelfin.check import check_poppler_version
 
-# Basic logging setup for now
+# Initialize logger
 logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
-logging.basicConfig(level=logging.INFO)
+logger.setLevel(logging.DEBUG)
+
+file_handler = logging.FileHandler('beakerpipeline-debug.log', mode='a')
+file_handler.setLevel(logging.DEBUG)
+file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
+
+# Add handlers to the logger
+logger.addHandler(file_handler)
 
 # Quiet logs from pypdf
 logging.getLogger("pypdf").setLevel(logging.ERROR)
@@ -63,7 +69,9 @@ async def build_page_query(local_pdf_path: str, page: int, target_longest_image_

     # Allow the page rendering to process in the background while we get the anchor text (which blocks the main thread)
     image_base64 = asyncio.to_thread(render_pdf_to_base64png, local_pdf_path, page, target_longest_image_dim=target_longest_image_dim)
-    anchor_text = asyncio.to_thread(get_anchor_text, local_pdf_path, page, pdf_engine="pdfreport", target_length=target_anchor_text_len)
+
+    # GET ANCHOR TEXT IS NOT THREAD SAFE!! Ahhhh..... don't try to do it
+    anchor_text = get_anchor_text(local_pdf_path, page, pdf_engine="pdfreport", target_length=target_anchor_text_len)
 
     image_base64 = await image_base64
     if image_rotation != 0:
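
Note: get_anchor_text now runs inline on the event loop (blocking it for the duration of the call), while only the page render is offloaded via asyncio.to_thread. If the call is unsafe because concurrent invocations clash, one alternative — not what this commit does — is to serialize all calls through a dedicated single-thread executor, which keeps the event loop free while never running two calls at once:

import asyncio
from concurrent.futures import ThreadPoolExecutor

# One dedicated thread: calls are serialized against each other,
# but the event loop stays free while each call runs.
_anchor_executor = ThreadPoolExecutor(max_workers=1)

async def get_anchor_text_async(local_pdf_path, page, pdf_engine, target_length):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        _anchor_executor,
        lambda: get_anchor_text(local_pdf_path, page, pdf_engine=pdf_engine, target_length=target_length),
    )

This only helps if the problem is concurrent invocation; if the underlying library must run on the main thread specifically, calling it inline as the commit does is the safer fix.
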
@@ -78,8 +86,6 @@ async def build_page_query(local_pdf_path: str, page: int, target_longest_image_
         # Encode the rotated image back to base64
         image_base64 = base64.b64encode(buffered.getvalue()).decode('utf-8')
 
-    anchor_text = await anchor_text
-
     return {
         "model": "Qwen/Qwen2-VL-7B-Instruct",
         "messages": [
@@ -248,7 +254,7 @@ async def process_pdf(args, pdf_s3_path: str):
         # List to hold the tasks for processing each page
         page_tasks = []
 
-        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600), connector=TCPConnector(limit=100)) as session:
+        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3600), connector=aiohttp.TCPConnector(limit=50)) as session:
             for page_num in range(1, num_pages + 1):
                 # Create a task for each page
                 task = asyncio.create_task(process_page(args, session, pdf_s3_path, tf.name, page_num))
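
Note: two fixes in one line here. Unless TCPConnector had been imported directly (the import block isn't shown in this diff), the bare name would raise a NameError at runtime — plausibly one of the errors this commit deals with — so it is now qualified as aiohttp.TCPConnector. The limit argument caps simultaneous connections for the whole session, lowered from 100 to 50. A minimal sketch of the pattern, with a hypothetical URL:

import asyncio
import aiohttp

async def fetch_status(url: str) -> int:
    # limit=50 caps concurrent connections across the whole session;
    # further requests wait until a connection frees up.
    connector = aiohttp.TCPConnector(limit=50)
    timeout = aiohttp.ClientTimeout(total=3600)
    async with aiohttp.ClientSession(timeout=timeout, connector=connector) as session:
        async with session.get(url) as response:
            return response.status

# asyncio.run(fetch_status("http://localhost:8000/health"))  # hypothetical URL
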
@@ -397,7 +403,7 @@ async def sglang_server_ready():
                     else:
                         logger.info(f"Attempt {attempt}: Unexpected status code {response.status}")
         except Exception as e:
-            logger.warning(f"Attempt {attempt}: Exception occurred: {e}")
+            logger.warning(f"Attempt {attempt}: {e}")
 
         await asyncio.sleep(delay_sec)
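
Note: the log message is trimmed because {e} already carries the exception text, making "Exception occurred:" redundant. The surrounding code is a readiness poll: try the health check, log the failure, sleep, retry. A hedged sketch of that general pattern — names like url, max_attempts, and delay_sec are assumptions, not read from the file:

import asyncio
import aiohttp

async def wait_until_ready(url: str, max_attempts: int = 300, delay_sec: float = 1.0) -> bool:
    for attempt in range(1, max_attempts + 1):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return True
                    print(f"Attempt {attempt}: Unexpected status code {response.status}")
        except Exception as e:
            # e.g. connection refused while the server is still starting up
            print(f"Attempt {attempt}: {e}")
        await asyncio.sleep(delay_sec)
    return False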

@@ -413,7 +419,7 @@ async def main():
     parser.add_argument('--workspace_profile', help='S3 configuration profile for accessing the workspace', default=None)
     parser.add_argument('--pdf_profile', help='S3 configuration profile for accessing the raw pdf documents', default=None)
     parser.add_argument('--group_size', type=int, default=20, help='Number of pdfs that will be part of each work item in the work queue.')
-    parser.add_argument('--workers', type=int, default=2, help='Number of workers to run at a time')
+    parser.add_argument('--workers', type=int, default=1, help='Number of workers to run at a time')
 
     parser.add_argument('--model', help='List of paths where you can find the model to convert this pdf. You can specify several different paths here, and the script will try to use the one which is fastest to access',
         default=["weka://oe-data-default/jakep/Qwen_Qwen2-VL-7B-Instruct-e4ecf8-01JAH8GMWHTJ376S2N7ETXRXH4/best_bf16/",