Skip to content

Commit

Permalink
Adding image merging to pdf report/hint/anchor
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Oct 8, 2024
1 parent 57d9a21 commit c8a4d14
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 19 deletions.
112 changes: 93 additions & 19 deletions pdelfin/prompts/anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

# coherency score best of these three
import subprocess
import sys
import json
import math
import ftfy
from dataclasses import dataclass
from typing import Literal, List
Expand Down Expand Up @@ -103,7 +102,6 @@ class BoundingBox:
def from_rectangle(rect: RectangleObject) -> "BoundingBox":
return BoundingBox(rect[0], rect[1], rect[2], rect[3])


@dataclass
class TextElement(Element):
text: str
Expand All @@ -118,18 +116,20 @@ class ImageElement(Element):
@dataclass
class PageReport:
mediabox: BoundingBox
elements: List[Element]
text_elements: List[TextElement]
image_elements: List[ImageElement]

def _pdf_report(local_pdf_path: str, page: int) -> PageReport:
reader = PdfReader(local_pdf_path)
page = reader.pages[page - 1]
resources = page.get("/Resources", {})
xobjects = resources.get("/XObject", {})
elements = []
text_elements, image_elements = [], []


def visitor_body(text, cm, tm, font_dict, font_size):
txt2user = mult(tm, cm)
elements.append(TextElement(text, txt2user[4], txt2user[5]))
text_elements.append(TextElement(text, txt2user[4], txt2user[5]))

def visitor_op(op, args, cm, tm):
if op == b"Do":
Expand All @@ -142,30 +142,104 @@ def visitor_op(op, args, cm, tm):
height = xobject.get("/Height")
x0, y0 = _transform_point(0, 0, cm)
x1, y1 = _transform_point(1, 1, cm)
elements.append(ImageElement(xobject_name, BoundingBox(min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1))))
image_elements.append(ImageElement(xobject_name, BoundingBox(min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1))))

page.extract_text(visitor_text=visitor_body, visitor_operand_before=visitor_op)

return PageReport(
mediabox=BoundingBox.from_rectangle(page.mediabox),
elements=elements,
text_elements=text_elements,
image_elements=image_elements,
)


def _merge_image_elements(images: List[ImageElement], tolerance: float=0.5) -> List[ImageElement]:
n = len(images)
parent = list(range(n)) # Initialize Union-Find parent pointers

def find(i):
# Find with path compression
root = i
while parent[root] != root:
root = parent[root]
while parent[i] != i:
parent_i = parent[i]
parent[i] = root
i = parent_i
return root

def union(i, j):
# Union by attaching root of one tree to another
root_i = find(i)
root_j = find(j)
if root_i != root_j:
parent[root_i] = root_j

def bboxes_overlap(b1: BoundingBox, b2: BoundingBox, tolerance: float) -> bool:
# Compute horizontal and vertical distances between boxes
h_dist = max(0, max(b1.x0, b2.x0) - min(b1.x1, b2.x1))
v_dist = max(0, max(b1.y0, b2.y0) - min(b1.y1, b2.y1))
# Check if distances are within tolerance
return h_dist <= tolerance and v_dist <= tolerance

# Union overlapping images
for i in range(n):
for j in range(i + 1, n):
if bboxes_overlap(images[i].bbox, images[j].bbox, tolerance):
union(i, j)

# Group images by their root parent
groups = {}
for i in range(n):
root = find(i)
groups.setdefault(root, []).append(i)

# Merge images in the same group
merged_images = []
for indices in groups.values():
# Initialize merged bounding box
merged_bbox = images[indices[0]].bbox
merged_name = images[indices[0]].name

for idx in indices[1:]:
bbox = images[idx].bbox
# Expand merged_bbox to include the current bbox
merged_bbox = BoundingBox(
x0=min(merged_bbox.x0, bbox.x0),
y0=min(merged_bbox.y0, bbox.y0),
x1=max(merged_bbox.x1, bbox.x1),
y1=max(merged_bbox.y1, bbox.y1),
)
# Optionally, update the name
merged_name += f"+{images[idx].name}"

merged_images.append(ImageElement(name=merged_name, bbox=merged_bbox))

# Return the merged images along with other elements
return merged_images


def _linearize_pdf_report(report: PageReport) -> str:
result = ""

result += f"Page dimensions: {report.mediabox.x1:.1f}x{report.mediabox.y1:.1f}\n"

for index, element in enumerate(report.elements):
if isinstance(element, ImageElement):
result += f"[Image {element.bbox.x0:.0f}x{element.bbox.y0:.0f} to {element.bbox.x1:.0f}x{element.bbox.y1:.0f}]"
if isinstance(element, TextElement):
if len(element.text.strip()) == 0:
continue

# Need to use ftfy to fix text, because occasionally there are invalid surrogate pairs and other UTF issues that cause
# pyarrow to fail to load the json later
result += f"[{element.x:.0f}x{element.y:.0f}]{ftfy.fix_text(element.text)}"

#images = report.image_elements
images = _merge_image_elements(report.image_elements)

for index, element in enumerate(images):
result += f"[Image {element.bbox.x0:.0f}x{element.bbox.y0:.0f} to {element.bbox.x1:.0f}x{element.bbox.y1:.0f}]"

for index, element in enumerate(report.text_elements):
if len(element.text.strip()) == 0:
continue

element_text = ftfy.fix_text(element.text)
# Replace square brackets with something else not to throw off the syntax
element_text = element_text.replace("[", "\[").replace("]", "\[")

# Need to use ftfy to fix text, because occasionally there are invalid surrogate pairs and other UTF issues that cause
# pyarrow to fail to load the json later
result += f"[{element.x:.0f}x{element.y:.0f}]{element_text}"

return result
Binary file added tests/gnarly_pdfs/large_prompt_hint1.pdf
Binary file not shown.
Binary file added tests/gnarly_pdfs/large_prompt_hint2.pdf
Binary file not shown.
Binary file added tests/gnarly_pdfs/newspaper.pdf
Binary file not shown.
25 changes: 25 additions & 0 deletions tests/test_anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,31 @@ def testBadUTFSurrogatePairsGeneration(self):
buffer = io.BytesIO(jsondata.encode('utf-8'))
paj.read_json(buffer, read_options=paj.ReadOptions(use_threads=False, block_size=len(jsondata)))

def testLargePromptHint1(self):
local_pdf_path = os.path.join(os.path.dirname(__file__), "gnarly_pdfs", "large_prompt_hint1.pdf")

anchor_text = get_anchor_text(local_pdf_path, 4, pdf_engine="pdfreport")

print(anchor_text)
print(len(anchor_text))

def testLargePromptHint2(self):
local_pdf_path = os.path.join(os.path.dirname(__file__), "gnarly_pdfs", "large_prompt_hint2.pdf")

anchor_text = get_anchor_text(local_pdf_path, 2, pdf_engine="pdfreport")

print(anchor_text)
print(len(anchor_text))

def testNewsPaperPromptHint(self):
local_pdf_path = os.path.join(os.path.dirname(__file__), "gnarly_pdfs", "newspaper.pdf")

anchor_text = get_anchor_text(local_pdf_path, 1, pdf_engine="pdfreport")

print(anchor_text)
print(len(anchor_text))




class BuildSilverTest(unittest.TestCase):
Expand Down

0 comments on commit c8a4d14

Please sign in to comment.