Skip to content

Commit

Permalink
Work queue sharing thing
Browse files Browse the repository at this point in the history
  • Loading branch information
jakep-allenai committed Nov 7, 2024
1 parent 75d4a0e commit 3d6be3c
Showing 1 changed file with 20 additions and 15 deletions.
35 changes: 20 additions & 15 deletions pdelfin/beakerpipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import redis
import random
import boto3
import atexit # New import for atexit
import atexit

from pdelfin.s3_utils import expand_s3_glob

Expand All @@ -24,7 +24,7 @@ def populate_queue_if_empty(queue, s3_glob_path):
print("Queue populated with initial work items.")

def process(item):
# Simulate processing time between 1 and 10 seconds
# Simulate processing time between 1 and 3 seconds
print(f"Processing item: {item}")
time.sleep(random.randint(1, 3))
print(f"Completed processing item: {item}")
Expand All @@ -44,14 +44,15 @@ def main():
base_sentinel_port = 26379

redis_port = base_redis_port + replica_number
sentinel_port = base_sentinel_port + replica_number
# Set sentinel_port to be the same on all nodes
sentinel_port = base_sentinel_port

if replica_number == 0:
leader_ip = args.leader_ip if args.leader_ip else '127.0.0.1'
leader_port = args.leader_port
else:
if not args.leader_ip:
print('Error: --leader_ip is required for replica nodes (replica_number >= 1)')
print('Error: --leader-ip is required for replica nodes (replica_number >= 1)')
sys.exit(1)
leader_ip = args.leader_ip
leader_port = args.leader_port
Expand Down Expand Up @@ -103,8 +104,12 @@ def handle_signal(signum, frame):
signal.signal(signal.SIGINT, handle_signal)
signal.signal(signal.SIGTERM, handle_signal)

# Connect to Redis
r = redis.StrictRedis(host=leader_ip, port=redis_port, decode_responses=True)
time.sleep(2)

# Use Sentinel to connect to the master
from redis.sentinel import Sentinel
sentinel = Sentinel([('127.0.0.1', sentinel_port)], socket_timeout=0.1)
r = sentinel.master_for(master_name, socket_timeout=0.1, decode_responses=True)

# Populate the work queue if this is the leader (replica 0)
if replica_number == 0:
Expand All @@ -113,15 +118,15 @@ def handle_signal(signum, frame):
try:
while True:
# Try to get an item from the queue with a 1-minute timeout for processing
# work_item = r.brpoplpush("work_queue", "processing_queue", 60)
# if work_item:
# try:
# process(work_item)
# # Remove from the processing queue if processed successfully
# r.lrem("processing_queue", 1, work_item)
# except Exception as e:
# print(f"Error processing {work_item}: {e}")
# # If an error occurs, let it be requeued after timeout
work_item = r.brpoplpush("work_queue", "processing_queue", 60)
if work_item:
try:
process(work_item)
# Remove from the processing queue if processed successfully
r.lrem("processing_queue", 1, work_item)
except Exception as e:
print(f"Error processing {work_item}: {e}")
# If an error occurs, let it be requeued after timeout

queue_length = r.llen("work_queue")
print(f"Total work items in queue: {queue_length}")
Expand Down

0 comments on commit 3d6be3c

Please sign in to comment.