Skip to content

Commit

Permalink
Merge pull request #61 from rainforestapp/RF-30293-revert-to-4.0.0.al…
Browse files Browse the repository at this point in the history
…pha13

[RF-30293] Revert to 4.0.0.alpha13
  • Loading branch information
CChanHY authored Sep 6, 2023
2 parents dd72041 + 08fa342 commit 00f7487
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 85 deletions.
12 changes: 4 additions & 8 deletions lib/queue_classic_plus.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,14 @@ module QueueClassicPlus

def self.migrate(c = QC::default_conn_adapter.connection)
conn = QC::ConnAdapter.new(connection: c)
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN IF NOT EXISTS last_error TEXT")
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN IF NOT EXISTS remaining_retries INTEGER")
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN IF NOT EXISTS lock BOOLEAN NOT NULL DEFAULT FALSE")
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS index_queue_classic_jobs_enqueue_lock on queue_classic_jobs(q_name, method, args) WHERE lock IS TRUE")
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN last_error TEXT")
conn.execute("ALTER TABLE queue_classic_jobs ADD COLUMN remaining_retries INTEGER")
end

def self.demigrate(c = QC::default_conn_adapter.connection)
conn = QC::ConnAdapter.new(connection: c)
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN IF EXISTS last_error")
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN IF EXISTS remaining_retries")
conn.execute("DROP INDEX IF EXISTS index_queue_classic_jobs_enqueue_lock")
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN IF EXISTS lock")
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN last_error")
conn.execute("ALTER TABLE queue_classic_jobs DROP COLUMN remaining_retries")
end

def self.exception_handler
Expand Down
26 changes: 25 additions & 1 deletion lib/queue_classic_plus/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,32 @@ def self.logger
QueueClassicPlus.logger
end

def self.can_enqueue?(method, *args)
if locked?
max_lock_time = ENV.fetch("QUEUE_CLASSIC_MAX_LOCK_TIME", 10 * 60).to_i

q = "SELECT COUNT(1) AS count
FROM
(
SELECT 1
FROM queue_classic_jobs
WHERE q_name = $1 AND method = $2 AND args::text = $3::text
AND (locked_at IS NULL OR locked_at > current_timestamp - interval '#{max_lock_time} seconds')
LIMIT 1
)
AS x"

result = QC.default_conn_adapter.execute(q, @queue, method, JSON.dump(serialized(args)))
result['count'].to_i == 0
else
true
end
end

def self.enqueue(method, *args)
queue.enqueue(method, *serialized(args), lock: locked?)
if can_enqueue?(method, *args)
queue.enqueue(method, *serialized(args))
end
end

def self.enqueue_perform(*args)
Expand Down
20 changes: 0 additions & 20 deletions lib/queue_classic_plus/queue_classic/queue.rb
Original file line number Diff line number Diff line change
Expand Up @@ -50,25 +50,5 @@ def lock
end
end

def enqueue(method, *args, lock: false)
QC.log_yield(:measure => 'queue.enqueue') do
insert_sql = <<-EOF
INSERT INTO #{QC.table_name} (q_name, method, args, lock)
VALUES ($1, $2, $3, $4)
ON CONFLICT (q_name, method, args) WHERE lock IS TRUE DO NOTHING
RETURNING id
EOF
begin
retries ||= 0
conn_adapter.execute(insert_sql, name, method, JSON.dump(args), lock)
rescue PG::Error => error
if (retries += 1) < 2
retry
else
raise
end
end
end
end
end
end
2 changes: 1 addition & 1 deletion lib/queue_classic_plus/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module QueueClassicPlus
VERSION = '4.0.0.alpha17'.freeze
VERSION = '4.0.0.alpha18'.freeze
end
64 changes: 9 additions & 55 deletions spec/base_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,6 @@

describe QueueClassicPlus::Base do
context "A child of QueueClassicPlus::Base" do
subject do
Class.new(QueueClassicPlus::Base) do
@queue = :test
end
end

it "allows multiple enqueues" do
threads = []
10.times do
threads << Thread.new do
subject.do
end
end
threads.each(&:join)

expect(subject).to have_queue_size_of(10)
end

context "that is locked" do
subject do
Class.new(QueueClassicPlus::Base) do
Expand All @@ -30,28 +12,9 @@
end

it "does not allow multiple enqueues" do
threads = []
10.times do
threads << Thread.new do
subject.do
expect(subject).to have_queue_size_of(1)
end
end
threads.each(&:join)
end

it "allows enqueueing same job with different arguments" do
threads = []
(1..3).each do |arg|
10.times do
threads << Thread.new do
subject.do(arg)
end
end
end
threads.each(&:join)

expect(subject).to have_queue_size_of(3)
subject.do
subject.do
expect(subject).to have_queue_size_of(1)
end

it "checks for an existing job using the same serializing as job enqueuing" do
Expand All @@ -65,22 +28,13 @@
subject.do(date)
expect(subject).to have_queue_size_of(1)
end
end

context "when in a transaction" do
subject do
Class.new(QueueClassicPlus::Base) do
@queue = :test
lock!
end
end

it "does not create another transaction when enqueueing" do
conn = QC.default_conn_adapter.connection
expect(conn).to receive(:transaction).exactly(1).times.and_call_original
conn.transaction do
subject.do
end
it "does allow multiple enqueues if something got locked for too long" do
subject.do
one_day_ago = Time.now - 60*60*24
execute "UPDATE queue_classic_jobs SET locked_at = '#{one_day_ago}' WHERE q_name = 'test'"
subject.do
expect(subject).to have_queue_size_of(2)
end
end

Expand Down

0 comments on commit 00f7487

Please sign in to comment.