Skip to content


Experimental beaker pipeline self organizing redis idea
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 7, 2024
1 parent a14febc commit 75d4a0e
Showing 1 changed file with 133 additions and 0 deletions.
133 changes: 133 additions & 0 deletions pdelfin/
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
import argparse
import subprocess
import signal
import sys
import os
import time
import tempfile
import redis
import random
import boto3
import atexit # New import for atexit

from pdelfin.s3_utils import expand_s3_glob

workspace_s3 = boto3.client('s3')
pdf_s3 = boto3.client('s3')

def populate_queue_if_empty(queue, s3_glob_path):
# Check if the queue is empty, and if so, populate it with work items
if queue.llen("work_queue") == 0:
paths = expand_s3_glob(pdf_s3, s3_glob_path)
for path in paths:
queue.rpush("work_queue", path)
print("Queue populated with initial work items.")

def process(item):
# Simulate processing time between 1 and 10 seconds
print(f"Processing item: {item}")
time.sleep(random.randint(1, 3))
print(f"Completed processing item: {item}")

def main():
parser = argparse.ArgumentParser(description='Set up Redis Sentinel-based worker queue.')
parser.add_argument('--leader-ip', help='IP address of the initial leader node')
parser.add_argument('--leader-port', type=int, default=6379, help='Port of the initial leader node')
parser.add_argument('--replica', type=int, required=True, help='Replica number (0 to N-1)')
parser.add_argument('--add-pdfs', required=True, help='S3 glob path for work items')

args = parser.parse_args()

replica_number = args.replica

base_redis_port = 6379
base_sentinel_port = 26379

redis_port = base_redis_port + replica_number
sentinel_port = base_sentinel_port + replica_number

if replica_number == 0:
leader_ip = args.leader_ip if args.leader_ip else ''
leader_port = args.leader_port
if not args.leader_ip:
print('Error: --leader_ip is required for replica nodes (replica_number >= 1)')
leader_ip = args.leader_ip
leader_port = args.leader_port

temp_dir = tempfile.mkdtemp()
redis_conf_path = os.path.join(temp_dir, 'redis.conf')
sentinel_conf_path = os.path.join(temp_dir, 'sentinel.conf')

with open(redis_conf_path, 'w') as f:
f.write(f'port {redis_port}\n')
f.write(f'dbfilename dump-{replica_number}.rdb\n')
f.write(f'appendfilename "appendonly-{replica_number}.aof"\n')
f.write(f'logfile "redis-{replica_number}.log"\n')
f.write(f'dir {temp_dir}\n')
if replica_number == 0:
f.write(f'replicaof {leader_ip} {leader_port}\n')

master_name = 'mymaster'
quorum = 2

with open(sentinel_conf_path, 'w') as f:
f.write(f'port {sentinel_port}\n')
f.write(f'dir {temp_dir}\n')
f.write(f'sentinel monitor {master_name} {leader_ip} {leader_port} {quorum}\n')
f.write(f'sentinel down-after-milliseconds {master_name} 5000\n')
f.write(f'sentinel failover-timeout {master_name} 10000\n')
f.write(f'sentinel parallel-syncs {master_name} 1\n')

redis_process = subprocess.Popen(['redis-server', redis_conf_path])
sentinel_process = subprocess.Popen(['redis-sentinel', sentinel_conf_path])

# Register atexit function to guarantee process termination
def terminate_processes():
redis_process.wait() # Ensures subprocess is cleaned up
print("Child processes terminated.")


# Also handle signal-based termination
def handle_signal(signum, frame):

signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)

# Connect to Redis
r = redis.StrictRedis(host=leader_ip, port=redis_port, decode_responses=True)

# Populate the work queue if this is the leader (replica 0)
if replica_number == 0:
populate_queue_if_empty(r, args.add_pdfs)

while True:
# Try to get an item from the queue with a 1-minute timeout for processing
# work_item = r.brpoplpush("work_queue", "processing_queue", 60)
# if work_item:
# try:
# process(work_item)
# # Remove from the processing queue if processed successfully
# r.lrem("processing_queue", 1, work_item)
# except Exception as e:
# print(f"Error processing {work_item}: {e}")
# # If an error occurs, let it be requeued after timeout

queue_length = r.llen("work_queue")
print(f"Total work items in queue: {queue_length}")
except KeyboardInterrupt:
handle_signal(None, None)

if __name__ == '__main__':

0 comments on commit 75d4a0e

Please sign in to comment.