Skip to content

Commit

Permalink
Add startError unhealthy check. remove exitpromise and startup task r…
Browse files Browse the repository at this point in the history
…etention (reduce GC pressure)
  • Loading branch information
mceachen committed Feb 28, 2022
1 parent 4d32dad commit 73235be
Showing 1 changed file with 67 additions and 67 deletions.
134 changes: 67 additions & 67 deletions src/BatchProcess.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import child_process from "child_process"
import { until } from "./Async"
import { Deferred } from "./Deferred"
import { cleanError, tryEach } from "./Error"
import { InternalBatchProcessOptions } from "./InternalBatchProcessOptions"
import { Logger } from "./Logger"
Expand Down Expand Up @@ -29,6 +28,7 @@ export type WhyNotHealthy =
| "stdout.error"
| "timeout"
| "tooMany" // < only sent by BatchCluster when maxProcs is reduced
| "startError"
| "unhealthy"
| "worn"

Expand All @@ -44,24 +44,28 @@ export class BatchProcess {
#lastHealthCheck = Date.now()
#healthCheckFailures = 0

readonly #startupTask: Task
readonly startupTaskId: number
readonly #logger: () => Logger
#lastJobFinshedAt = Date.now()

// Only set to true when `proc.pid` is no longer in the process table.
#ended = false
#starting = true

// .#end() started:
#ending = false

// .#end() finished:
#ended = false

// pidExists() returned false
#exited = false

#whyNotHealthy?: WhyNotHealthy

failedTaskCount = 0

#taskCount = -1 // don't count the startupTask

/**
* Supports non-polling notification of process exit
*/
readonly #resolvedOnExit = new Deferred<void>()
/**
* Should be undefined if this instance is not currently processing a task.
*/
Expand Down Expand Up @@ -89,9 +93,9 @@ export class BatchProcess {
this.pid = proc.pid

this.proc.on("error", (err) => this.#onError("proc.error", err))
this.proc.on("close", () => this.#onExit("proc.close"))
this.proc.on("exit", () => this.#onExit("proc.exit"))
this.proc.on("disconnect", () => this.#onExit("proc.disconnect"))
this.proc.on("close", () => this.end(false, "proc.close"))
this.proc.on("exit", () => this.end(false, "proc.exit"))
this.proc.on("disconnect", () => this.end(false, "proc.disconnect"))

const stdin = this.proc.stdin
if (stdin == null) throw new Error("Given proc had no stdin")
Expand All @@ -107,9 +111,10 @@ export class BatchProcess {
stderr.on("data", (err) => this.#onStderr(err))
})

this.#startupTask = new Task(opts.versionCommand, SimpleParser)
const startupTask = new Task(opts.versionCommand, SimpleParser)
this.startupTaskId = startupTask.taskId

if (!this.execTask(this.#startupTask)) {
if (!this.execTask(startupTask)) {
this.opts.observer.emit(
"internalError",
new Error(this.name + " startup task was not submitted")
Expand All @@ -120,33 +125,44 @@ export class BatchProcess {
this.opts.observer.emit("childStart", this)
}

get startupPromise(): Promise<void> {
return this.#startupTask.promise
}

/**
* @return the task this instance is currently working on (assigned when a
* task is handed to the child process), or undefined when no task is in
* flight.
*/
get currentTask(): Task | undefined {
return this.#currentTask
}

/**
* @return the number of tasks given to this process. The underlying counter
* is initialized to -1 so that the startup task is not included in the count.
*/
get taskCount(): number {
return this.#taskCount
}

/**
* @return true from construction until the startup task's promise resolves.
* NOTE(review): in the code shown, a rejected startup task does not clear
* this flag; that path calls `end(false, "startError")` instead, so callers
* should also consult `ending`/`ended`.
*/
get starting(): boolean {
return this.#starting
}

/**
* @return true if `this.end()` has been requested or the child process has
* exited.
* @return true if `this.end()` has been requested (which may be due to the
* child process exiting)
*/
get ending(): boolean {
// The flag is raised by end() immediately before it delegates to #end(), so
// it is already true while teardown is in progress. Nothing visible here
// resets it afterwards -- TODO confirm against the full file.
return this.#ending
}

/**
* @return true if `this.end()` has completed running, which includes child
* process cleanup. The flag is set as the last step of `#end()`, just before
* the "childEnd" event is emitted. Note that this may return `true` while the
* process table still includes the child pid. Call
* {@link BatchProcess.running} for an authoritative (but expensive!) answer.
*/
get ended(): boolean {
return this.#ended
}

/**
* @return true if the child process has exited and is no longer in the
* process table.
* process table. Note that this may be erroneously false if the process table
* hasn't been checked. Call {@link BatchProcess.running} for an authoritative (but
* expensive!) answer.
*/
get exited(): boolean {
return this.#resolvedOnExit.settled
}
get exitPromise(): Promise<void> {
return this.#resolvedOnExit.promise
return this.#exited
}

/**
Expand All @@ -157,7 +173,7 @@ export class BatchProcess {
*/
get whyNotHealthy(): WhyNotHealthy | null {
if (this.#whyNotHealthy != null) return this.#whyNotHealthy
if (this.exited) {
if (this.#ended) {
return "ended"
} else if (this.#ending) {
return "ending"
Expand Down Expand Up @@ -234,37 +250,21 @@ export class BatchProcess {
* @return true if the child process is in the process table
*/
async running(): Promise<boolean> {
if (this.#ended) {
// this.dead is only set if the process table has said we're dead.
return false
} else {
const alive = await pidExists(this.pid)
if (!alive) {
// once a PID leaves the process table, it's gone for good:
this.#ended = true
this.#ending = true
this.#resolvedOnExit.resolve()
}
return alive
if (this.#exited) return false

const alive = await pidExists(this.pid)
if (!alive) {
this.#exited = true
// once a PID leaves the process table, it's gone for good.
this.end(false, "proc.exit")
}
return alive
}

/**
* Logical negation of `running()`.
*
* @return a promise that resolves to true if the child process is not in the
* process table, and false if it is.
*/
notRunning(): Promise<boolean> {
return this.running().then((ea) => !ea)
}

/**
* @return {boolean} true if `this.end()` has been requested or the child
* process has exited.
*/
async ended(): Promise<boolean> {
return this.#ended || (await this.notRunning())
}

async notEnded(): Promise<boolean> {
return this.ended().then((ea) => !ea)
}

maybeRunHealthcheck(): Task | undefined {
if (
this.ready &&
Expand Down Expand Up @@ -301,7 +301,7 @@ export class BatchProcess {
this.#taskCount++
this.#currentTask = task
const cmd = ensureSuffix(task.command, "\n")
const isStartupTask = task.taskId === this.#startupTask.taskId
const isStartupTask = task.taskId === this.startupTaskId
const taskTimeoutMs = isStartupTask
? this.opts.spawnTimeoutMillis
: this.opts.taskTimeoutMillis
Expand All @@ -318,20 +318,26 @@ export class BatchProcess {
void task.promise.then(
() => {
this.#clearCurrentTask(task)
// this.#logger().trace("task completed", { task })

if (!isStartupTask) {
if (isStartupTask) {
// no need to emit taskResolved for startup tasks.
this.#starting = false
} else {
this.opts.observer.emit("taskResolved", task, this)
}
// Call _after_ we've cleared the current task:
this.onIdle()
},
(err) => {
(error) => {
this.#clearCurrentTask(task)
// this.#logger().trace("task failed", { task, err: error })

if (isStartupTask) {
this.opts.observer.emit("startError", err)
this.opts.observer.emit("startError", error)
this.end(false, "startError")
} else {
this.opts.observer.emit("taskError", err, task, this)
this.opts.observer.emit("taskError", error, task, this)
}

// Call _after_ we've cleared the current task:
Expand Down Expand Up @@ -378,14 +384,13 @@ export class BatchProcess {
return undefined
} else {
this.#ending = true
this.opts.observer.emit("childEnd", this, reason)
return this.#end(gracefully)
return this.#end(gracefully, reason)
}
}

// NOTE: Must only be invoked by this.end(), and only expected to be invoked
// once per instance.
async #end(gracefully = true): Promise<void> {
async #end(gracefully: boolean, reason: WhyNotHealthy) {
const lastTask = this.#currentTask
this.#clearCurrentTask()

Expand All @@ -395,7 +400,9 @@ export class BatchProcess {

// NOTE: holy crap there are a lot of notes here.

if (lastTask != null) {
// We don't need to wait for the startup task to complete, and we certainly
// don't need to fuss about ending when we're just getting started.
if (lastTask != null && lastTask.taskId !== this.startupTaskId) {
try {
// Let's wait for the process to complete and the streams to flush, as
// that may actually allow the task to complete successfully. Let's not
Expand Down Expand Up @@ -430,7 +437,7 @@ export class BatchProcess {
this.opts.cleanupChildProcs &&
gracefully &&
this.opts.endGracefulWaitTimeMillis > 0 &&
(await this.running())
!this.#exited
) {
// Wait for the end command to take effect:
await this.#awaitNotRunning(this.opts.endGracefulWaitTimeMillis / 2)
Expand All @@ -451,9 +458,8 @@ export class BatchProcess {
)
await kill(this.proc.pid, true)
}
// OK, we're all cleaned up: parent may be able to spin up another worker:
this.onIdle()
return this.#resolvedOnExit
this.#ended = true
this.opts.observer.emit("childEnd", this, reason)
}

#awaitNotRunning(timeout: number) {
Expand Down Expand Up @@ -521,12 +527,6 @@ export class BatchProcess {
}
}

#onExit(reason: WhyNotHealthy) {
this.#resolvedOnExit.resolve()
// no need to be graceful, it's just for bookkeeping:
return this.end(false, reason)
}

#onStderr(data: string | Buffer) {
if (blank(data)) return
this.#logger().warn(this.name + ".onStderr(): " + data)
Expand Down

0 comments on commit 73235be

Please sign in to comment.