Skip to content

Commit

Permalink
[5.x] Make autoscaling rate configurable (#874)
Browse files Browse the repository at this point in the history
* wip

* wip

* wip

* wip

* cooldown tests

* wip

* wip

* wip

* auto scaler tests

* wip

* wip
  • Loading branch information
hivokas authored Aug 20, 2020
1 parent 1cd5d78 commit e807042
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 40 deletions.
10 changes: 6 additions & 4 deletions config/horizon.php
Original file line number Diff line number Diff line change
Expand Up @@ -168,22 +168,24 @@
'supervisor-1' => [
'connection' => 'redis',
'queue' => ['default'],
'balance' => 'simple',
'processes' => 1,
'balance' => 'auto',
'maxProcesses' => 1,
'tries' => 1,
],
],

'environments' => [
'production' => [
'supervisor-1' => [
'processes' => 10,
'maxProcesses' => 10,
'balanceCooldown' => 1,
'autoScaleMaxShift' => 5,
],
],

'local' => [
'supervisor-1' => [
'processes' => 3,
'maxProcesses' => 3,
],
],
],
Expand Down
49 changes: 30 additions & 19 deletions src/AutoScaler.php
Original file line number Diff line number Diff line change
Expand Up @@ -126,25 +126,36 @@ protected function scalePool(Supervisor $supervisor, $pool, $workers)
{
$supervisor->pruneTerminatingProcesses();

$poolProcesses = $pool->totalProcessCount();

if (ceil($workers) > $poolProcesses &&
$this->wouldNotExceedMaxProcesses($supervisor)) {
$pool->scale($poolProcesses + 1);
} elseif (ceil($workers) < $poolProcesses &&
$poolProcesses > $supervisor->options->minProcesses) {
$pool->scale($poolProcesses - 1);
$totalProcessCount = $pool->totalProcessCount();

$desiredProcessCount = ceil($workers);

if ($desiredProcessCount > $totalProcessCount) {
$maxUpShift = min(
$supervisor->options->maxProcesses - $supervisor->totalProcessCount(),
$supervisor->options->autoScaleMaxShift
);

$pool->scale(
min(
$totalProcessCount + $maxUpShift,
$supervisor->options->maxProcesses,
$desiredProcessCount
)
);
} elseif ($desiredProcessCount < $totalProcessCount) {
$maxDownShift = min(
$supervisor->totalProcessCount() - $supervisor->options->minProcesses,
$supervisor->options->autoScaleMaxShift
);

$pool->scale(
max(
$totalProcessCount - $maxDownShift,
$supervisor->options->minProcesses,
$desiredProcessCount
)
);
}
}

/**
* Determine if adding another process would exceed max process limit.
*
* @param \Laravel\Horizon\Supervisor $supervisor
* @return bool
*/
protected function wouldNotExceedMaxProcesses(Supervisor $supervisor)
{
return ($supervisor->totalProcessCount() + 1) <= $supervisor->options->maxProcesses;
}
}
8 changes: 6 additions & 2 deletions src/Console/SupervisorCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ class SupervisorCommand extends Command
{--queue= : The names of the queues to work}
{--sleep=3 : Number of seconds to sleep when no job is available}
{--timeout=60 : The number of seconds a child process can run}
{--tries=0 : Number of times to attempt a job before logging it failed}';
{--tries=0 : Number of times to attempt a job before logging it failed}
{--balance-cooldown=3 : The number of seconds to wait in between auto-scaling attempts}
{--auto-scale-max-shift=1 : The maximum number of processes to increase or decrease per one scaling}';

