Skip to content

Commit

Permalink
resolve part of #2300, improve transfer rate of pushing objects to a …
Browse files Browse the repository at this point in the history
…remote by pushing them component afte component and waiting the remote to consume before continue pushing
  • Loading branch information
davidfirst committed Feb 5, 2020
1 parent 54c8f6d commit 66a5fc4
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 21 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [unreleased]

- [#2300](https://github.com/teambit/bit/issues/2300) improve transfer rate of pushing objects to a remote

## [[14.7.3] - 2020-02-02](https://github.com/teambit/bit/releases/tag/v14.7.3)

### New
Expand Down
21 changes: 20 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@
"@types/mocha": "^5.2.7",
"@types/node": "^12.7.9",
"@types/open": "^6.2.1",
"@types/ssh2": "^0.5.39",
"@types/vinyl": "^2.0.3",
"@typescript-eslint/eslint-plugin": "^2.15.0",
"@typescript-eslint/parser": "^2.15.0",
Expand Down
9 changes: 9 additions & 0 deletions src/cli/commands/private-cmds/_put-cmd.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { performance } from 'perf_hooks';
import Command from '../../command';
import { fromBase64, buildCommandMessage, packCommand, unpackCommand } from '../../../utils';
import { put } from '../../../api/scope';
Expand All @@ -16,15 +17,23 @@ export default class Put extends Command {

action([path, args]: [string, string]): Promise<any> {
let data = '';
const t0 = performance.now();
const { headers } = unpackCommand(args);
compressResponse = clientSupportCompressedCommand(headers.version);
checkVersionCompatibilityOnTheServer(headers.version);
return new Promise((resolve, reject) => {
process.stdin
.on('data', chunk => {
data += chunk.toString();
// const size = chunk.byteLength;
// logger.debug(`DATA ${size}B. ${Math.floor(size / 1024)}KB ${Math.floor(size / 1024 / 1024)}MB`);
})
.on('end', () => {
const size = data.length;
logger.debug(`END ${size}B. ${Math.floor(size / 1024)}KB ${Math.floor(size / 1024 / 1024)}MB`);
const t1 = performance.now();
logger.debug(`_put, getting all data fro the client took ${t1 - t0} milliseconds.`);
logger.debug(`_put, transfer rate is ${Math.floor(size / 1024 / 1024 / ((t1 - t0) / 1000))} MB / Sec`);
logger.info('Checking if a migration is needed');
const scopePath = fromBase64(path);
return migrate(scopePath, false)
Expand Down
15 changes: 15 additions & 0 deletions src/scope/component-objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,22 @@ export default class ComponentObjects {
return result;
}

/**
* _put:
* until v14.7.3, the client was sending all its components with one JSON.stringify call.
* Due to performance issue, this has been changed to send each componentAndObjects separately.
* The delimiter between the each componentAndObjects instance is one space.
*
* _fetch:
* still using the old method, will probably need to switch as well.
*/
static manyFromString(str: string): ComponentObjects[] {
if (str.includes(' ')) {
return str
.split(' ')
.filter(s => s.length > 1)
.map(componentObject => ComponentObjects.fromString(componentObject));
}
return JSON.parse(str).map(componentObject => ComponentObjects.fromString(componentObject));
}

Expand Down
57 changes: 37 additions & 20 deletions src/scope/network/ssh/ssh.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint max-classes-per-file: 0 */
import SSH2 from 'ssh2';
import SSH2, { Channel } from 'ssh2';
import R from 'ramda';
import * as os from 'os';
import merge from 'lodash.merge';
Expand Down Expand Up @@ -69,6 +69,7 @@ export const DEFAULT_READ_STRATEGIES: SSHConnectionStrategyName[] = [
'user-password'
];
export default class SSH implements Network {
// @ts-ignore
connection: SSH2 | null | undefined;
path: string;
username: string;
Expand Down Expand Up @@ -211,7 +212,7 @@ export default class SSH implements Network {
authFailedMsg: string
): Promise<SSH> {
const connectWithConfigP = () => {
const conn = new SSH2();
const conn = new SSH2.Client();
return new Promise((resolve, reject) => {
conn
.on('error', err => {
Expand Down Expand Up @@ -268,33 +269,49 @@ export default class SSH implements Network {
context = context || {};
context.sshUsername = this._sshUsername;
}
// No need to use packCommand on the payload in case of put command
// because we handle all the base64 stuff in a better way inside the ComponentObjects.manyToString
// inside pushMany function here
const cmd = this.buildCmd(
commandName,
absolutePath(this.path || ''),
commandName === '_put' ? null : payload,
context
);
if (!this.connection) {
throw new Error('ssh connection is not defined');
}
// eslint-disable-next-line consistent-return
return new Promise((resolve, reject) => {
let res = '';
let err;
// No need to use packCommand on the payload in case of put command
// because we handle all the base64 stuff in a better way inside the ComponentObjects.manyToString
// inside pushMany function here
const cmd = this.buildCmd(
commandName,
absolutePath(this.path || ''),
commandName === '_put' ? null : payload,
context
);
if (!this.connection) {
err = 'ssh connection is not defined';
logger.error('ssh', err);
return reject(err);
}
// eslint-disable-next-line consistent-return
this.connection.exec(cmd, (error, stream) => {
this.connection.exec(cmd, (error, stream: Channel) => {
if (error) {
logger.error('ssh, exec returns an error: ', error);
return reject(error);
}
if (commandName === '_put') {
stream.stdin.write(payload);
stream.stdin.end();
let current = -1;
const packets = payload.map(componentAndObject => componentAndObject.toString());
const writeIntoSocket = () => {
current += 1;

if (current === packets.length) {
stream.end();
return;
}

const nextPacket = `${packets[current]} `;
const canContinue = stream.write(nextPacket);

// wait until stream drains to continue
if (canContinue) writeIntoSocket();
else stream.once('drain', writeIntoSocket);
};
logger.debug(`ssh, _put going to write ${packets.length} packets (components) into remote`);
writeIntoSocket();
logger.debug('ssh, _put finished writing into the remote socket');
}
stream
.on('data', response => {
Expand Down Expand Up @@ -376,7 +393,7 @@ export default class SSH implements Network {
pushMany(manyComponentObjects: ComponentObjects[], context?: Record<string, any>): Promise<string[]> {
// This ComponentObjects.manyToString will handle all the base64 stuff so we won't send this payload
// to the pack command (to prevent duplicate base64)
return this.exec('_put', ComponentObjects.manyToString(manyComponentObjects), context).then((data: string) => {
return this.exec('_put', manyComponentObjects, context).then((data: string) => {
const { payload, headers } = this._unpack(data);
checkVersionCompatibility(headers.version);
return payload.ids;
Expand Down

0 comments on commit 66a5fc4

Please sign in to comment.