Skip to content

Commit

Permalink
[Code] refactor launcher code, add unit tests (elastic#36863)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yulong committed May 24, 2019
1 parent 6e19e55 commit c30098e
Show file tree
Hide file tree
Showing 6 changed files with 596 additions and 177 deletions.
186 changes: 186 additions & 0 deletions x-pack/plugins/code/server/lsp/abstract_launcher.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

// eslint-disable-next-line max-classes-per-file
import { fork, ChildProcess } from 'child_process';
import path from 'path';
import fs from 'fs';

import { ServerOptions } from '../server_options';
import { createTestServerOption } from '../test_utils';
import { AbstractLauncher } from './abstract_launcher';
import { RequestExpander } from './request_expander';
import { LanguageServerProxy } from './proxy';
import { ConsoleLoggerFactory } from '../utils/console_logger_factory';
import { Logger } from '../log';

jest.setTimeout(10000);

// @ts-ignore
const options: ServerOptions = createTestServerOption();

// a mock function being called when then forked sub process status changes
// @ts-ignore
const mockMonitor = jest.fn();

class MockLauncher extends AbstractLauncher {
public childProcess?: ChildProcess;

constructor(name: string, targetHost: string, opt: ServerOptions) {
super(name, targetHost, opt, new ConsoleLoggerFactory());
}

createExpander(
proxy: LanguageServerProxy,
builtinWorkspace: boolean,
maxWorkspace: number
): RequestExpander {
return new RequestExpander(proxy, builtinWorkspace, maxWorkspace, this.options);
}

async getPort() {
return 19999;
}

async spawnProcess(installationPath: string, port: number, log: Logger): Promise<ChildProcess> {
const childProcess = fork(path.join(__dirname, 'mock_lang_server.js'));
this.childProcess = childProcess;
childProcess.on('message', msg => {
// eslint-disable-next-line no-console
console.log(msg);
mockMonitor(msg);
});
childProcess.send(`port ${await this.getPort()}`);
childProcess.send(`host ${this.targetHost}`);
childProcess.send('listen');
return childProcess;
}
}

class PassiveMockLauncher extends MockLauncher {
constructor(
name: string,
targetHost: string,
opt: ServerOptions,
private dieFirstTime: boolean = false
) {
super(name, targetHost, opt);
}

startConnect(proxy: LanguageServerProxy) {
proxy.awaitServerConnection();
}

async getPort() {
return 19998;
}

async spawnProcess(installationPath: string, port: number, log: Logger): Promise<ChildProcess> {
this.childProcess = fork(path.join(__dirname, 'mock_lang_server.js'));
this.childProcess.on('message', msg => {
// eslint-disable-next-line no-console
console.log(msg);
mockMonitor(msg);
});
this.childProcess.send(`port ${await this.getPort()}`);
this.childProcess.send(`host ${this.targetHost}`);
if (this.dieFirstTime) {
this.childProcess!.send('quit');
this.dieFirstTime = false;
} else {
this.childProcess!.send('connect');
}
return this.childProcess!;
}
}

beforeAll(async () => {
if (!fs.existsSync(options.workspacePath)) {
fs.mkdirSync(options.workspacePath, { recursive: true });
fs.mkdirSync(options.jdtWorkspacePath, { recursive: true });
}
});

beforeEach(() => {
mockMonitor.mockClear();
});

function delay(millis: number) {
return new Promise(resolve => {
setTimeout(() => resolve(), millis);
});
}

test('launcher can start and end a process', async () => {
const launcher = new MockLauncher('mock', 'localhost', options);
const proxy = await launcher.launch(false, 1, '');
await delay(100);
expect(mockMonitor.mock.calls[0][0]).toBe('process started');
expect(mockMonitor.mock.calls[1][0]).toBe('start listening');
expect(mockMonitor.mock.calls[2][0]).toBe('socket connected');
await proxy.exit();
await delay(100);
expect(mockMonitor.mock.calls[3][0]).toMatchObject({ method: 'shutdown' });
expect(mockMonitor.mock.calls[4][0]).toMatchObject({ method: 'exit' });
expect(mockMonitor.mock.calls[5][0]).toBe('exit process with code 0');
});

test('launcher can force kill the process if langServer can not exit', async () => {
const launcher = new MockLauncher('mock', 'localhost', options);
const proxy = await launcher.launch(false, 1, '');
await delay(100);
// set mock lang server to noExist mode
launcher.childProcess!.send('noExit');
mockMonitor.mockClear();
await proxy.exit();
await delay(2000);
expect(mockMonitor.mock.calls[0][0]).toMatchObject({ method: 'shutdown' });
expect(mockMonitor.mock.calls[1][0]).toMatchObject({ method: 'exit' });
expect(mockMonitor.mock.calls[2][0]).toBe('noExit');
expect(launcher.childProcess!.killed).toBe(true);
});

test('launcher can reconnect if process died', async () => {
const launcher = new MockLauncher('mock', 'localhost', options);
const proxy = await launcher.launch(false, 1, '');
await delay(1000);
mockMonitor.mockClear();
// let the process quit
launcher.childProcess!.send('quit');
await delay(5000);
// launcher should respawn a new process and connect
expect(mockMonitor.mock.calls[0][0]).toBe('process started');
expect(mockMonitor.mock.calls[1][0]).toBe('start listening');
expect(mockMonitor.mock.calls[2][0]).toBe('socket connected');
await proxy.exit();
await delay(2000);
});

test('passive launcher can start and end a process', async () => {
const launcher = new PassiveMockLauncher('mock', 'localhost', options);
const proxy = await launcher.launch(false, 1, '');
await delay(100);
expect(mockMonitor.mock.calls[0][0]).toBe('process started');
expect(mockMonitor.mock.calls[1][0]).toBe('start connecting');
expect(mockMonitor.mock.calls[2][0]).toBe('socket connected');
await proxy.exit();
await delay(100);
expect(mockMonitor.mock.calls[3][0]).toMatchObject({ method: 'shutdown' });
expect(mockMonitor.mock.calls[4][0]).toMatchObject({ method: 'exit' });
expect(mockMonitor.mock.calls[5][0]).toBe('exit process with code 0');
});

test('passive launcher should restart a process if a process died before connected', async () => {
const launcher = new PassiveMockLauncher('mock', 'localhost', options, true);
const proxy = await launcher.launch(false, 1, '');
await delay(100);
expect(mockMonitor.mock.calls[0][0]).toBe('process started');
expect(mockMonitor.mock.calls[1][0]).toBe('process started');
expect(mockMonitor.mock.calls[2][0]).toBe('start connecting');
expect(mockMonitor.mock.calls[3][0]).toBe('socket connected');
await proxy.exit();
await delay(1000);
});
190 changes: 190 additions & 0 deletions x-pack/plugins/code/server/lsp/abstract_launcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { ChildProcess } from 'child_process';
import { ILanguageServerLauncher } from './language_server_launcher';
import { ServerOptions } from '../server_options';
import { LoggerFactory } from '../utils/log_factory';
import { Logger } from '../log';
import { LanguageServerProxy } from './proxy';
import { RequestExpander } from './request_expander';

export abstract class AbstractLauncher implements ILanguageServerLauncher {
running: boolean = false;
private _currentPid: number = -1;
private child: ChildProcess | null = null;
private _startTime: number = -1;
private _proxyConnected: boolean = false;
protected constructor(
readonly name: string,
readonly targetHost: string,
readonly options: ServerOptions,
readonly loggerFactory: LoggerFactory
) {}

public async launch(builtinWorkspace: boolean, maxWorkspace: number, installationPath: string) {
const port = await this.getPort();
const log: Logger = this.loggerFactory.getLogger([
'code',
`${this.name}@${this.targetHost}:${port}`,
]);
let child: ChildProcess;
const proxy = new LanguageServerProxy(port, this.targetHost, log, this.options.lsp);
if (this.options.lsp.detach) {
log.debug('Detach mode, expected language server launch externally');
proxy.onConnected(() => {
this.running = true;
});
proxy.onDisconnected(() => {
this.running = false;
if (!proxy.isClosed) {
log.debug(`${this.name} language server disconnected, reconnecting`);
setTimeout(() => this.reconnect(proxy, installationPath, port, log), 1000);
}
});
} else {
child = await this.spawnProcess(installationPath, port, log);
this.child = child;
log.debug('spawned a child process ' + child.pid);
this._currentPid = child.pid;
this._startTime = Date.now();
this.running = true;
this.onProcessExit(child, () => this.reconnect(proxy, installationPath, port, log));
proxy.onDisconnected(async () => {
this._proxyConnected = true;
if (!proxy.isClosed) {
log.debug('proxy disconnected, reconnecting');
setTimeout(async () => {
await this.reconnect(proxy, installationPath, port, log, child);
}, 1000);
} else if (this.child) {
log.info('proxy closed, kill process');
await this.killProcess(this.child, log);
}
});
}
proxy.onExit(() => {
log.debug('proxy exited, is the process running? ' + this.running);
if (this.child && this.running) {
const p = this.child!;
setTimeout(async () => {
if (!p.killed) {
log.debug('killing the process after 1s');
await this.killProcess(p, log);
}
}, 1000);
}
});
proxy.listen();
this.startConnect(proxy);
await new Promise(resolve => {
proxy.onConnected(() => {
this._proxyConnected = true;
resolve();
});
});
return this.createExpander(proxy, builtinWorkspace, maxWorkspace);
}

private onProcessExit(child: ChildProcess, reconnectFn: () => void) {
const pid = child.pid;
child.on('exit', () => {
if (this._currentPid === pid) {
this.running = false;
// if the process exited before proxy connected, then we reconnect
if (!this._proxyConnected) {
reconnectFn();
}
}
});
}

/**
* proxy should be connected within this timeout, otherwise we reconnect.
*/
protected startupTimeout = 3000;

/**
* try reconnect the proxy when disconnected
*/
public async reconnect(
proxy: LanguageServerProxy,
installationPath: string,
port: number,
log: Logger,
child?: ChildProcess
) {
log.debug('reconnecting');
if (this.options.lsp.detach) {
this.startConnect(proxy);
} else {
const processExpired = () => Date.now() - this._startTime > this.startupTimeout;
if (child && !child.killed && !processExpired()) {
this.startConnect(proxy);
} else {
if (child && this.running) {
log.debug('killing the old process.');
await this.killProcess(child, log);
}
this.child = await this.spawnProcess(installationPath, port, log);
log.debug('spawned a child process ' + this.child.pid);
this._currentPid = this.child.pid;
this._startTime = Date.now();
this.running = true;
this.onProcessExit(this.child, () =>
this.reconnect(proxy, installationPath, port, log, child)
);
this.startConnect(proxy);
}
}
}

abstract async getPort(): Promise<number>;

startConnect(proxy: LanguageServerProxy) {
proxy.connect();
}

/**
* await for proxy connected, create a request expander
* @param proxy
*/
abstract createExpander(
proxy: LanguageServerProxy,
builtinWorkspace: boolean,
maxWorkspace: number
): RequestExpander;

abstract async spawnProcess(
installationPath: string,
port: number,
log: Logger
): Promise<ChildProcess>;

private killProcess(child: ChildProcess, log: Logger) {
if (!child.killed) {
return new Promise<boolean>((resolve, reject) => {
// if not killed within 1s
const t = setTimeout(reject, 1000);
child.on('exit', () => {
clearTimeout(t);
resolve(true);
});
child.kill();
log.info('killed process ' + child.pid);
})
.catch(() => {
// force kill
child.kill('SIGKILL');
log.info('force killed process ' + child.pid);
return child.killed;
})
.finally(() => {
if (this._currentPid === child.pid) this.running = false;
});
}
}
}
Loading

0 comments on commit c30098e

Please sign in to comment.