Skip to content

Commit

Permalink
feat: Implement BulkheadIsolationPolicy
Browse files Browse the repository at this point in the history
Closes #199.
  • Loading branch information
luczsoma committed Mar 29, 2020
1 parent f4003ce commit 0a6fd61
Show file tree
Hide file tree
Showing 17 changed files with 807 additions and 181 deletions.
92 changes: 88 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ _Note: This package is built as an ES6 package. You will not be able to use `req

After installation, you can import policies and other helper classes into your project, then wrap your code into one or more policies.

Every policy extends the abstract `Policy` class, which has an `execute` method. Your code wrapped into the policy gets executed when you invoke `execute`. The `execute` method is asynchronous, so it returns a `Promise` resolving with the return value of the executed method.
Every policy extends the abstract `Policy` class, which has an `execute` method. Your code wrapped into a policy gets executed when you invoke `execute`. The `execute` method is asynchronous, so it returns a `Promise` resolving with the return value of the executed method (or rejecting with an exception thrown by the method).

The wrapped method can be synchronous or asynchronous, it will be awaited in either case:

Expand Down Expand Up @@ -123,9 +123,10 @@ Resily offers **reactive** and **proactive** policies:

#### Proactive policies summary

| Policy | What does it claim? | How does it work? |
| ----------------------------------- | ----------------------------------------------------------------- | ------------------------------------------------------------------------- |
| [**TimeoutPolicy**](#timeoutpolicy) | After some time, it is unlikely that the call will be successful. | Ensures the caller does not have to wait more than the specified timeout. |
| Policy | What does it claim? | How does it work? |
| ------------------------------------------------------- | ----------------------------------------------------------------- | ------------------------------------------------------------------------- |
| [**TimeoutPolicy**](#timeoutpolicy) | After some time, it is unlikely that the call will be successful. | Ensures the caller does not have to wait more than the specified timeout. |
| [**BulkheadIsolationPolicy**](#bulkheadisolationpolicy) | Too many concurrent calls can overload a resource. | Limits the number of concurrently executed actions as specified. |

### Reactive policies

Expand Down Expand Up @@ -804,6 +805,89 @@ try {
// onTimeoutRan is false
```

#### BulkheadIsolationPolicy

`BulkheadIsolationPolicy` claims that too many concurrent calls can overload a resource. It limits the number of concurrently executed actions as specified.

Method calls executed via the policy are placed into a size-limited bulkhead compartment, limiting the maximum number of concurrent executions.

If the bulkhead compartment is full — meaning the maximum number of concurrent executions is reached —, additional calls can be queued up, ready to be executed whenever a place falls vacant in the bulkhead compartment (i.e. an execution finishes). Queuing up these calls ensures that the resource protected by the policy is always at maximum utilization, while limiting the number of concurrent actions ensures that the resource is not overloaded. The queue is a simple FIFO buffer.

When the policy's `execute` method is invoked with a method to be executed, the policy's operation can be described as follows:

- `(1)` If there is an execution slot available in the bulkhead compartment, execute the method immediately.

- `(2)` Else if there is still space in the queue, enqueue the execution intent of the method — without actually executing the method —, then wait asynchronously until the method can be executed.

An execution intent gets dequeued — and its corresponding method gets executed — each time an execution slot becomes available in the bulkhead compartment.

- `(3)` Else throw a `BulkheadCompartmentRejectedException`.

From the caller's point of view, this is all transparent: the promise returned by the `execute` method is

- either eventually resolved with the return value of the wrapped method (cases `(1)` and `(2)`),
- or eventually rejected with an exception thrown by the wrapped method (cases `(1)` and `(2)`),
- or rejected with a `BulkheadCompartmentRejectedException` (case `(3)`).

Configure the size of the bulkhead compartment:

```typescript
import { BulkheadIsolationPolicy } from '@diplomatiq/resily';

// the wrapped method is supposed to return a string
const policy = new BulkheadIsolationPolicy<string>();

// allow maximum 3 concurrent executions
policy.maxConcurrency(3);

// this overwrites the previous value
policy.maxConcurrency(5);
```

