Skip to content

Commit

Permalink
Generalize apollo-server graceful shutdown to all integrations
Browse files Browse the repository at this point in the history
Previously, the batteries-included `apollo-server` package had a special
override of `stop()` which drains the HTTP server before letting the
actual Apollo Server `stop()` machinery begin. This meant that
`apollo-server` followed this nice shutdown lifecycle:
- Stop listening for new connections
- Close all idle connections and start closing connections as they go
  idle
- Wait a grace period for all connections to close and force-close any
  remaining ones
- Transition ApolloServer to the stopping state, where no operations
  will run
- Run stop hooks (eg send final usage report)

This was great... but only `apollo-server` worked this way, because only
`apollo-server` has full knowledge and control over its HTTP server.

This PR adds a server draining step to the ApolloServer lifecycle and
plugin interface, and provides a built-in plugin which drains a Node
`http.Server` using the logic of the first three steps above.
`apollo-server`'s behavior is now just to automatically install the
plugin.

Specifically:
- Add a new 'phase' called `draining` that fits between `started` and
  `stopping`. Like `started`, operations can still execute during
  `draining`. Like `stopping`, any concurrent call to `stop()` will just
  block until the first `stop()` call finishes rather than starting a
  second shutdown process.
- Add a new `drainServer` plugin hook (on the object returned by
  `serverWillStart`). Invoke all `drainServer` hooks in parallel during
  the `draining` phase.
- Make calling `stop()` when `start()` has not yet completed
  successfully into an error. That behavior was previously undefined.
  Note that as of #5639, the automatic `stop()` call from signal
  handlers can't happen before `start()` succeeds.
- Add `ApolloServerPluginDrainHttpServer` to `apollo-server-core`.
  This plugin implements `drainServer` using the `Stopper` class
  that was previously in the `apollo-server` package. The default
  grace period is 10 seconds.
- Clean up integration tests to just use `stop()` with the plugin
  instead of separately stopping the HTTP server. Note that for Fastify
  specifically we also call `app.close` although there is some weirdness
  here around both `app.close` and our Stopper closing the same server.
  A comment describes the weirdness; perhaps Fastify experts can improve
  this later.
- The Hapi web framework has built in logic that is similar to our
  Stopper, so `apollo-server-hapi` exports
  `ApolloServerPluginStopHapiServer` which should be used instead of the
  other plugin with Hapi.
- Fix some test issues (eg, have FakeTimers only mock out Date.now
  instead of setImmediate, drop an erroneous `const` which made an `app`
  not get cleaned up, etc).

Fixes #5074.
  • Loading branch information
glasser committed Aug 23, 2021
1 parent 9b3467c commit 37e32c6
Show file tree
Hide file tree
Showing 22 changed files with 445 additions and 220 deletions.
134 changes: 94 additions & 40 deletions packages/apollo-server-core/src/ApolloServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,36 @@ export type SchemaDerivedData = {
};

type ServerState =
| { phase: 'initialized'; schemaManager: SchemaManager }
| {
phase: 'initialized';
schemaManager: SchemaManager;
}
| {
phase: 'starting';
barrier: Resolvable<void>;
schemaManager: SchemaManager;
}
| { phase: 'failed to start'; error: Error }
| {
phase: 'failed to start';
error: Error;
}
| {
phase: 'started';
schemaManager: SchemaManager;
}
| { phase: 'stopping'; barrier: Resolvable<void> }
| { phase: 'stopped'; stopError: Error | null };
| {
phase: 'draining';
schemaManager: SchemaManager;
barrier: Resolvable<void>;
}
| {
phase: 'stopping';
barrier: Resolvable<void>;
}
| {
phase: 'stopped';
stopError: Error | null;
};

// Throw this in places that should be unreachable (because all other cases have
// been handled, reducing the type of the argument to `never`). TypeScript will
Expand Down Expand Up @@ -121,6 +138,7 @@ export class ApolloServerBase<
private state: ServerState;
private toDispose = new Set<() => Promise<void>>();
private toDisposeLast = new Set<() => Promise<void>>();
private drainServers: (() => Promise<void>) | null = null;
private experimental_approximateDocumentStoreMiB: Config['experimental_approximateDocumentStoreMiB'];
private stopOnTerminationSignals: boolean;
private landingPage: LandingPage | null = null;
Expand All @@ -137,13 +155,15 @@ export class ApolloServerBase<
typeDefs,
parseOptions = {},
introspection,
mocks,
mockEntireSchema,
plugins,
gateway,
experimental_approximateDocumentStoreMiB,
stopOnTerminationSignals,
apollo,
stopOnTerminationSignals,
// These next options aren't used in this function but they don't belong in
// requestOptions.
mocks,
mockEntireSchema,
experimental_approximateDocumentStoreMiB,
...requestOptions
} = config;

