Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ratelimit example #241

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions typescript/patterns-use-cases/src/ratelimit/app.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import { endpoint } from "@restatedev/restate-sdk";

import { limiter } from "./limiter";
import { myService } from "./service";

endpoint().bind(limiter).bind(myService).listen();
229 changes: 229 additions & 0 deletions typescript/patterns-use-cases/src/ratelimit/limiter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
// a faithful reimplementation of https://pkg.go.dev/golang.org/x/time/rate#Limiter
// using virtual object state

import { object, ObjectContext } from "@restatedev/restate-sdk";

type LimiterState = {
state: LimiterStateInner;
};
type LimiterStateInner = {
limit: number;
burst: number;
tokens: number;
// last is the last time the limiter's tokens field was updated, in unix millis
last: number;
// lastEvent is the latest time of a rate-limited event (past or future), in unix millis
lastEvent: number;
};

export interface Reservation {
ok: boolean;
tokens: number;
creationDate: number;
dateToAct: number;
// This is the Limit at reservation time, it can change later.
limit: number;
}

export const limiter = object({
name: "limiter",
handlers: {
state: async (
ctx: ObjectContext<LimiterState>,
): Promise<LimiterStateInner> => {
return getState(ctx);
},
tokens: async (ctx: ObjectContext<LimiterState>): Promise<number> => {
// deterministic date not needed, as there is only an output entry
const tokens = advance(await getState(ctx), Date.now());
return tokens;
},
reserve: async (
ctx: ObjectContext<LimiterState>,
{
n = 1,
waitLimitMillis = Infinity,
}: { n?: number; waitLimitMillis?: number },
): Promise<Reservation> => {
let lim = await getState(ctx);

if (lim.limit == Infinity) {
// deterministic date is not necessary, as this is part of a response body, which won't be replayed.
const now = Date.now();
return {
ok: true,
tokens: n,
creationDate: now,
dateToAct: now,
limit: 0,
};
}

let r: Reservation;
({ lim, r } = await ctx.run(() => {
const now = Date.now();
let tokens = advance(lim, now);

// Calculate the remaining number of tokens resulting from the request.
tokens -= n;

// Calculate the wait duration
let waitDurationMillis = 0;
if (tokens < 0) {
waitDurationMillis = durationFromTokens(lim.limit, -tokens);
}

// Decide result
const ok = n <= lim.burst && waitDurationMillis <= waitLimitMillis;

// Prepare reservation
const r = {
ok,
tokens: 0,
creationDate: now,
dateToAct: 0,
limit: lim.limit,
} satisfies Reservation;

if (ok) {
r.tokens = n;
r.dateToAct = now + waitDurationMillis;

// Update state
lim.last = now;
lim.tokens = tokens;
lim.lastEvent = r.dateToAct;
}

return { lim, r };
}));

setState(ctx, lim);

return r;
},
setRate: async (
ctx: ObjectContext<LimiterState>,
{ newLimit, newBurst }: { newLimit?: number; newBurst?: number },
) => {
if (newLimit === undefined && newBurst === undefined) {
return;
}

let lim = await getState(ctx);

lim = await ctx.run(() => {
const now = Date.now();
const tokens = advance(lim, now);

lim.last = now;
lim.tokens = tokens;
if (newLimit !== undefined) lim.limit = newLimit;
if (newBurst !== undefined) lim.burst = newBurst;

return lim;
});

setState(ctx, lim);
},
cancelReservation: async (
ctx: ObjectContext<LimiterState>,
r: Reservation,
) => {
let lim = await getState(ctx);

lim = await ctx.run(() => {
const now = Date.now();

if (lim.limit == Infinity || r.tokens == 0 || r.dateToAct < now) {
return lim;
}

// calculate tokens to restore
// The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved
// after r was obtained. These tokens should not be restored.
const restoreTokens =
r.tokens - tokensFromDuration(r.limit, lim.lastEvent - r.dateToAct);
if (restoreTokens <= 0) {
return lim;
}
// advance time to now
let tokens = advance(lim, now);
// calculate new number of tokens
tokens += restoreTokens;
if (tokens > lim.burst) {
tokens = lim.burst;
}
// update state
lim.last = now;
lim.tokens = tokens;
if (r.dateToAct == lim.lastEvent) {
const prevEvent =
r.dateToAct + durationFromTokens(r.limit, -r.tokens);
if (prevEvent >= now) {
lim.lastEvent = prevEvent;
}
}

return lim;
});

setState(ctx, lim);
},
},
});

function advance(lim: LimiterStateInner, date: number): number {
let last = lim.last;
if (date <= last) {
last = date;
}

// Calculate the new number of tokens, due to time that passed.
const elapsedMillis = date - last;
const delta = tokensFromDuration(lim.limit, elapsedMillis);
let tokens = lim.tokens + delta;
if (tokens > lim.burst) {
tokens = lim.burst;
}

return tokens;
}