Configure the size of the queue:

```typescript
import { BulkheadIsolationPolicy } from '@diplomatiq/resily';

// the wrapped method is supposed to return a string
const policy = new BulkheadIsolationPolicy<string>();

// allow maximum 3 queued actions
policy.maxQueuedActions(3);

// this overwrites the previous value
policy.maxQueuedActions(5);
```

Get usage information about the bulkhead compartment:

```typescript
import { BulkheadIsolationPolicy } from '@diplomatiq/resily';

// the wrapped method is supposed to return a string
const policy = new BulkheadIsolationPolicy<string>();

// the number of available (free) execution slots in the bulkhead compartment
policy.getAvailableSlotsCount();

// the number of available (free) spaces in the queue
policy.getAvailableQueuedActionsCount();
```

### Modifying a policy's configuration

All policies' configuration parameters are set via setter methods. This could imply that all policies can be safely reconfigured whenever needed, but providing setter methods instead of constructor parameters is merely because this way the policies are more convenient to use. If you need to reconfigure a policy, you can do that, but not while it is still executing one or more methods: reconfiguring while executing could lead to unexpected side-effects. Therefore, if you tries to reconfigure a policy while executing, a `PolicyModificationNotAllowedException` is thrown.

To safely reconfigure a policy, check whether it is executing or not:

```typescript
const policy =// any policy

if (!policy.isExecuting()) {
// you can reconfigure the policy
}
```

## Development

