Commit

pipeline script
jakep-allenai committed Oct 10, 2024
1 parent 49b5b23 commit 312ee8d
Showing 1 changed file with 35 additions and 45 deletions.
80 changes: 35 additions & 45 deletions pdelfin/assemblepipeline.py
@@ -1,5 +1,4 @@
import os
import sys
import hashlib
import boto3
import duckdb
@@ -42,16 +41,13 @@ def build_index(s3_path):
    # List all .json and .jsonl files under s3_path with their ETags
    files = list_s3_files(s3, bucket, prefix)

-    # Filter out files that have already been processed
-    files_to_process = filter_processed_files(db_path, files)
-
-    if not files_to_process:
-        print("All files have been processed. Nothing to do.")
+    if not files:
+        print("No .json or .jsonl files found in the specified S3 path.")
        return

    # Use ThreadPoolExecutor to process files with tqdm progress bar
-    with ThreadPoolExecutor(max_workers=8) as executor:
-        futures = [executor.submit(process_file, s3, bucket, key, etag, db_path) for key, etag in files_to_process.items()]
+    with ThreadPoolExecutor() as executor:
+        futures = [executor.submit(process_file, s3, bucket, key, etag, db_path) for key, etag in files.items()]
        for _ in tqdm(as_completed(futures), total=len(futures), desc="Processing files"):
            pass

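Note: build_index now submits every listed file and relies on the per-file ETag check inside process_file (below) rather than pre-filtering; with no max_workers argument, ThreadPoolExecutor defaults to min(32, os.cpu_count() + 4) on Python 3.8+. The ON CONFLICT (s3_path) clause used further down assumes the processed_files table carries a uniqueness constraint on s3_path. A minimal sketch of that table setup, not part of this diff (the init_db helper name and the index.db filename are placeholders):

import duckdb

def init_db(db_path="index.db"):
    # Placeholder helper: the actual table creation is not shown in this commit.
    conn = duckdb.connect(database=db_path)
    # ON CONFLICT (s3_path) in process_file needs a PRIMARY KEY or UNIQUE constraint here.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS processed_files (
            s3_path TEXT PRIMARY KEY,
            etag    TEXT
        )
    """)
    conn.close()
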
@@ -75,50 +71,44 @@ def list_s3_files(s3, bucket, prefix):
files[key] = obj['ETag'].strip('"')
return files

-def filter_processed_files(db_path, files):
-    conn = duckdb.connect(database=db_path)
-    cursor = conn.cursor()
-
-    # Retrieve processed files
-    cursor.execute("SELECT s3_path, etag FROM processed_files")
-    processed = dict(cursor.fetchall())
-
-    # Filter out files that are already processed with the same ETag
-    files_to_process = {}
-    for key, etag in files.items():
-        if key not in processed or processed[key] != etag:
-            files_to_process[key] = etag
-
-    conn.close()
-    return files_to_process
-
def process_file(s3, bucket, key, etag, db_path):
+    s3_path = f's3://{bucket}/{key}'
    try:
-        # Get the object
-        obj = s3.get_object(Bucket=bucket, Key=key)
-        s3_path = f's3://{bucket}/{key}'
-
-        # Read the content as bytes
-        content = obj['Body'].read()
-
        # Connect to duckdb
        conn = duckdb.connect(database=db_path)
        cursor = conn.cursor()

-        # Process the file as JSONL
-        process_jsonl_content(content, s3_path, cursor)
-
-        # Update the processed_files table
-        cursor.execute("""
-            INSERT INTO processed_files (s3_path, etag)
-            VALUES (?, ?)
-            ON CONFLICT (s3_path) DO UPDATE SET etag=excluded.etag
-        """, (key, etag))
-
-        conn.commit()
-        conn.close()
+        # Check if file has already been processed with the same ETag
+        cursor.execute("SELECT etag FROM processed_files WHERE s3_path = ?", (key,))
+        result = cursor.fetchone()
+
+        if result and result[0] == etag:
+            # File has already been processed with the same ETag
+            # Optionally, log that the file was skipped
+            # print(f"Skipping already processed file: {s3_path}")
+            conn.close()
+            return
+        else:
+            # Get the object
+            obj = s3.get_object(Bucket=bucket, Key=key)
+
+            # Read the content as bytes
+            content = obj['Body'].read()
+
+            # Process the file as JSONL
+            process_jsonl_content(content, s3_path, cursor)
+
+            # Update the processed_files table
+            cursor.execute("""
+                INSERT INTO processed_files (s3_path, etag)
+                VALUES (?, ?)
+                ON CONFLICT (s3_path) DO UPDATE SET etag=excluded.etag
+            """, (key, etag))
+
+            conn.commit()
+            conn.close()
    except Exception as e:
-        print(f"Error processing file {key}: {e}")
+        print(f"Error processing file {s3_path}: {e}")

def process_jsonl_content(content, s3_path, cursor):
    start_index = 0
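
The body of process_jsonl_content is collapsed in this view. For orientation only, a minimal sketch of a byte-offset JSONL indexer of this shape, assuming a hypothetical index_table and custom_id field that are not taken from this commit:

import json

def process_jsonl_content_sketch(content, s3_path, cursor):
    # Hypothetical sketch; the real implementation is not shown in this diff.
    start_index = 0
    for line in content.splitlines(keepends=True):
        end_index = start_index + len(line)
        stripped = line.strip()
        if stripped:
            record = json.loads(stripped)
            # index_table and custom_id are assumed names, not from the commit
            cursor.execute(
                "INSERT INTO index_table (custom_id, s3_path, start_index, end_index) VALUES (?, ?, ?, ?)",
                (record.get("custom_id"), s3_path, start_index, end_index),
            )
        start_index = end_index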
