Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: addressing methods #13

Merged
merged 9 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 145 additions & 104 deletions app/http.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,50 +86,57 @@
App::setResource('balancer', function (Algorithm $algorithm, Request $request) {
global $state;
$runtimeId = $request->getHeader('x-opr-runtime-id', '');
$method = $request->getHeader('x-opr-addressing-method', 'anycast-efficient');

$group = new Group();

if ($method === 'anycast-fast') {
$algorithm = new Random();
}

// Cold-started-only options
$balancer1 = new Balancer($algorithm);

// Only online executors
$balancer1->addFilter(fn ($option) => $option->getState('status', 'offline') === 'online');

// Only low host-cpu usage
$balancer1->addFilter(function ($option) {
/**
* @var array<string,mixed> $state
*/
$state = \json_decode($option->getState('state', '{}'), true);
return ($state['usage'] ?? 100) < 80;
});

// Only low runtime-cpu usage
if (!empty($runtimeId)) {
$balancer1->addFilter(function ($option) use ($runtimeId) {
if ($method === 'anycast-efficient') {
// Only low host-cpu usage
$balancer1->addFilter(function ($option) {
/**
* @var array<string,mixed> $state
*/
$state = \json_decode($option->getState('state', '{}'), true);
return ($state['usage'] ?? 100) < 80;
});

/**
* @var array<string,mixed> $runtimes
*/
$runtimes = $state['runtimes'];

/**
* @var array<string,mixed> $runtime
*/
$runtime = $runtimes[$runtimeId] ?? [];
// Only low runtime-cpu usage
if (!empty($runtimeId)) {
$balancer1->addFilter(function ($option) use ($runtimeId) {
/**
* @var array<string,mixed> $state
*/
$state = \json_decode($option->getState('state', '{}'), true);

/**
* @var array<string,mixed> $runtimes
*/
$runtimes = $state['runtimes'];

/**
* @var array<string,mixed> $runtime
*/
$runtime = $runtimes[$runtimeId] ?? [];

return ($runtime['usage'] ?? 100) < 80;
});
}

return ($runtime['usage'] ?? 100) < 80;
});
// Any options
$balancer2 = new Balancer($algorithm);
$balancer2->addFilter(fn ($option) => $option->getState('status', 'offline') === 'online');
}

// Any options
$balancer2 = new Balancer($algorithm);
$balancer2->addFilter(fn ($option) => $option->getState('status', 'offline') === 'online');

foreach ($state as $stateItem) {
if (App::isDevelopment()) {
Console::log("Adding balancing option: " . \json_encode($stateItem));
Expand All @@ -139,12 +146,17 @@
* @var array<string,mixed> $stateItem
*/
$balancer1->addOption(new Option($stateItem));
$balancer2->addOption(new Option($stateItem));

if(isset($balancer2)) {
$balancer2->addOption(new Option($stateItem));
}
}

$group
->add($balancer1)
->add($balancer2);
$group->add($balancer1);

if(isset($balancer2)) {
$group->add($balancer2);
}

return $group;
}, ['algorithm', 'request']);
Expand Down Expand Up @@ -242,109 +254,138 @@ function logError(Throwable $error, string $action, ?Logger $logger, Utopia\Rout
->inject('request')
->inject('response')
->action(function (Group $balancer, Request $request, Response $response) {
global $state;
$option = $balancer->run();
$method = $request->getHeader('x-opr-addressing-method', 'anycast-efficient');

if (!isset($option)) {
throw new Exception('No online executor found', 404);
}
$proxyRequest = function (string $hostname) use ($request) {
if (App::isDevelopment()) {
Console::info("Executing on " . $hostname);
}

/**
* @var string $hostname
*/
$hostname = $option->getState('hostname') ?? '';
// Optimistic update. Mark runtime up instantly to prevent race conditions
// Next health check with confirm it started well, and update usage stats
$runtimeId = $request->getHeader('x-opr-runtime-id', '');
if (!empty($runtimeId)) {
global $state;
$record = $state->get($hostname);

if (App::isDevelopment()) {
Console::info("Executing on " . $hostname);
}
$stateItem = \json_decode($record['state'] ?? '{}', true);

// Optimistic update. Mark runtime up instantly to prevent race conditions
// Next health check with confirm it started well, and update usage stats
$runtimeId = $request->getHeader('x-opr-runtime-id', '');
if (!empty($runtimeId)) {
$record = $state->get($hostname);
if (!isset($stateItem['runtimes'])) {
$stateItem['runtimes'] = [];
}

$stateItem = \json_decode($record['state'] ?? '{}', true);
if (!isset($stateItem['runtimes'][$runtimeId])) {
$stateItem['runtimes'][$runtimeId] = [];
}

if (!isset($stateItem['runtimes'])) {
$stateItem['runtimes'] = [];
$stateItem['runtimes'][$runtimeId]['status'] = 'pass';
$stateItem['runtimes'][$runtimeId]['usage'] = 0;

$record['state'] = \json_encode($stateItem);

$state->set($hostname, $record);
}

if (!isset($stateItem['runtimes'][$runtimeId])) {
$stateItem['runtimes'][$runtimeId] = [];
$headers = \array_merge($request->getHeaders(), [
'authorization' => 'Bearer ' . App::getEnv('OPR_PROXY_EXECUTOR_SECRET', '')
]);

// Header used for testing
if (App::isDevelopment()) {
$headers = \array_merge($headers, [
'x-opr-executor-hostname' => $hostname
]);
}

$stateItem['runtimes'][$runtimeId]['status'] = 'pass';
$stateItem['runtimes'][$runtimeId]['usage'] = 0;
$body = $request->getRawPayload();

$record['state'] = \json_encode($stateItem);
$ch = \curl_init();

$state->set($hostname, $record);
}
\curl_setopt($ch, CURLOPT_URL, $hostname . $request->getURI());
\curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $request->getMethod());
\curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
\curl_setopt($ch, CURLOPT_HEADER, true);
\curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);

$headers = \array_merge($request->getHeaders(), [
'authorization' => 'Bearer ' . App::getEnv('OPR_PROXY_EXECUTOR_SECRET', '')
]);
$curlHeaders = [];
foreach ($headers as $header => $value) {
$curlHeaders[] = "{$header}: {$value}";
}

// Header used for testing
if (App::isDevelopment()) {
$headers = \array_merge($headers, [
'x-opr-executor-hostname' => $hostname
]);
}
\curl_setopt($ch, CURLOPT_HTTPHEADER, $curlHeaders);

$body = $request->getRawPayload();
$executorResponse = \curl_exec($ch);

$ch = \curl_init();
$statusCode = \curl_getinfo($ch, CURLINFO_HTTP_CODE);

\curl_setopt($ch, CURLOPT_URL, $hostname . $request->getURI());
\curl_setopt($ch, CURLOPT_CUSTOMREQUEST, $request->getMethod());
\curl_setopt($ch, CURLOPT_POSTFIELDS, $body);
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
\curl_setopt($ch, CURLOPT_HEADER, true);
\curl_setopt($ch, CURLOPT_CONNECTTIMEOUT, 10);
$error = \curl_error($ch);

$curlHeaders = [];
foreach ($headers as $header => $value) {
$curlHeaders[] = "{$header}: {$value}";
}
$errNo = \curl_errno($ch);

\curl_setopt($ch, CURLOPT_HTTPHEADER, $curlHeaders);
$header_size = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
$headers = substr(\strval($executorResponse), 0, $header_size);
$body = substr(\strval($executorResponse), $header_size);

\curl_close($ch);

if ($errNo !== 0) {
throw new Exception('Unexpected curl error between proxy and executor: ' . $error);
}

$executorResponse = \curl_exec($ch);
$headersArr = [];
if (!empty($headers)) {
$headers = preg_split("/\r\n|\n|\r/", $headers);
if ($headers) {
foreach ($headers as $header) {
if (\str_contains($header, ':')) {
[ $key, $value ] = \explode(':', $header, 2);
$headersArr[$key] = $value;
}
}
}
}

$statusCode = \curl_getinfo($ch, CURLINFO_HTTP_CODE);
return [
'statusCode' => $statusCode,
'body' => $body,
'headers' => $headersArr
];
};

$error = \curl_error($ch);
if ($method === 'broadcast') {
foreach ($balancer->getOptions() as $option) {
/**
* @var string $hostname
*/
$hostname = $option->getState('hostname') ?? '';

$errNo = \curl_errno($ch);
$proxyRequest($hostname);
}

$header_size = curl_getinfo($ch, CURLINFO_HEADER_SIZE);
$headers = substr(\strval($executorResponse), 0, $header_size);
$body = substr(\strval($executorResponse), $header_size);
$response->noContent();
} else {
$option = $balancer->run();

\curl_close($ch);
if (!isset($option)) {
throw new Exception('No online executor found', 404);
}

if ($errNo !== 0) {
throw new Exception('Unexpected curl error between proxy and executor: ' . $error);
}
/**
* @var string $hostname
*/
$hostname = $option->getState('hostname') ?? '';

if (!empty($headers)) {
$headers = preg_split("/\r\n|\n|\r/", $headers);
if ($headers) {
foreach ($headers as $header) {
if (\str_contains($header, ':')) {
[ $key, $value ] = \explode(':', $header, 2);
$result = $proxyRequest($hostname);

$response->addHeader($key, $value);
}
}
foreach ($result['headers'] as $key => $value) {
$response->addHeader($key, $value);
}
}

$response
->setStatusCode($statusCode)
->send($body);
$response
->setStatusCode($result['statusCode'])
->send($result['body']);
}
});

App::error()
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
"utopia-php/registry": "0.6.*",
"utopia-php/cli": "0.13.*",
"utopia-php/logger": "0.3.*",
"utopia-php/balancer": "0.3.*"
"utopia-php/balancer": "dev-feat-get-options as 0.3.99"
},
"require-dev": {
"swoole/ide-helper": "4.8.5",
Expand Down
Loading