Skip to content

Commit

Permalink
refactor: move exec back into compression implementation
Browse files Browse the repository at this point in the history
By passing the tar details to the deflator
  • Loading branch information
Dominic Scheirlinck authored and dominics committed Jan 12, 2022
1 parent aee7492 commit 898db23
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 86 deletions.
19 changes: 10 additions & 9 deletions src/artifacts/compression/compression.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
import stream from 'stream';
import { ExecaReturnValue } from 'execa';
import execa from 'execa';
import { Artifact } from '../model';

export interface Compression {
extension: string;
export type TarInputArgs = { argv: string[]; input: string } | { file: string };

export interface Compression {
/**
* inflate just does the inflation, in place
* inflate decompresses an input stream (usually an in-progress artifact download), writing decompressed files to disk
* at the given outputPath (usually the working dir)
*/
inflate(input: stream.Readable, outputPath?: string): Promise<ExecaReturnValue>;
inflate(input: stream.Readable, outputPath?: string): Promise<execa.ExecaReturnValue>;

/**
* Returns a command (argv) that deflates from stdin to the given outputPath
*
* The return value is an argv that can be sent to a `sh -c`-style shell interpreter
* deflate either takes a tar, or creates on on the fly, and passes this to a compression algorithm, outputting the
* desired artifact
*/
deflateCmd(outputPath: string): Promise<string[]>;
deflate(output: Artifact, tarInputArgs: TarInputArgs): Promise<execa.ExecaChildProcess>;
}
15 changes: 8 additions & 7 deletions src/artifacts/compression/desync.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import execa, { ExecaReturnValue } from 'execa';
import rimrafCb from 'rimraf';
import tempy from 'tempy';
import { hasBin } from '../../util/exec';
import { Compression } from './compression';
import { Artifact } from '../model';
import { Compression, TarInputArgs } from './compression';
import { execFromTar } from './tar';

const log = debug('monofo:artifact:compression:desync');

Expand Down Expand Up @@ -192,15 +194,14 @@ async function checkEnabled(): Promise<void> {
}

export const desync: Compression = {
extension: 'caidx',

/**
* Deflate a tar file, creating a content-addressed index file
*/
async deflateCmd(outputPath: string): Promise<string[]> {
async deflate(output: Artifact, tarInputArgs: TarInputArgs): Promise<execa.ExecaChildProcess> {
await checkEnabled();

return [
return execFromTar(tarInputArgs, [
'|',
'desync',
'tar',
'--config',
Expand All @@ -212,9 +213,9 @@ export const desync: Compression = {
'--index',
'--store',
store(),
outputPath,
output.filename,
'-',
];
]);
},

/**
Expand Down
10 changes: 5 additions & 5 deletions src/artifacts/compression/gzip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import debug from 'debug';
import execa, { ExecaReturnValue } from 'execa';
import { hasBin } from '../../util/exec';
import { tar } from '../../util/tar';
import { Compression } from './compression';
import { Artifact } from '../model';
import { Compression, TarInputArgs } from './compression';
import { execFromTar } from './tar';

const log = debug('monofo:artifact:compression:gzip');

Expand All @@ -20,11 +22,9 @@ async function checkEnabled() {
}

export const gzip: Compression = {
extension: 'tar.gz',

async deflateCmd(outputPath: string): Promise<string[]> {
async deflate(output: Artifact, tarInputArgs: TarInputArgs): Promise<execa.ExecaChildProcess> {
await checkEnabled();
return ['gzip', '>', outputPath];
return execFromTar(tarInputArgs, ['|', 'gzip', '>', output.filename]);
},

async inflate(input: stream.Readable, outputPath = '.'): Promise<ExecaReturnValue> {
Expand Down
59 changes: 25 additions & 34 deletions src/artifacts/compression/index.ts
Original file line number Diff line number Diff line change
@@ -1,60 +1,51 @@
import stream from 'stream';
import debug from 'debug';
import execa, { ExecaChildProcess } from 'execa';
import execa from 'execa';
import { exec } from '../../util/exec';
import { Artifact } from '../model';
import { Compression } from './compression';
import { Compression, TarInputArgs } from './compression';
import { desync } from './desync';
import { gzip } from './gzip';
import { lz4 } from './lz4';
import { tar } from './tar';

const log = debug('monofo:artifact:compression');

// TODO: use file extension information automatically
export * from './compression';

export const compressors: Record<string, Compression> = {
desync,
gzip,
lz4,
caidx: desync,
'tar.gz': gzip,
'tar.lz4': lz4,
tar,
};

export function deflateCmd(artifact: Artifact): Promise<string[]> {
switch (artifact.ext) {
case 'tar':
return Promise.resolve(['cat', '>', artifact.filename]);
case compressors.gzip.extension:
return compressors.gzip.deflateCmd(artifact.filename);
case compressors.lz4.extension:
return compressors.lz4.deflateCmd(artifact.filename);
case compressors.desync.extension:
return compressors.desync.deflateCmd(artifact.filename);
default:
throw new Error(`Unsupported artifact format: ${artifact.ext}`);
export function deflator(output: Artifact, tarInputArgs: TarInputArgs): Promise<execa.ExecaChildProcess> {
const compressor = compressors?.[output.ext];

if (!compressor) {
throw new Error(`Unsupported output artifact format: ${output.ext}`);
}

return compressor.deflate(output, tarInputArgs);
}

export async function inflator(
input: stream.Readable,
artifact: Artifact,
outputPath = '.'
): Promise<ExecaChildProcess> {
): Promise<execa.ExecaChildProcess> {
if (artifact.skip) {
log(`Skipping download and inflate for ${artifact.name} because skip is enabled`);
return Promise.resolve(execa('true'));
}

switch (artifact.ext) {
case 'tar':
// eslint-disable-next-line @typescript-eslint/return-await
return Promise.resolve(execa('tar', ['-C', outputPath, '-xf', '-'], { input, stderr: 'inherit' }));
case compressors.gzip.extension:
return compressors.gzip.inflate(input, outputPath);
case compressors.lz4.extension:
return compressors.lz4.inflate(input, outputPath);
case compressors.desync.extension:
return compressors.desync.inflate(input, outputPath);
default:
// eslint-disable-next-line @typescript-eslint/return-await
return Promise.resolve(execa('tee', [artifact.filename], { input, stderr: 'inherit' }));
const compressor = compressors?.[artifact.ext];

if (!compressor) {
log(`Using no compression: inflating ${artifact.name} "as-is"`);
return exec('cat', ['>', artifact.filename], { input });
}
}

export * from './compression';
return compressor.inflate(input, outputPath);
}
11 changes: 6 additions & 5 deletions src/artifacts/compression/lz4.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import debug from 'debug';
import execa, { ExecaReturnValue } from 'execa';
import { hasBin } from '../../util/exec';
import { tar } from '../../util/tar';
import { Compression } from './compression';
import { Artifact } from '../model';
import { Compression, TarInputArgs } from './compression';
import { execFromTar } from './tar';

const log = debug('monofo:artifact:compression:lz4');

Expand All @@ -20,14 +22,13 @@ async function checkEnabled() {
}

export const lz4: Compression = {
extension: 'tar.lz4',

async deflateCmd(outputPath: string): Promise<string[]> {
async deflate(output: Artifact, tarInputArgs: TarInputArgs): Promise<execa.ExecaChildProcess> {
await checkEnabled();
return ['lz4', '-2', '>', outputPath];
return execFromTar(tarInputArgs, ['|', 'lz4', '-2', '>', output.filename]);
},

async inflate(input: stream.Readable, outputPath = '.'): Promise<ExecaReturnValue> {
await checkEnabled();
log(`Inflating .tar.lz4 archive: tar -C ${outputPath} -x --use-compress-program=lz4 -f -`);

const result = await execa(await tar(), ['-C', outputPath, '-x', '--use-compress-program=lz4', '-f', '-'], {
Expand Down
72 changes: 72 additions & 0 deletions src/artifacts/compression/tar.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import stream from 'stream';
import debug from 'debug';
import execa, { ExecaReturnValue } from 'execa';
import { EmptyArgsError, exec, hasBin } from '../../util/exec';
import { tar as tarBin } from '../../util/tar';
import { Compression, TarInputArgs } from './compression';

const log = debug('monofo:artifact:compression:lz4');

let enabled: boolean | undefined;

async function checkEnabled() {
if (enabled === undefined) {
enabled = await hasBin('tar');
}

if (!enabled) {
throw new Error('tar is disabled due to no tar binary found on PATH');
}
}

export function tarExecArgs(tarInputArgs: TarInputArgs): string[] {
if ('file' in tarInputArgs) {
return ['cat', tarInputArgs.file]; // https://porkmail.org/era/unix/award.html#cat
}

return tarInputArgs.argv;
}

export function tarExecOptions(tarInputArgs: TarInputArgs): Partial<execa.Options> {
if ('input' in tarInputArgs) {
return { input: tarInputArgs.input };
}

return {};
}

export function execFromTar(
tarInputArgs: TarInputArgs,
argv: string[],
options: execa.Options = {}
): Promise<execa.ExecaChildProcess> {
const [first, ...rest] = [...tarExecArgs(tarInputArgs), ...argv];

if (!first) {
throw new EmptyArgsError();
}

return exec(first, rest, { ...tarExecOptions(tarInputArgs), ...options });
}

export const tar: Compression = {
async deflate(artifact, tarInputArgs): Promise<execa.ExecaChildProcess> {
await checkEnabled();

return execFromTar(tarInputArgs, ['>', artifact.filename]);
},

async inflate(input: stream.Readable, outputPath = '.'): Promise<ExecaReturnValue> {
await checkEnabled();

log(`Inflating .tar archive: tar -C ${outputPath} -x -f -`);

const result = await exec(await tarBin(), ['-C', outputPath, '-x', '-f', '-'], {
input,
});

log('Finished inflating .tar archive');

return result;
},
};
14 changes: 4 additions & 10 deletions src/commands/deflate.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { promises as fs } from 'fs';
import debug from 'debug';
import execa from 'execa';
import { deflateCmd } from '../artifacts/compression';
import { deflator } from '../artifacts/compression';
import { Artifact } from '../artifacts/model';
import { BaseCommand } from '../command';

Expand Down Expand Up @@ -41,13 +40,8 @@ export default class Deflate extends BaseCommand {
}

const artifact = new Artifact(args.output);
const allArgs: string[] = ['-o', 'pipefail', ';', 'cat', args.tarFile, '|', ...(await deflateCmd(artifact))];
log(`Going to deflate ${args.tarFile}: set ${allArgs.join(' ')}`);

return (
await execa('set', allArgs, {
shell: 'bash',
})
).stdout;

const result = await deflator(artifact, { file: args.tarFile });
return result.stdout;
}
}
24 changes: 16 additions & 8 deletions src/commands/upload.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import { flags as f } from '@oclif/command';
import debug from 'debug';
import { upload } from '../artifacts/api';
import { deflateCmd } from '../artifacts/compression';
import { deflator } from '../artifacts/compression';
import { filesToUpload } from '../artifacts/matcher';
import { Artifact } from '../artifacts/model';
import { BaseCommand, BaseFlags } from '../command';
import { exec } from '../util/exec';
import { count } from '../util/helper';
import { tar } from '../util/tar';

Expand Down Expand Up @@ -104,13 +103,22 @@ locally cached
log(`Uploading ${count(files, 'path')} as ${args.output}`, files);

const tarBin = await tar();
const tarArgs = ['-c', '--sort=name', '--hard-dereference', '--null', '--files-from', '-'];
const allArgs: string[] = ['-o', 'pipefail', ';', tarBin, ...tarArgs, '|', ...(await deflateCmd(artifact))];
const input = `${files.join('\x00')}\x00`;
const tarArgs = [
'set',
'-o',
'pipefail',
';',
tarBin,
'-c',
'--sort=name',
'--hard-dereference',
'--null',
'--files-from',
'-',
];

await exec('set', allArgs, {
input,
});
const input = `${files.join('\x00')}\x00`;
await deflator(artifact, { argv: tarArgs, input });

log(`Archive deflated at ${args.output}`);

Expand Down
12 changes: 11 additions & 1 deletion src/util/exec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ export function hasBin(bin: string): Promise<boolean> {
.catch(() => false);
}

export class EmptyArgsError extends Error {
constructor() {
super('Expected argv to contain at least one argument');
}
}

/**
* Announces what it's about to execute, then runs it, returning the output
*/
export async function exec(command: string, args: string[], options: execa.Options): Promise<execa.ExecaChildProcess> {
export async function exec(
command: string,
args: string[],
options: execa.Options = {}
): Promise<execa.ExecaChildProcess> {
const opts: execa.Options = {
shell: 'bash',
stderr: 'inherit',
Expand Down
10 changes: 4 additions & 6 deletions test/artifacts/compression.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import fs from 'fs';
import { promisify } from 'util';
import execa from 'execa';
import rimrafCb from 'rimraf';
import tempy from 'tempy';
import { Compression, compressors, inflator } from '../../src/artifacts/compression';
Expand Down Expand Up @@ -36,12 +35,11 @@ describe('compression', () => {
});

describe('round-trip', () => {
it.each([['gzip'], ['lz4'], ['desync']])('compression algorithm: %s', async (algo) => {
const compression: Compression = compressors[algo];
const compressed = `${dir}/test.${compression.extension}`;
it.each([['tar.gz'], ['tar.lz4'], ['caidx']])('compression algorithm: %s', async (extension) => {
const compression: Compression = compressors[extension];
const compressed = `${dir}/test.${extension}`;

const command: string[] = [getFixturePath('qux.tar'), '|', ...(await compression.deflateCmd(compressed))];
await execa('cat', command, { shell: 'bash' });
await compression.deflate(new Artifact(compressed), { file: getFixturePath('qux.tar') });

expect(fs.existsSync(compressed)).toBe(true);

Expand Down
Loading

0 comments on commit 898db23

Please sign in to comment.