diff --git a/app/code/community/Algolia/Algoliasearch/Model/Queue.php b/app/code/community/Algolia/Algoliasearch/Model/Queue.php index 497d4981..d41a6368 100644 --- a/app/code/community/Algolia/Algoliasearch/Model/Queue.php +++ b/app/code/community/Algolia/Algoliasearch/Model/Queue.php @@ -150,8 +150,8 @@ public function run($maxJobs) $model->{$method}(new Varien_Object($job['data'])); // Delete one by one - $where = $this->db->quoteInto('job_id IN (?)', $job['merged_ids']); - $this->db->delete($this->table, $where); + $this->db->delete($this->table, array('job_id IN (?)' => $job['merged_ids'])); + $this->logRecord['processed_jobs'] += count($job['merged_ids']); } catch (\Exception $e) { @@ -230,11 +230,6 @@ private function getJobs($maxJobs, $pid) break; } - // If $jobs is empty, it's the first run - if (empty($jobs)) { - $firstJobId = $rawJobs[0]['job_id']; - } - $rawJobs = $this->prepareJobs($rawJobs); $rawJobs = array_merge($jobs, $rawJobs); $rawJobs = $this->mergeJobs($rawJobs); @@ -265,14 +260,7 @@ private function getJobs($maxJobs, $pid) } } - if (isset($firstJobId)) { - $lastJobId = $this->maxValueInArray($jobs, 'job_id'); - - // Reserve all new jobs since last run - $this->db->query("UPDATE {$this->db->quoteIdentifier($this->table, true)} - SET pid = " . $pid . ", locked_at = '" . date('Y-m-d H:i:s') . "' - WHERE job_id >= " . $firstJobId . " AND job_id <= " . $lastJobId); - } + $this->lockJobs($jobs); $this->db->commit(); } catch (\Exception $e) { @@ -442,19 +430,32 @@ private function arrayMultisort() return array_pop($args); } - private function maxValueInArray($array, $keyToSearch) + /** + * @param array $jobs + */ + private function lockJobs($jobs) { - $currentMax = null; + $jobsIds = $this->getJobsIdsFromMergedJobs($jobs); - foreach ($array as $arr) { - foreach ($arr as $key => $value) { - if ($key == $keyToSearch && ($value >= $currentMax)) { - $currentMax = $value; - } - } + if ($jobsIds !== array()) { + $pid = getmypid(); + $this->db->update($this->table, array('pid' => $pid), array('job_id IN (?)' => $jobsIds)); + } + } + + /** + * @param array $mergedJobs + * + * @return string[] + */ + private function getJobsIdsFromMergedJobs($mergedJobs) + { + $jobsIds = array(); + foreach ($mergedJobs as $job) { + $jobsIds = array_merge($jobsIds, $job['merged_ids']); } - return $currentMax; + return $jobsIds; } private function clearOldLogRecords()