Expand Down Expand Up @@ -425,6 +445,17 @@ export class ApolloServerBase<
});
}

const drainServerCallbacks = taggedServerListeners.flatMap((l) =>
l.serverListener.drainServer ? [l.serverListener.drainServer] : [],
);
if (drainServerCallbacks.length) {
this.drainServers = async () => {
await Promise.all(
drainServerCallbacks.map((drainServer) => drainServer()),
);
};
}

// Find the renderLandingPage callback, if one is provided. If the user
// installed ApolloServerPluginLandingPageDisabled then there may be none
// found. On the other hand, if the user installed a landingPage plugin,
Expand Down Expand Up @@ -537,6 +568,7 @@ export class ApolloServerBase<
'This data graph is missing a valid configuration. More details may be available in the server logs.',
);
case 'started':
case 'draining': // We continue to run operations while draining.
return this.state.schemaManager.getSchemaDerivedData();
case 'stopping':
throw new Error(
Expand All @@ -559,7 +591,7 @@ export class ApolloServerBase<
}

protected assertStarted(methodName: string) {
if (this.state.phase !== 'started') {
if (this.state.phase !== 'started' && this.state.phase !== 'draining') {
throw new Error(
'You must `await server.start()` before calling `server.' +
methodName +
Expand Down Expand Up @@ -648,45 +680,67 @@ export class ApolloServerBase<
};
}

/**
* XXX: Note that stop() was designed to be called after start() has finished,
* and should not be called concurrently with start() or before start(), or
* else unexpected behavior may occur (e.g. some dependencies may not be
* stopped).
*/
public async stop() {
// Calling stop more than once should have the same result as the first time.
if (this.state.phase === 'stopped') {
if (this.state.stopError) {
throw this.state.stopError;
}
return;
}
switch (this.state.phase) {
case 'initialized':
case 'starting':
case 'failed to start':
throw Error(
'apolloServer.stop() should only be called after `await apolloServer.start()` has succeeded',
);

// Two parallel calls to stop; just wait for the other one to finish and
// do whatever it did.
if (this.state.phase === 'stopping') {
await this.state.barrier;
// The cast here is because TS doesn't understand that this.state can
// change during the await
// (https://github.com/microsoft/TypeScript/issues/9998).
const state = this.state as ServerState;
if (state.phase !== 'stopped') {
throw Error(`Surprising post-stopping state ${state.phase}`);
}
if (state.stopError) {
throw state.stopError;
// Calling stop more than once should have the same result as the first time.
case 'stopped':
if (this.state.stopError) {
throw this.state.stopError;
}
return;

// Two parallel calls to stop; just wait for the other one to finish and
// do whatever it did.
case 'stopping':
case 'draining': {
await this.state.barrier;
// The cast here is because TS doesn't understand that this.state can
// change during the await
// (https://github.com/microsoft/TypeScript/issues/9998).
const state = this.state as ServerState;
if (state.phase !== 'stopped') {
throw Error(`Surprising post-stopping state ${state.phase}`);
}
if (state.stopError) {
throw state.stopError;
}
return;
}
return;

case 'started':
// This is handled by the rest of the function.
break;

default:
throw new UnreachableCaseError(this.state);
}

// Commit to stopping, actually stop, and update the phase.
const barrier = resolvable();

// Commit to stopping and start draining servers.
this.state = {
phase: 'draining',
schemaManager: this.state.schemaManager,
barrier,
};

await this.drainServers?.();

// Servers are drained. Prevent further operations from starting and call
// stop handlers.
this.state = { phase: 'stopping', barrier: resolvable() };
try {
// We run shutdown handlers in two phases because we don't want to turn
// off our signal listeners until we've done the important parts of shutdown
// like running serverWillStop handlers. (We can make this more generic later
// if it's helpful.)
// off our signal listeners (ie, allow signals to kill the process) until
// we've done the important parts of shutdown like running serverWillStop
// handlers. (We can make this more generic later if it's helpful.)
await Promise.all([...this.toDispose].map((dispose) => dispose()));
await Promise.all([...this.toDisposeLast].map((dispose) => dispose()));
} catch (stopError) {
Expand Down
21 changes: 11 additions & 10 deletions packages/apollo-server-core/src/__tests__/ApolloServerBase.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,17 @@ describe('ApolloServerBase construction', () => {
warn,
error: jest.fn(),
};
expect(() =>
new ApolloServerBase({
typeDefs,
resolvers,
apollo: {
graphVariant: 'foo',
key: 'service:real:key',
},
logger,
}).stop(),
expect(
() =>
new ApolloServerBase({
typeDefs,
resolvers,
apollo: {
graphVariant: 'foo',
key: 'service:real:key',
},
logger,
}),
).not.toThrow();
expect(warn).toHaveBeenCalledTimes(1);
expect(warn.mock.calls[0][0]).toMatch(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
const http = require('http');
const { Stopper } = require('../../../dist/stoppable.js');
const {
Stopper,
} = require('../../../../../dist/plugin/drainHttpServer/stoppable.js');

const grace = Number(process.argv[2] || Infinity);
let stopper;
Expand Down
26 changes: 26 additions & 0 deletions packages/apollo-server-core/src/plugin/drainHttpServer/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import type http from 'http';
import type { ApolloServerPlugin } from 'apollo-server-plugin-base';
import { Stopper } from './stoppable';

// FIXME docs in code
// FIXME write docs
export interface ApolloServerPluginDrainHttpServerOptions {
httpServer: http.Server;
// Defaults to 10_000
stopGracePeriodMillis?: number;
}

export function ApolloServerPluginDrainHttpServer(
options: ApolloServerPluginDrainHttpServerOptions,
): ApolloServerPlugin {
const stopper = new Stopper(options.httpServer);
return {
async serverWillStart() {
return {
async drainServer() {
await stopper.stop(options.stopGracePeriodMillis ?? 10_000);
},
};
},
};
}
File renamed without changes.
12 changes: 12 additions & 0 deletions packages/apollo-server-core/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,18 @@ export function ApolloServerPluginCacheControlDisabled(): ApolloServerPlugin {
}
//#endregion

//#region Drain HTTP server
import type { ApolloServerPluginDrainHttpServerOptions } from './drainHttpServer';
export type { ApolloServerPluginDrainHttpServerOptions } from './drainHttpServer';
export function ApolloServerPluginDrainHttpServer(
options: ApolloServerPluginDrainHttpServerOptions,
): ApolloServerPlugin {
return require('./drainHttpServer').ApolloServerPluginDrainHttpServer(
options,
);
}
//#endregion

//#region LandingPage
import type { InternalApolloServerPlugin } from '../internalPlugin';
export function ApolloServerPluginLandingPageDisabled(): ApolloServerPlugin {
Expand Down
27 changes: 19 additions & 8 deletions packages/apollo-server-express/src/__tests__/ApolloServer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
gql,
AuthenticationError,
ApolloServerPluginCacheControlDisabled,
ApolloServerPluginDrainHttpServer,
} from 'apollo-server-core';
import {
ApolloServer,
Expand All @@ -34,24 +35,34 @@ const resolvers = {
};

describe('apollo-server-express', () => {
let server: ApolloServer;
let httpServer: http.Server;
let serverToCleanUp: ApolloServer | null = null;
testApolloServer(
async (config: ApolloServerExpressConfig, options) => {
server = new ApolloServer(config);
serverToCleanUp = null;
const app = express();
const httpServer = http.createServer(app);
const server = new ApolloServer({
...config,
plugins: [
...(config.plugins ?? []),
ApolloServerPluginDrainHttpServer({
httpServer: httpServer,
}),
],
});
if (!options?.suppressStartCall) {
await server.start();
serverToCleanUp = server;
}
const app = express();
server.applyMiddleware({ app, path: options?.graphqlPath });
httpServer = await new Promise<http.Server>((resolve) => {
const s: http.Server = app.listen({ port: 0 }, () => resolve(s));
await new Promise((resolve) => {
httpServer.once('listening', resolve);
httpServer.listen({ port: 0 });
});
return createServerInfo(server, httpServer);
},
async () => {
if (httpServer?.listening) await httpServer.close();
if (server) await server.stop();
await serverToCleanUp?.stop();
},
);
});
Expand Down
31 changes: 19 additions & 12 deletions packages/apollo-server-express/src/__tests__/expressApollo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,6 @@ import testSuite, {
CreateAppOptions,
} from 'apollo-server-integration-testsuite';

async function createApp(options: CreateAppOptions = {}) {
const app = express();

const server = new ApolloServer(
(options.graphqlOptions as ApolloServerExpressConfig) || { schema: Schema },
);
await server.start();
server.applyMiddleware({ app });
return app;
}

describe('expressApollo', () => {
it('throws error if called without schema', function () {
expect(() => new ApolloServer(undefined as any)).toThrow(
Expand All @@ -25,5 +14,23 @@ describe('expressApollo', () => {
});

describe('integration:Express', () => {
testSuite({ createApp });
let serverToCleanUp: ApolloServer | null = null;
testSuite({
createApp: async function createApp(options: CreateAppOptions = {}) {
serverToCleanUp = null;
const app = express();
const server = new ApolloServer(
(options.graphqlOptions as ApolloServerExpressConfig) || {
schema: Schema,
},
);
await server.start();
serverToCleanUp = server;
server.applyMiddleware({ app });
return app;
},
destroyApp: async function () {
await serverToCleanUp?.stop();
},
});
});
Loading

0 comments on commit 37e32c6

Please sign in to comment.