See [CONTRIBUTING.md](https://github.com/Diplomatiq/resily/blob/develop/CONTRIBUTING.md) for details.
Expand Down
5 changes: 5 additions & 0 deletions src/main.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
export { RandomGenerator } from './interfaces/randomGenerator';
export { Policy } from './policies/policy';
export { BulkheadCompartmentRejectedException } from './policies/proactive/bulkheadIsolationPolicy/bulkheadCompartmentRejectedException';
export { BulkheadIsolationPolicy } from './policies/proactive/bulkheadIsolationPolicy/bulkheadIsolationPolicy';
export { ProactivePolicy } from './policies/proactive/proactivePolicy';
export { ExecutionException } from './policies/proactive/timeoutPolicy/executionException';
export { OnTimeoutFn } from './policies/proactive/timeoutPolicy/onTimeoutFn';
Expand All @@ -22,5 +24,8 @@ export { BackoffStrategy } from './policies/reactive/retryPolicy/backoffStrategy
export { BackoffStrategyFactory } from './policies/reactive/retryPolicy/backoffStrategyFactory';
export { OnRetryFn } from './policies/reactive/retryPolicy/onRetryFn';
export { RetryPolicy } from './policies/reactive/retryPolicy/retryPolicy';
export { ExecutedFn } from './types/executedFn';
export { OnFinallyFn } from './types/onFinallyFn';
export { PolicyModificationNotAllowedException } from './types/policyModificationNotAllowedException';
export { Predicate } from './types/predicate';
export { SuccessDeferred } from './utils/successDeferred';
26 changes: 25 additions & 1 deletion src/policies/policy.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,27 @@
import { ExecutedFn } from '../types/executedFn';
import { PolicyModificationNotAllowedException } from '../types/policyModificationNotAllowedException';

export abstract class Policy<ResultType> {
public abstract async execute(fn: () => ResultType | Promise<ResultType>): Promise<ResultType>;
private executing = 0;

public async execute(fn: ExecutedFn<ResultType>): Promise<ResultType> {
try {
this.executing++;
return await this.policyExecutorImpl(fn);
} finally {
this.executing--;
}
}

public isExecuting(): boolean {
return this.executing > 0;
}

protected throwForPolicyModificationIfExecuting(): void {
if (this.isExecuting()) {
throw new PolicyModificationNotAllowedException();
}
}

protected abstract async policyExecutorImpl(fn: ExecutedFn<ResultType>): Promise<ResultType>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export class BulkheadCompartmentRejectedException extends Error {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { SuccessDeferred } from '../../../utils/successDeferred';
import { ProactivePolicy } from '../proactivePolicy';
import { BulkheadCompartmentRejectedException } from './bulkheadCompartmentRejectedException';

export class BulkheadIsolationPolicy<ResultType> extends ProactivePolicy<ResultType> {
private bulkheadCompartmentSize = Number.POSITIVE_INFINITY;
private queueSize = 0;

private bulkheadCompartmentUsage = 0;
private readonly queue: Array<SuccessDeferred<void>> = [];

public maxConcurrency(bulkheadCompartmentSize: number): void {
if (!Number.isInteger(bulkheadCompartmentSize)) {
throw new Error('bulkheadCompartmentSize must be integer');
}

if (bulkheadCompartmentSize <= 0) {
throw new Error('bulkheadCompartmentSize must be greater than 0');
}

if (!Number.isSafeInteger(bulkheadCompartmentSize)) {
throw new Error('bulkheadCompartmentSize must be less than or equal to 2^53 - 1');
}

this.throwForPolicyModificationIfExecuting();

this.bulkheadCompartmentSize = bulkheadCompartmentSize;
}

public maxQueuedActions(queueSize: number): void {
if (!Number.isInteger(queueSize)) {
throw new Error('queueSize must be integer');
}

if (queueSize < 0) {
throw new Error('queueSize must be greater than or equal to 0');
}

if (!Number.isSafeInteger(queueSize)) {
throw new Error('queueSize must be less than or equal to 2^53 - 1');
}

this.throwForPolicyModificationIfExecuting();

this.queueSize = queueSize;
}

public getAvailableSlotsCount(): number {
return this.bulkheadCompartmentSize - this.bulkheadCompartmentUsage;
}

public getAvailableQueuedActionsCount(): number {
return this.queueSize - this.queue.length;
}

protected async policyExecutorImpl(fn: () => ResultType | Promise<ResultType>): Promise<ResultType> {
if (this.bulkheadCompartmentUsage >= this.bulkheadCompartmentSize) {
if (this.queue.length >= this.queueSize) {
throw new BulkheadCompartmentRejectedException();
}

const queuingDeferred = new SuccessDeferred<void>();
this.queue.push(queuingDeferred);
await queuingDeferred.promise;
}

try {
this.bulkheadCompartmentUsage++;
return await fn();
} finally {
this.bulkheadCompartmentUsage--;
this.queue.shift()?.resolve();
}
}
}
15 changes: 3 additions & 12 deletions src/policies/proactive/timeoutPolicy/timeoutPolicy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ export class TimeoutPolicy<ResultType> extends ProactivePolicy<ResultType> {
private timeoutMs: number | undefined;
private readonly onTimeoutFns: OnTimeoutFn[] = [];

private executing = 0;

public timeoutAfter(timeoutMs: number): void {
if (!Number.isInteger(timeoutMs)) {
throw new Error('timeoutMs must be integer');
Expand All @@ -22,24 +20,18 @@ export class TimeoutPolicy<ResultType> extends ProactivePolicy<ResultType> {
throw new Error('timeoutMs must be less than or equal to 2^53 - 1');
}

if (this.executing > 0) {
throw new Error('cannot modify policy during execution');
}
this.throwForPolicyModificationIfExecuting();

this.timeoutMs = timeoutMs;
}

public onTimeout(fn: (timedOutAfterMs: number) => void | Promise<void>): void {
if (this.executing > 0) {
throw new Error('cannot modify policy during execution');
}
this.throwForPolicyModificationIfExecuting();

this.onTimeoutFns.push(fn);
}

public async execute(fn: () => Promise<ResultType>): Promise<ResultType> {
this.executing++;

protected async policyExecutorImpl(fn: () => Promise<ResultType>): Promise<ResultType> {
const executionPromise = (async (): Promise<ResultType> => {
try {
return await fn();
Expand Down Expand Up @@ -77,7 +69,6 @@ export class TimeoutPolicy<ResultType> extends ProactivePolicy<ResultType> {
throw typedEx.innerException;
} finally {
clearTimeout(timeoutId);
this.executing--;
}
}
}
Loading

0 comments on commit 0a6fd61

Please sign in to comment.