diff --git a/src/Illuminate/Queue/CallQueuedHandler.php b/src/Illuminate/Queue/CallQueuedHandler.php index 4bd5aa30feb..46a2c1e08f8 100644 --- a/src/Illuminate/Queue/CallQueuedHandler.php +++ b/src/Illuminate/Queue/CallQueuedHandler.php @@ -66,10 +66,6 @@ public function call(Job $job, array $data) return $this->handleModelNotFound($job, $e); } - if ($command instanceof ShouldBeUniqueUntilProcessing) { - $this->ensureUniqueJobLockIsReleased($command); - } - $this->dispatchThroughMiddleware($job, $command); if (! $job->isReleased() && ! $command instanceof ShouldBeUniqueUntilProcessing) { @@ -123,6 +119,10 @@ protected function dispatchThroughMiddleware(Job $job, $command) return (new Pipeline($this->container))->send($command) ->through(array_merge(method_exists($command, 'middleware') ? $command->middleware() : [], $command->middleware ?? [])) ->then(function ($command) use ($job) { + if ($command instanceof ShouldBeUniqueUntilProcessing) { + $this->ensureUniqueJobLockIsReleased($command); + } + return $this->dispatcher->dispatchNow( $command, $this->resolveHandler($job, $command) ); diff --git a/tests/Integration/Queue/UniqueUntilProcessingJobTest.php b/tests/Integration/Queue/UniqueUntilProcessingJobTest.php new file mode 100644 index 00000000000..79ce03e6569 --- /dev/null +++ b/tests/Integration/Queue/UniqueUntilProcessingJobTest.php @@ -0,0 +1,89 @@ +set('queue.default', 'database'); + $app['config']->set('cache.default', 'database'); + $this->driver = 'database'; + } + + public function testShouldBeUniqueUntilProcessingReleasesLockWhenJobIsReleasedByAMiddleware() + { + // Job that does not release and gets processed + UniqueTestJobThatDoesNotRelease::dispatch(); + $lockKey = DB::table('cache_locks')->orderBy('id')->first()->key; + $this->assertNotNull($lockKey); + $this->runQueueWorkerCommand(['--once' => true]); + $this->assertFalse(UniqueTestJobThatDoesNotRelease::$released); + $lockKey = DB::table('cache_locks')->first()->key ?? null; + $this->assertNull($lockKey); + $this->assertDatabaseCount('jobs', 0); + + // Job that releases and does not get processed + UniqueUntilProcessingJobThatReleases::dispatch(); + $lockKey = DB::table('cache_locks')->first()->key; + $this->assertNotNull($lockKey); + $this->runQueueWorkerCommand(['--once' => true]); + $this->assertFalse(UniqueUntilProcessingJobThatReleases::$handled); + $this->assertTrue(UniqueUntilProcessingJobThatReleases::$released); + $lockKey = DB::table('cache_locks')->orderBy('id')->first()->key ?? null; + $this->assertNotNull($lockKey); + + UniqueUntilProcessingJobThatReleases::dispatch(); + $this->assertDatabaseCount('jobs', 1); + } +} + +class UniqueTestJobThatDoesNotRelease implements ShouldQueue, ShouldBeUniqueUntilProcessing +{ + use InteractsWithQueue, Queueable, Dispatchable; + + public static $handled = false; + public static $released = false; + + public function __construct() + { + static::$handled = false; + static::$released = false; + } + + public function handle() + { + static::$handled = true; + } +} + +class UniqueUntilProcessingJobThatReleases extends UniqueTestJobThatDoesNotRelease +{ + public function middleware() + { + return [ + function ($job) { + static::$released = true; + + return $job->release(30); + }, + ]; + } + + public function uniqueId() + { + return 100; + } +}