From 73dbb8fb80f56942d35e5b5cda9715bba5d5c98f Mon Sep 17 00:00:00 2001 From: Ruben Fiszel Date: Fri, 31 Jan 2025 13:24:35 +0100 Subject: [PATCH] feat(cli): add --parallel option to push + display timings --- cli/conf.ts | 1 + cli/script.ts | 49 ++--- cli/sync.ts | 495 ++++++++++++++++++++++++++++---------------------- cli/utils.ts | 16 +- 4 files changed, 316 insertions(+), 245 deletions(-) diff --git a/cli/conf.ts b/cli/conf.ts index eab154fec99c6..2bb3a69b9b861 100644 --- a/cli/conf.ts +++ b/cli/conf.ts @@ -22,6 +22,7 @@ export interface SyncOptions { excludes?: string[]; defaultTs?: "bun" | "deno"; codebases?: Codebase[]; + parallel?: number; } export interface Codebase { diff --git a/cli/script.ts b/cli/script.ts index e6b08e15da1aa..6ab67212e8a0c 100644 --- a/cli/script.ts +++ b/cli/script.ts @@ -193,11 +193,12 @@ export async function handleFile( bundleContent = execSync( codebase.customBundler + " " + path ).toString(); - log.info("Custom bundler executed"); + log.info("Custom bundler executed for " + path); } else { const esbuild = await import("npm:esbuild"); - log.info(`Starting building the bundle for ${path}`); + log.info(`Started bundling ${path} ...`); + const startTime = performance.now(); const out = await esbuild.build({ entryPoints: [path], format: "cjs", @@ -210,19 +211,20 @@ export async function handleFile( packages: "bundle", target: "node20.15.1", }); + const endTime = performance.now(); bundleContent = out.outputFiles[0].text; log.info( - "Bundle size: " + (bundleContent.length / 1024).toFixed(0) + "kB" + `Finished bundling ${path}: ${(bundleContent.length / 1024).toFixed(0)}kB (${(endTime - startTime).toFixed(0)}ms)` ); } if (Array.isArray(codebase.assets) && codebase.assets.length > 0) { const archiveNpm = await import("npm:@ayonli/jsext/archive"); - log.info( - `Using the following asset configuration: ${JSON.stringify( + `Using the following asset configuration for ${path}: ${JSON.stringify( codebase.assets )}` ); + const startTime = performance.now(); const tarball = new archiveNpm.Tarball(); tarball.append( new File([bundleContent], "main.js", { type: "text/plain" }) @@ -233,10 +235,10 @@ export async function handleFile( const file = new File([blob], asset.to); tarball.append(file); } - log.info("Tarball size: " + (tarball.size / 1024).toFixed(0) + "kB"); + const endTime = performance.now(); + log.info(`Finished creating tarball for ${path}: ${(tarball.size / 1024).toFixed(0)}kB (${(endTime - startTime).toFixed(0)}ms)`); bundleContent = tarball; } - log.info(`Finished building the bundle for ${path}`); } let typed = opts?.skipScriptsMetadata ? undefined : @@ -364,24 +366,23 @@ export async function handleFile( } } - log.info( - colors.yellow.bold(`Creating script with a parent ${remotePath}`) - ); + + log.info(`Updating script ${remotePath} ...`); const body = { ...requestBodyCommon, parent_hash: remote.hash, }; - await createScript(bundleContent, workspaceId, body, workspace); + const execTime = await createScript(bundleContent, workspaceId, body, workspace); + log.info(colors.yellow.bold(`Updated script ${remotePath} (${execTime.toFixed(0)}ms)`)); } else { - log.info( - colors.yellow.bold(`Creating script without parent ${remotePath}`) - ); - + log.info(`Creating new script ${remotePath} ...`); const body = { ...requestBodyCommon, parent_hash: undefined, }; - await createScript(bundleContent, workspaceId, body, workspace); + const execTime = await createScript(bundleContent, workspaceId, body, workspace); + log.info(colors.yellow.bold(`Created new script ${remotePath} (${execTime.toFixed(0)}ms)`)); + } return true; } @@ -416,7 +417,8 @@ async function createScript( workspaceId: string, body: NewScript, workspace: Workspace -) { +): Promise { + const start = performance.now(); if (!bundleContent) { try { // no parent hash @@ -427,7 +429,7 @@ async function createScript( } catch (e: any) { throw Error( `Script creation for ${body.path} with parent ${body.parent_hash - } was not successful: ${e.body ?? e.message}` + } was not successful: ${e.body ?? e.message} ` ); } } else { @@ -447,16 +449,17 @@ async function createScript( "/scripts/create_snapshot"; const req = await fetch(url, { method: "POST", - headers: { Authorization: `Bearer ${workspace.token}` }, + headers: { Authorization: `Bearer ${workspace.token} ` }, body: form, }); if (req.status != 201) { throw Error( `Script snapshot creation was not successful: ${req.status} - ${req.statusText - } - ${await req.text()}` + } - ${await req.text()} ` ); } } + return performance.now() - start; } export async function findContentFile(filePath: string) { @@ -868,7 +871,7 @@ async function generateMetadata( } & SyncOptions, scriptPath: string | undefined ) { - log.info("This command only works for workspace scripts, for flows inline scripts use `wmill flow generate-locks`"); + log.info("This command only works for workspace scripts, for flows inline scripts use `wmill flow generate - locks`"); if (scriptPath == "") { scriptPath = undefined; } @@ -924,7 +927,7 @@ async function generateMetadata( ); if (candidate) { hasAny = true; - log.info(colors.green(`+ ${candidate}`)); + log.info(colors.green(`+ ${candidate} `)); } } if (hasAny) { @@ -987,7 +990,7 @@ const command = new Command() .action(bootstrap as any) .command( "generate-metadata", - "re-generate the metadata file updating the lock and the script schema (for flows, use `wmill flow generate-locks`)" + "re-generate the metadata file updating the lock and the script schema (for flows, use `wmill flow generate - locks`)" ) .arguments("[script:file]") .option("--yes", "Skip confirmation prompt") diff --git a/cli/sync.ts b/cli/sync.ts index e8d6bb4d1abee..b691a0d42cd59 100644 --- a/cli/sync.ts +++ b/cli/sync.ts @@ -1390,244 +1390,299 @@ export async function push(opts: GlobalOptions & SyncOptions) { return; } + const start = performance.now(); log.info(colors.gray(`Applying changes to files ...`)); - const alreadySynced: string[] = []; - - - - for await (const change of changes) { - const stateTarget = path.join(Deno.cwd(), ".wmill", change.path); - let stateExists = true; + let stateful = opts.stateful; + if (stateful) { try { - await Deno.stat(stateTarget); + await Deno.stat(path.join(Deno.cwd(), ".wmill")); } catch { - stateExists = false; + stateful = false; } + } - if (change.name === "edited") { - if ( - await handleScriptMetadata( - change.path, - workspace, - alreadySynced, - opts.message, - globalDeps, - codebases, - opts - ) - ) { - if (opts.stateful && stateExists) { - await Deno.writeTextFile(stateTarget, change.after); - } - continue; - } else if ( - await handleFile( - change.path, - workspace, - alreadySynced, - opts.message, - opts, - globalDeps, - codebases - ) - ) { - if (opts.stateful && stateExists) { - await Deno.writeTextFile(stateTarget, change.after); - } - continue; - } - if (opts.stateful) { - await ensureDir(path.dirname(stateTarget)); - log.info(`Editing ${getTypeStrFromPath(change.path)} ${change.path}`); - } + // Group changes by base path (before first dot) + const groupedChanges = new Map(); + for (const change of changes) { + const basePath = change.path.split('.')[0]; + if (!groupedChanges.has(basePath)) { + groupedChanges.set(basePath, []); + } + groupedChanges.get(basePath)!.push(change); + } - if (isFileResource(change.path)) { - const resourceFilePath = await findResourceFile(change.path); - if (!alreadySynced.includes(resourceFilePath)) { - alreadySynced.push(resourceFilePath); - const newObj = parseFromPath( - resourceFilePath, - await Deno.readTextFile(resourceFilePath) - ); - await pushResource( - workspace.workspaceId, - resourceFilePath, - undefined, - newObj - ); - if (opts.stateful && stateExists) { - await Deno.writeTextFile(stateTarget, change.after); + let parallelizationFactor = opts.parallel ?? 1; + if (parallelizationFactor <= 0) { + parallelizationFactor = 1; + } + const groupedChangesArray = Array.from(groupedChanges.entries()); + log.info(`found changes for ${groupedChangesArray.length} items with a total of ${groupedChangesArray.reduce((acc, [_, changes]) => acc + changes.length, 0)} files to process`); + if (parallelizationFactor > 1) { + log.info(`Parallelizing ${parallelizationFactor} changes at a time`); + } + + // Create a pool of workers that processes items as they become available + const pool = new Set(); + const queue = [...groupedChangesArray]; + + while (queue.length > 0 || pool.size > 0) { + // Fill the pool until we reach parallelizationFactor + while (pool.size < parallelizationFactor && queue.length > 0) { + const [_basePath, changes] = queue.shift()!; + const promise = (async () => { + const alreadySynced: string[] = []; + + for await (const change of changes) { + let stateTarget = undefined; + if (stateful) { + try { + stateTarget = path.join(Deno.cwd(), ".wmill", change.path); + await Deno.stat(stateTarget); + } catch { + stateTarget = undefined; + } } - continue; - } - } - const oldObj = parseFromPath(change.path, change.before); - const newObj = parseFromPath(change.path, change.after); - - await pushObj( - workspace.workspaceId, - change.path, - oldObj, - newObj, - opts.plainSecrets ?? false, - alreadySynced, - opts.message - ); - if (opts.stateful && stateExists) { - await Deno.writeTextFile(stateTarget, change.after); - } - } else if (change.name === "added") { - if ( - change.path.endsWith(".script.json") || - change.path.endsWith(".script.yaml") || - change.path.endsWith(".lock") || - isFileResource(change.path) - ) { - continue; - } else if ( - await handleFile( - change.path, - workspace, - alreadySynced, - opts.message, - opts, - globalDeps, - codebases - ) - ) { - continue; - } - if (opts.stateful && stateExists) { - await ensureDir(path.dirname(stateTarget)); - log.info(`Adding ${getTypeStrFromPath(change.path)} ${change.path}`); - } - const obj = parseFromPath(change.path, change.content); - await pushObj( - workspace.workspaceId, - change.path, - undefined, - obj, - opts.plainSecrets ?? false, - [], - opts.message - ); + if (change.name === "edited") { + if ( + await handleScriptMetadata( + change.path, + workspace, + alreadySynced, + opts.message, + globalDeps, + codebases, + opts + ) + ) { + if (stateTarget) { + await Deno.writeTextFile(stateTarget, change.after); + } + continue; + } else if ( + await handleFile( + change.path, + workspace, + alreadySynced, + opts.message, + opts, + globalDeps, + codebases + ) + ) { + if (stateTarget) { + await Deno.writeTextFile(stateTarget, change.after); + } + continue; + } + if (stateTarget) { + await ensureDir(path.dirname(stateTarget)); + log.info(`Editing ${getTypeStrFromPath(change.path)} ${change.path}`); + } - if (opts.stateful && stateExists) { - await Deno.writeTextFile(stateTarget, change.content); - } - } else if (change.name === "deleted") { - if (change.path.endsWith(".lock")) { - continue; - } - const typ = getTypeStrFromPath(change.path); + if (isFileResource(change.path)) { + const resourceFilePath = await findResourceFile(change.path); + if (!alreadySynced.includes(resourceFilePath)) { + alreadySynced.push(resourceFilePath); + + const newObj = parseFromPath( + resourceFilePath, + await Deno.readTextFile(resourceFilePath) + ); + + await pushResource( + workspace.workspaceId, + resourceFilePath, + undefined, + newObj + ); + if (stateTarget) { + await Deno.writeTextFile(stateTarget, change.after); + } + continue; + } + } + const oldObj = parseFromPath(change.path, change.before); + const newObj = parseFromPath(change.path, change.after); + + await pushObj( + workspace.workspaceId, + change.path, + oldObj, + newObj, + opts.plainSecrets ?? false, + alreadySynced, + opts.message + ); - if (typ == "script") { - log.info(`Archiving ${typ} ${change.path}`); - } else { - log.info(`Deleting ${typ} ${change.path}`); - } - const workspaceId = workspace.workspaceId; - const target = change.path.replaceAll(SEP, "/"); - switch (typ) { - case "script": { - const script = await wmill.getScriptByPath({ - workspace: workspaceId, - path: removeExtensionToPath(target), - }); - await wmill.archiveScriptByHash({ - workspace: workspaceId, - hash: script.hash, - }); - break; - } - case "folder": - await wmill.deleteFolder({ - workspace: workspaceId, - name: change.path.split(SEP)[1], - }); - break; - case "resource": - await wmill.deleteResource({ - workspace: workspaceId, - path: removeSuffix(target, ".resource.json"), - }); - break; - case "resource-type": - await wmill.deleteResourceType({ - workspace: workspaceId, - path: removeSuffix(target, ".resource-type.json"), - }); - break; - case "flow": - await wmill.deleteFlowByPath({ - workspace: workspaceId, - path: removeSuffix(target, ".flow/flow.json"), - }); - break; - case "app": - await wmill.deleteApp({ - workspace: workspaceId, - path: removeSuffix(target, ".app/app.json"), - }); - break; - case "schedule": - await wmill.deleteSchedule({ - workspace: workspaceId, - path: removeSuffix(target, ".schedule.json"), - }); - break; - case "variable": - await wmill.deleteVariable({ - workspace: workspaceId, - path: removeSuffix(target, ".variable.json"), - }); - break; - case "user": { - const users = await wmill.listUsers({ - workspace: workspaceId, - }); - - const email = removeSuffix( - removePathPrefix(change.path, "users"), - ".user.json" - ); - const user = users.find((u) => u.email === email); - if (!user) { - throw new Error(`User ${email} not found`); + if (stateTarget) { + await Deno.writeTextFile(stateTarget, change.after); + } + } else if (change.name === "added") { + if ( + change.path.endsWith(".script.json") || + change.path.endsWith(".script.yaml") || + change.path.endsWith(".lock") || + isFileResource(change.path) + ) { + continue; + } else if ( + await handleFile( + change.path, + workspace, + alreadySynced, + opts.message, + opts, + globalDeps, + codebases + ) + ) { + continue; + } + if (stateTarget) { + await ensureDir(path.dirname(stateTarget)); + log.info(`Adding ${getTypeStrFromPath(change.path)} ${change.path}`); + } + const obj = parseFromPath(change.path, change.content); + await pushObj( + workspace.workspaceId, + change.path, + undefined, + obj, + opts.plainSecrets ?? false, + [], + opts.message + ); + + if (stateTarget) { + await Deno.writeTextFile(stateTarget, change.content); + } + } else if (change.name === "deleted") { + if (change.path.endsWith(".lock")) { + continue; + } + const typ = getTypeStrFromPath(change.path); + + if (typ == "script") { + log.info(`Archiving ${typ} ${change.path}`); + } else { + log.info(`Deleting ${typ} ${change.path}`); + } + const workspaceId = workspace.workspaceId; + const target = change.path.replaceAll(SEP, "/"); + switch (typ) { + case "script": { + const script = await wmill.getScriptByPath({ + workspace: workspaceId, + path: removeExtensionToPath(target), + }); + await wmill.archiveScriptByHash({ + workspace: workspaceId, + hash: script.hash, + }); + break; + } + case "folder": + await wmill.deleteFolder({ + workspace: workspaceId, + name: change.path.split(SEP)[1], + }); + break; + case "resource": + await wmill.deleteResource({ + workspace: workspaceId, + path: removeSuffix(target, ".resource.json"), + }); + break; + case "resource-type": + await wmill.deleteResourceType({ + workspace: workspaceId, + path: removeSuffix(target, ".resource-type.json"), + }); + break; + case "flow": + await wmill.deleteFlowByPath({ + workspace: workspaceId, + path: removeSuffix(target, ".flow/flow.json"), + }); + break; + case "app": + await wmill.deleteApp({ + workspace: workspaceId, + path: removeSuffix(target, ".app/app.json"), + }); + break; + case "schedule": + await wmill.deleteSchedule({ + workspace: workspaceId, + path: removeSuffix(target, ".schedule.json"), + }); + break; + case "variable": + await wmill.deleteVariable({ + workspace: workspaceId, + path: removeSuffix(target, ".variable.json"), + }); + break; + case "user": { + const users = await wmill.listUsers({ + workspace: workspaceId, + }); + + const email = removeSuffix( + removePathPrefix(change.path, "users"), + ".user.json" + ); + const user = users.find((u) => u.email === email); + if (!user) { + throw new Error(`User ${email} not found`); + } + await wmill.deleteUser({ + workspace: workspaceId, + username: user.username, + }); + break; + } + case "group": + await wmill.deleteGroup({ + workspace: workspaceId, + name: removeSuffix( + removePathPrefix(change.path, "groups"), + ".group.json" + ), + }); + break; + default: + break; + } + if (stateTarget) { + try { + await Deno.remove(stateTarget); + } catch { + // state target may not exist already + } + } } - await wmill.deleteUser({ - workspace: workspaceId, - username: user.username, - }); - break; } - case "group": - await wmill.deleteGroup({ - workspace: workspaceId, - name: removeSuffix( - removePathPrefix(change.path, "groups"), - ".group.json" - ), - }); - break; - default: - break; - } - try { - await Deno.remove(stateTarget); - } catch { - // state target may not exist already - } + })(); + + pool.add(promise); + // Remove from pool when complete + promise.then(() => pool.delete(promise)); + } + + // Wait for at least one task to complete before continuing + if (pool.size > 0) { + await Promise.race(pool); } } log.info( colors.bold.green.underline( - `\nDone! All ${changes.length} changes pushed to the remote workspace ${workspace.workspaceId} named ${workspace.name}.` + `\nDone! All ${changes.length} changes pushed to the remote workspace ${workspace.workspaceId} named ${workspace.name} (${(performance.now() - start).toFixed(0)}ms)` ) ); } @@ -1699,6 +1754,10 @@ const command = new Command() "--message ", "Include a message that will be added to all scripts/flows/apps updated during this push" ) + .option( + "--parallel ", + "Number of changes to process in parallel" + ) // deno-lint-ignore no-explicit-any .action(push as any); diff --git a/cli/utils.ts b/cli/utils.ts index fcb07bea9ebe1..027ff7ce00daf 100644 --- a/cli/utils.ts +++ b/cli/utils.ts @@ -15,7 +15,7 @@ export function deepEqual(a: T, b: T): boolean { if (Array.isArray(a)) { length = a.length; if (length != b.length) return false; - for (i = length; i-- !== 0; ) { + for (i = length; i-- !== 0;) { if (!deepEqual(a[i], b[i])) return false; } return true; @@ -43,7 +43,7 @@ export function deepEqual(a: T, b: T): boolean { if (ArrayBuffer.isView(a) && ArrayBuffer.isView(b)) { length = a.length; if (length != b.length) return false; - for (i = length; i-- !== 0; ) { + for (i = length; i-- !== 0;) { if (a[i] !== b[i]) return false; } return true; @@ -66,11 +66,11 @@ export function deepEqual(a: T, b: T): boolean { length = keys.length; if (length !== Object.keys(b).length) return false; - for (i = length; i-- !== 0; ) { + for (i = length; i-- !== 0;) { if (!Object.prototype.hasOwnProperty.call(b, keys[i])) return false; } - for (i = length; i-- !== 0; ) { + for (i = length; i-- !== 0;) { const key = keys[i]; if (!deepEqual(a[key], b[key])) return false; } @@ -142,3 +142,11 @@ export function isFileResource(path: string): boolean { splitPath[2] == "file" ); } + +export function printSync(input: string | Uint8Array, to = Deno.stdout) { + let bytesWritten = 0 + const bytes = typeof input === 'string' ? new TextEncoder().encode(input) : input + while (bytesWritten < bytes.length) { + bytesWritten += to.writeSync(bytes.subarray(bytesWritten)) + } +} \ No newline at end of file