/**
* The console command description.
Expand Down Expand Up @@ -112,7 +114,9 @@ protected function supervisorOptions()
$this->option('sleep'),
$this->option('tries'),
$this->option('force'),
$this->option('nice')
$this->option('nice'),
$this->option('balance-cooldown'),
$this->option('auto-scale-max-shift')
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/ProcessPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public function __construct(SupervisorOptions $options, Closure $output = null)
*/
public function scale($processes)
{
$processes = max(0, $processes);
$processes = max(0, (int) $processes);

if ($processes === count($this->processes)) {
return;
Expand Down
11 changes: 2 additions & 9 deletions src/Supervisor.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,6 @@ class Supervisor implements Pausable, Restartable, Terminable
*/
public $lastAutoScaled;

/**
* The number of seconds to wait in between auto-scaling attempts.
*
* @var int
*/
public $autoScaleCooldown = 3;

/**
* The output handler.
*
Expand Down Expand Up @@ -332,9 +325,9 @@ protected function processPendingCommands()
protected function autoScale()
{
$this->lastAutoScaled = $this->lastAutoScaled ?:
CarbonImmutable::now()->subSeconds($this->autoScaleCooldown + 1);
CarbonImmutable::now()->subSeconds($this->options->balanceCooldown + 1);

if (CarbonImmutable::now()->subSeconds($this->autoScaleCooldown)->gte($this->lastAutoScaled)) {
if (CarbonImmutable::now()->subSeconds($this->options->balanceCooldown)->gte($this->lastAutoScaled)) {
$this->lastAutoScaled = CarbonImmutable::now();

app(AutoScaler::class)->scale($this);
Expand Down
5 changes: 3 additions & 2 deletions src/SupervisorCommandString.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ public static function fromOptions(SupervisorOptions $options)
*/
public static function toOptionsString(SupervisorOptions $options)
{
return sprintf('%s --balance=%s --max-processes=%s --min-processes=%s --nice=%s',
return sprintf('%s --balance=%s --max-processes=%s --min-processes=%s --nice=%s --balance-cooldown=%s --auto-scale-max-shift=%s',
QueueCommandString::toOptionsString($options), $options->balance,
$options->maxProcesses, $options->minProcesses, $options->nice
$options->maxProcesses, $options->minProcesses, $options->nice,
$options->balanceCooldown, $options->autoScaleMaxShift
);
}

Expand Down
23 changes: 22 additions & 1 deletion src/SupervisorOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,20 @@ class SupervisorOptions extends WorkerOptions
*/
public $directory;

/**
* The number of seconds to wait in between auto-scaling attempts.
*
* @var int
*/
public $balanceCooldown = 3;

/**
* The maximum number of processes to increase or decrease per one scaling.
*
* @var int
*/
public $autoScaleMaxShift = 1;

/**
* Create a new worker options instance.
*
Expand All @@ -78,10 +92,13 @@ class SupervisorOptions extends WorkerOptions
* @param int $maxTries
* @param bool $force
* @param int $nice
* @param int $balanceCooldown
* @param int $autoScaleMaxShift
*/
public function __construct($name, $connection, $queue = null, $balance = 'off',
$delay = 0, $maxProcesses = 1, $minProcesses = 1, $memory = 128,
$timeout = 60, $sleep = 3, $maxTries = 0, $force = false, $nice = 0)
$timeout = 60, $sleep = 3, $maxTries = 0, $force = false, $nice = 0,
$balanceCooldown = 3, $autoScaleMaxShift = 1)
{
$this->name = $name;
$this->nice = $nice;
Expand All @@ -90,6 +107,8 @@ public function __construct($name, $connection, $queue = null, $balance = 'off',
$this->maxProcesses = $maxProcesses;
$this->minProcesses = $minProcesses;
$this->queue = $queue ?: config('queue.connections.'.$connection.'.queue');
$this->balanceCooldown = $balanceCooldown;
$this->autoScaleMaxShift = $autoScaleMaxShift;

parent::__construct($delay, $memory, $timeout, $sleep, $maxTries, $force);
}
Expand Down Expand Up @@ -178,6 +197,8 @@ public function toArray()
'name' => $this->name,
'sleep' => $this->sleep,
'timeout' => $this->timeout,
'balanceCooldown' => $this->balanceCooldown,
'autoScaleMaxShift' => $this->autoScaleMaxShift,
];
}

Expand Down
2 changes: 1 addition & 1 deletion tests/Feature/AddSupervisorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public function test_add_supervisor_command_creates_new_supervisor_on_master_pro
$this->assertCount(1, $master->supervisors);

$this->assertEquals(
'exec '.$phpBinary.' artisan horizon:supervisor my-supervisor redis --delay=0 --memory=128 --queue="default" --sleep=3 --timeout=60 --tries=0 --balance=off --max-processes=1 --min-processes=1 --nice=0',
'exec '.$phpBinary.' artisan horizon:supervisor my-supervisor redis --delay=0 --memory=128 --queue="default" --sleep=3 --timeout=60 --tries=0 --balance=off --max-processes=1 --min-processes=1 --nice=0 --balance-cooldown=3 --auto-scale-max-shift=1',
$master->supervisors->first()->process->getCommandLine()
);
}
Expand Down
39 changes: 38 additions & 1 deletion tests/Feature/AutoScalerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function test_scaler_attempts_to_get_closer_to_proper_balance_on_each_ite
$this->assertEquals(13, $supervisor->processPools['first']->totalProcessCount());
$this->assertEquals(7, $supervisor->processPools['second']->totalProcessCount());

