Skip to content

Commit

Permalink
even more graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
localyost3000 committed Feb 7, 2021
1 parent be2703a commit 04d653d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 40 deletions.
4 changes: 2 additions & 2 deletions src/server/services/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,13 +108,13 @@ export class DatabaseService {
}

private _beginTransaction(conn: Connection): Observable<Connection> {
return Observable.create(obs => {
return new Observable(obs => {
conn.beginTransaction((err) => {
if (err) {
return obs.error(err);
}
obs.next(conn);
return obs.complete(conn);
obs.complete();
});
});
}
Expand Down
125 changes: 87 additions & 38 deletions src/server/services/health.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import * as cluster from 'cluster';
import * as inspector from 'inspector';
import { Server } from 'net';
import { BehaviorSubject, forkJoin, Observable, of } from 'rxjs';
import { distinctUntilChanged, switchMap } from 'rxjs/operators';
import { BehaviorSubject, forkJoin, Observable, of, race, timer } from 'rxjs';
import { distinctUntilChanged, switchMap, take, tap } from 'rxjs/operators';
import { LoggingService } from './logger';

const EXIT_SIGNALS: string[] = [
Expand All @@ -17,8 +18,6 @@ export class HealthService {

private _servers: Server[] = [];
private _services = [];

private _timeouts = {};
private _listeners = [];

constructor(
Expand All @@ -33,6 +32,30 @@ export class HealthService {
})
);
});

if (cluster.isWorker) {
cluster.worker.on('message', (msg) => {
if (EXIT_SIGNALS.indexOf(msg) >= 0) {
if (!this._isHealthy.value) {
this._logger.log(`${msg} ignored. Process is already terminating`);
return;
}
this._isHealthy.next(false);
this._logger.log(`shutting down...`);
this._cleanup(msg)
.subscribe(
_ => {
this._logger.log('Goodbye.');
process.exit(0);
},
err => {
this._logger.logError(err);
this._logger.log('Goodbye.');
process.exit(1);
});
}
});
}
}

setHealthy(isHealthy: boolean) {
Expand Down Expand Up @@ -71,18 +94,28 @@ export class HealthService {
return alive;
}

private _cleanup(signal: string) {
private _cleanup(signal) {
if (inspector && inspector.url()) {
this._logger.log('closing inspector');
inspector.close();
}
if (cluster.isMaster) {
return forkJoin([
of(true),
...(this._servers.map(s => this._closeServer(s)))
]).pipe(
switchMap(_ => {
const workers = this.getLivingWorkers() || [];
return forkJoin([
of(true),
...(workers.map(w => this._cleanupWorker(w, signal)))
]);
return race(
this._cleanupWorkers(signal),
timer(10 * 1000)
.pipe(
take(1),
tap(_ => {
this._logger.log('[ master ]: Workers took too long to shut down');
this.getLivingWorkers().forEach(w => w.kill('SIGKILL'));
})
)
);
}),
switchMap(_ => {
return forkJoin([
Expand All @@ -108,24 +141,25 @@ export class HealthService {
}

private _handleExitSignal(signal: string) {
if (!this._isHealthy.value) {
this._logger.log(`${signal} ignored. Process is already terminating`);
if (cluster.isMaster) {
if (!this._isHealthy.value) {
this._logger.log(`${signal} ignored. Process is already terminating`);
return;
}
this._isHealthy.next(false);
this._logger.log(`[ master ]: caught ${signal}. shutting down...`);
this._cleanup(signal)
.subscribe(_ => {
this._logger.log(`[ master ]: Goodbye.`);
process.exit(0)
}, err => {
this._logger.logError(err);
process.exit(1);
});
} else {
// workers wait to die
return;
}
this._isHealthy.next(false);
this._logger.log(`${cluster.isMaster ? '[ master ]: ' : ''}caught ${signal}. shutting down...`);
this._cleanup(signal)
.subscribe(_ => {
if (cluster.isMaster) {
this._logger.log(`[ master ]: Goodbye.`);
} else {
this._logger.log(`Goodbye.`);
}
process.exit(0)
}, err => {
this._logger.logError(err);
process.exit(1);
});
}

private _closeServer(server: Server): Observable<any> {
Expand All @@ -137,24 +171,39 @@ export class HealthService {
});
}

private _cleanupWorker(worker: cluster.Worker, signal: string): Observable<any> {
private _cleanupWorkers(signal): Observable<any> {
if (!cluster.isMaster) {
return of(true);
}
this.getLivingWorkers().forEach(w => {
w.send(signal);
});
return new Observable(obs => {
worker.on('exit', (code, signal) => {
if (this._timeouts[worker.id]) {
clearTimeout(this._timeouts[worker.id]);
}
obs.next(`${worker.id} exited`);
cluster.disconnect(() => {
obs.next(true);
obs.complete();
});

worker.kill(signal);
this._timeouts[worker.id] = setTimeout(() => {
this._logger.log('worker shutdown time expired. forcefully killing worker', worker.id);
worker.process.kill('SIGKILL');
}, 6000); // based on maximum `keep-alive` timeout
});
}

// private _cleanupWorker(worker: cluster.Worker, signal: string): Observable<any> {
// return new Observable(obs => {
// worker.on('exit', (code, signal) => {
// if (this._timeouts[worker.id]) {
// clearTimeout(this._timeouts[worker.id]);
// }
// obs.next(`${worker.id} exited`);
// obs.complete();
// });

// worker.kill(signal);
// this._timeouts[worker.id] = setTimeout(() => {
// this._logger.log('worker shutdown time expired. forcefully killing worker', worker.id);
// worker.process.kill('SIGKILL');
// }, 6000); // based on maximum `keep-alive` timeout
// });
// }

private _cleanupService(service): Observable<any> {
if (service && service.cleanup) {
return service.cleanup();
Expand Down
3 changes: 3 additions & 0 deletions src/server/services/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ export class LoggingService {

log(...messages: any[]): void {
if (cluster.isMaster) {
if (messages.length && !/^\[ (m|w)/i.test(messages[0])) {
messages.unshift(`[ master ]:`);
}
console.log(...messages);
} else {
if (cluster.worker.isConnected()) {
Expand Down

0 comments on commit 04d653d

Please sign in to comment.