Pipeline stability fixes hopefully and logging
jakep-allenai committed Oct 29, 2024
1 parent ce2e4ba commit 232c445
Showing 2 changed files with 45 additions and 1 deletion.
2 changes: 2 additions & 0 deletions .gitignore
@@ -8,6 +8,8 @@ gnarly_previews/*
s2orc_previews/*
s2orc_previews_3200/*
/*.html
debug.log
birrpipeline-debug.log


# build artifacts
44 changes: 43 additions & 1 deletion pdelfin/birrpipeline.py
@@ -10,6 +10,7 @@
import posixpath
import threading
import logging
import psutil
import boto3.session
import urllib3.exceptions

@@ -32,7 +33,26 @@

# Initialize logger
logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
+logger.setLevel(logging.DEBUG)  # Set to DEBUG for the file handler to capture everything

# Console handler for INFO level and above
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)

# File handler for DEBUG level and above with line-by-line flushing
class FlushFileHandler(logging.FileHandler):
def emit(self, record):
super().emit(record)
self.flush() # Explicitly flush after every log entry

file_handler = FlushFileHandler('birrpipeline-debug.log', mode='a')
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s'))

# Add handlers to the logger
logger.handlers.clear()
logger.addHandler(console_handler)
logger.addHandler(file_handler)

# Global s3 client for the whole script, feel free to adjust params if you need it
workspace_s3 = boto3.client('s3')
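For context, a minimal standalone sketch of the handler split introduced above (the logger name and log file path here are illustrative, not the pipeline's): the logger itself is set to DEBUG so records reach both handlers, the console handler then filters to INFO and above, and the flushing file handler captures everything.

import logging

class FlushFileHandler(logging.FileHandler):
    def emit(self, record):
        super().emit(record)
        self.flush()  # flush after every record so nothing is lost on a crash

demo_logger = logging.getLogger("demo")        # illustrative name
demo_logger.setLevel(logging.DEBUG)            # logger must pass DEBUG records through

console = logging.StreamHandler()
console.setLevel(logging.INFO)                 # console stays quiet below INFO

debug_file = FlushFileHandler("demo-debug.log", mode="a")   # illustrative path
debug_file.setLevel(logging.DEBUG)

demo_logger.addHandler(console)
demo_logger.addHandler(debug_file)

demo_logger.debug("reaches only demo-debug.log")
demo_logger.info("reaches the console and the file")

Per-record flushing trades some throughput for durability: the debug log stays complete even if the process dies mid-run, which fits this commit's goal of diagnosing pipeline stability.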
@@ -525,13 +545,16 @@ def build_pdf_queries(s3_workspace: str, pdf: DatabaseManager.PDFRecord, cur_rou

# Shortcut out of downloading the actual PDF
if set(page.page_num for page in existing_pages if page.is_usable()) == set(range(1, pdf.num_pages + 1)):
db.close()
return []

try:
with tempfile.NamedTemporaryFile("wb+", suffix=".pdf") as tf:
tf.write(get_s3_bytes(pdf_s3, pdf.s3_path))
tf.flush()

logger.debug(f"build_pdf_queries for {pdf.s3_path} with {pdf.num_pages} pages")

for target_page_num in range(1, pdf.num_pages + 1):
# Is there an existing page that has no error
if any(page.is_usable() and page.page_num == target_page_num for page in existing_pages):
@@ -547,6 +570,7 @@ def build_pdf_queries(s3_workspace: str, pdf: DatabaseManager.PDFRecord, cur_rou
rotated_page_data = _get_page_data([page for page in existing_pages if page.page_num == target_page_num and page.error == "rotation_invalid"])
rotation_corrections = set(page_data.rotation_correction for page_data in rotated_page_data)
for correction in rotation_corrections:
logger.debug(f"Adding {correction}-degree rotation query for {pdf.s3_path}-{target_page_num}")
new_queries.append({**build_page_query(tf.name, pdf.s3_path, target_page_num, target_longest_image_dim, target_anchor_text_len, image_rotation=correction), "round": cur_round})

# TODO: Try to provide a smaller prompt hint if that was the error
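To make the dedup step above concrete, a small sketch (the record shape is hypothetical; in the pipeline these come from _get_page_data): collecting rotation_correction values into a set yields one retry query per distinct correction, even when several failed pages agree on it.

# Hypothetical page data for pages that failed with error == "rotation_invalid"
rotated_page_data = [
    {"rotation_correction": 90},
    {"rotation_correction": 90},
    {"rotation_correction": 270},
]

rotation_corrections = set(d["rotation_correction"] for d in rotated_page_data)
print(rotation_corrections)  # {90, 270}: two retry queries are built, not three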
@@ -555,6 +579,7 @@ def build_pdf_queries(s3_workspace: str, pdf: DatabaseManager.PDFRecord, cur_rou
except Exception as ex:
logger.warning(f"Warning, could not get batch inferences lines for {pdf.s3_path} due to {ex}")

db.close()
return new_queries

def build_dolma_doc(s3_workspace: str, pdf: DatabaseManager.PDFRecord) -> Optional[dict]:
@@ -569,6 +594,7 @@ def build_dolma_doc(s3_workspace: str, pdf: DatabaseManager.PDFRecord) -> Option
usable_pages = [page for page in existing_pages if page.is_usable() and page.page_num == target_page_num]

if len(usable_pages) == 0:
db.close()
return None

for target_page_num in range(1, pdf.num_pages + 1):
@@ -608,11 +634,13 @@ def build_dolma_doc(s3_workspace: str, pdf: DatabaseManager.PDFRecord) -> Option
}
}

db.close()
return dolma_doc

def mark_pdfs_done(s3_workspace: str, dolma_docs: list[dict]):
db = DatabaseManager(s3_workspace, skip_init=True)
db.update_pdf_statuses({doc["metadata"]["Source-File"]: "completed" for doc in dolma_docs})
db.close()
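The db.close() calls added on each return path above plug connection leaks, which is likely the "stability" half of this commit. An equivalent guarantee can be sketched with contextlib.closing, on the assumption that DatabaseManager only needs close() invoked; unlike manual calls on every return, it also covers exception paths:

from contextlib import closing

def mark_pdfs_done(s3_workspace: str, dolma_docs: list[dict]):
    # closing() guarantees db.close() even if update_pdf_statuses raises
    with closing(DatabaseManager(s3_workspace, skip_init=True)) as db:
        db.update_pdf_statuses({doc["metadata"]["Source-File"]: "completed" for doc in dolma_docs})

This is an alternative sketch, not what the commit does; the commit closes explicitly at each exit.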

def get_current_round(s3_workspace: str) -> int:
path = s3_workspace[5:]
@@ -761,6 +789,20 @@ def get_current_round(s3_workspace: str) -> int:
return_when=concurrent.futures.FIRST_COMPLETED,
)



# Get current Python process memory usage
# process = psutil.Process()
# memory_usage_python = process.memory_info().rss # Resident Set Size (RSS) in bytes
# logger.debug(f"Current Python memory usage: {memory_usage_python / (1024 ** 2):.2f} MB")

# # Get total memory usage on the host
# total_memory = psutil.virtual_memory().total
# used_memory = psutil.virtual_memory().used
# logger.debug(f"Total memory on host: {total_memory / (1024 ** 3):.2f} GB")
# logger.debug(f"Used memory on host: {used_memory / (1024 ** 3):.2f} GB")


for future in done:
pdf = pending_futures.pop(future)
inference_lines = future.result()
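The commented-out psutil block above would, if enabled, log process and host memory usage at each loop iteration. A self-contained sketch of the same measurements, using standard psutil calls:

import logging
import psutil

logger = logging.getLogger(__name__)

process = psutil.Process()              # the current Python process
rss = process.memory_info().rss         # Resident Set Size (RSS) in bytes
logger.debug(f"Current Python memory usage: {rss / (1024 ** 2):.2f} MB")

vm = psutil.virtual_memory()            # host-wide memory statistics
logger.debug(f"Total memory on host: {vm.total / (1024 ** 3):.2f} GB")
logger.debug(f"Used memory on host: {vm.used / (1024 ** 3):.2f} GB")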