async function getState(
ctx: ObjectContext<LimiterState>,
): Promise<LimiterStateInner> {
return (
(await ctx.get("state")) ?? {
limit: 0,
burst: 0,
tokens: 0,
last: 0,
lastEvent: 0,
}
);
}

async function setState(
ctx: ObjectContext<LimiterState>,
lim: LimiterStateInner,
) {
ctx.set("state", lim);
}

function durationFromTokens(limit: number, tokens: number): number {
if (limit <= 0) {
return Infinity;
}

return (tokens / limit) * 1000;
}

function tokensFromDuration(limit: number, durationMillis: number): number {
if (limit <= 0) {
return 0;
}
return (durationMillis / 1000) * limit;
}

export type Limiter = typeof limiter;
150 changes: 150 additions & 0 deletions typescript/patterns-use-cases/src/ratelimit/limiter_client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
import { Context, TerminalError } from "@restatedev/restate-sdk";
import type {
Limiter as LimiterObject,
Reservation as ReservationResponse,
} from "./limiter";

export interface Reservation extends ReservationResponse {
// cancel indicates that the reservation holder will not perform the reserved action
// and reverses the effects of this Reservation on the rate limit as much as possible,
// considering that other reservations may have already been made.
cancel(): void;
}

export interface Limiter {
// limit returns the maximum overall event rate.
limit(): Promise<number>;
// burst returns the maximum burst size. Burst is the maximum number of tokens
// that can be consumed in a single call to allow, reserve, or wait, so higher
// Burst values allow more events to happen at once.
// A zero Burst allows no events, unless limit == Inf.
burst(): Promise<number>;
// tokens returns the number of tokens available at time t (defaults to now).
tokens(): Promise<number>;
// allow reports whether n events may happen at time t.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use reserve or wait.
allow(n?: number): Promise<boolean>;
// reserve returns a Reservation that indicates how long the caller must wait before n events happen.
// The limiter takes this Reservation into account when allowing future events.
// The returned Reservation’s ok parameter is false if n exceeds the limiter's burst size, or provided waitLimitMillis.
// Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events.
// If you need to cancel the delay, use wait instead.
// To drop or skip events exceeding rate limit, use allow instead.
reserve(n?: number, waitLimitMillis?: number): Promise<Reservation>;
// setLimit sets a new limit for the limiter. The new limit, and burst, may be violated
// or underutilized by those which reserved (using reserve or wait) but did not yet act
// before setLimit was called.
setLimit(newLimit: number): Promise<void>;
// setBurst sets a new burst size for the limiter.
setBurst(newBurst: number): Promise<void>;
// setRate sets a new limit and burst size for the limiter.
setRate(newLimit: number, newBurst: number): Promise<void>;
// waitN blocks until the limiter permits n events to happen.
// It returns an error if n exceeds the limiter's burst size, the invocation is canceled,
// or the wait would be longer than the deadline.
// The burst limit is ignored if the rate limit is Inf.
wait(n?: number, waitLimitMillis?: number): Promise<void>;
}

export namespace Limiter {
export function fromContext(ctx: Context, limiterID: string): Limiter {
const client = ctx.objectClient<LimiterObject>(
{ name: "limiter" },
limiterID,
);
return {
async limit() {
return (await client.state()).limit;
},
async burst() {
return (await client.state()).burst;
},
async tokens() {
return client.tokens();
},
async allow(n?: number) {
const r = await client.reserve({
n,
waitLimitMillis: 0,
});
return r.ok;
},
async reserve(n?: number, waitLimitMillis?: number) {
const r = await client.reserve({
n,
waitLimitMillis,
});
return {
cancel() {
ctx
.objectSendClient<LimiterObject>({ name: "limiter" }, limiterID)
.cancelReservation(r);
},
...r,
};
},
async setLimit(newLimit: number) {
return client.setRate({
newLimit,
});
},
async setBurst(newBurst: number) {
return client.setRate({
newBurst,
});
},
async setRate(newLimit: number, newBurst: number) {
return client.setRate({
newLimit,
newBurst,
});
},
async wait(n: number = 1, waitLimitMillis?: number) {
// Reserve
const r = await this.reserve(n, waitLimitMillis);
if (!r.ok) {
if (waitLimitMillis === undefined) {
throw new TerminalError(
`rate: Wait(n=${n}) would exceed the limiters burst`,
{ errorCode: 429 },
);
} else {
throw new TerminalError(
`rate: Wait(n=${n}) would either exceed the limiters burst or the provided waitLimitMillis`,
{ errorCode: 429 },
);
}
}
// Wait if necessary
const delay = delayFrom(r, r.creationDate);
if (delay == 0) {
return;
}

try {
await ctx.sleep(delay);
} catch (e) {
// this only happens on invocation cancellation - cancel the reservation in the background
r.cancel();
throw e;
}
},
};
}
}

// delayFrom returns the duration in millis for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
// Infinity means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
function delayFrom(r: ReservationResponse, date: number): number {
if (!r.ok) {
return Infinity;
}
const delay = r.dateToAct - date;
if (delay < 0) {
return 0;
}
return Math.floor(delay);
}
Loading