// Asset scaler stays at target values...
// Assert scaler stays at target values...
$scaler->scale($supervisor);

$this->assertEquals(13, $supervisor->processPools['first']->totalProcessCount());
Expand Down Expand Up @@ -181,4 +181,41 @@ protected function with_scaling_scenario($maxProcesses, array $pools, array $ext

return [$scaler, $supervisor];
}

public function test_scaler_considers_max_shift_and_attempts_to_get_closer_to_proper_balance_on_each_iteration()
{
[$scaler, $supervisor] = $this->with_scaling_scenario(150, [
'first' => ['current' => 75, 'size' => 600, 'runtime' => 75],
'second' => ['current' => 75, 'size' => 300, 'runtime' => 75],
]);

$supervisor->options->autoScaleMaxShift = 10;

$scaler->scale($supervisor);

$this->assertEquals(85, $supervisor->processPools['first']->totalProcessCount());
$this->assertEquals(65, $supervisor->processPools['second']->totalProcessCount());

$scaler->scale($supervisor);

$this->assertEquals(95, $supervisor->processPools['first']->totalProcessCount());
$this->assertEquals(55, $supervisor->processPools['second']->totalProcessCount());

$scaler->scale($supervisor);

$this->assertEquals(100, $supervisor->processPools['first']->totalProcessCount());
$this->assertEquals(50, $supervisor->processPools['second']->totalProcessCount());

// Assert scaler stays at target values...
$scaler->scale($supervisor);

$this->assertEquals(100, $supervisor->processPools['first']->totalProcessCount());
$this->assertEquals(50, $supervisor->processPools['second']->totalProcessCount());

// Assert scaler still stays at target values...
$scaler->scale($supervisor);

$this->assertEquals(100, $supervisor->processPools['first']->totalProcessCount());
$this->assertEquals(50, $supervisor->processPools['second']->totalProcessCount());
}
}
34 changes: 34 additions & 0 deletions tests/Feature/SupervisorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,40 @@ public function test_auto_scaler_is_called_on_loop_when_auto_scaling()
$supervisor->loop();
}

public function test_auto_scaler_is_not_called_on_loop_during_cooldown()
{
$options = $this->supervisorOptions();
$options->autoScale = true;
$this->supervisor = $supervisor = new Supervisor($options);

// Start the supervisor...
$supervisor->scale(1);

$time = CarbonImmutable::create();

$this->assertNull($supervisor->lastAutoScaled);

$supervisor->lastAutoScaled = null;
CarbonImmutable::setTestNow($time);
$supervisor->loop();
$this->assertTrue($supervisor->lastAutoScaled->eq($time));

$supervisor->lastAutoScaled = $time;
CarbonImmutable::setTestNow($time->addSeconds($supervisor->options->balanceCooldown - 0.01));
$supervisor->loop();
$this->assertTrue($supervisor->lastAutoScaled->eq($time));

$supervisor->lastAutoScaled = $time;
CarbonImmutable::setTestNow($time->addSeconds($supervisor->options->balanceCooldown));
$supervisor->loop();
$this->assertTrue($supervisor->lastAutoScaled->eq($time->addSeconds($supervisor->options->balanceCooldown)));

$supervisor->lastAutoScaled = $time;
CarbonImmutable::setTestNow($time->addSeconds($supervisor->options->balanceCooldown + 0.01));
$supervisor->loop();
$this->assertTrue($supervisor->lastAutoScaled->eq($time->addSeconds($supervisor->options->balanceCooldown)));
}

public function test_supervisor_with_duplicate_name_cant_be_started()
{
$this->expectException(Exception::class);
Expand Down

0 comments on commit e807042

Please sign in to comment.