Skip to content

Commit

Permalink
verify that we're not escalating an SIGPIPE to callers
Browse files Browse the repository at this point in the history
  • Loading branch information
mceachen committed Feb 10, 2024
1 parent 4fc97db commit 797774e
Showing 1 changed file with 17 additions and 13 deletions.
30 changes: 17 additions & 13 deletions src/BatchCluster.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ describe("BatchCluster", function () {
events = new Events()
})

process.on("SIGPIPE", (error) => {
internalErrors.push(new Error("process.on(SIGPIPE): " + String(error)))
})

function postAssertions() {
expect(internalErrors).to.eql([], "internal errors")

Expand Down Expand Up @@ -124,7 +128,7 @@ describe("BatchCluster", function () {
// const pendingCommands = bc.pendingTasks.map((ea) => ea.command)
const runningCommands = bc.currentTasks.map((ea) => ea.command)
const busyProcCount = bc.busyProcCount
const pids = await bc.pids()
const pids = bc.pids()
const livingPids = await currentTestPids()

const done =
Expand Down Expand Up @@ -350,7 +354,7 @@ describe("BatchCluster", function () {
await bc.end()
expect(bc.ended).to.eql(true)
expect(bc.isIdle).to.eql(true)
expect((await bc.pids()).length).to.eql(0)
expect(bc.pids().length).to.eql(0)
expect(bc.spawnedProcCount).to.eql(0)
expect(events.events).to.eql(expectedEndEvents)
expect(testPids()).to.eql([])
Expand Down Expand Up @@ -396,7 +400,7 @@ describe("BatchCluster", function () {
let expectedResultCount = 0
const results = await Promise.all(runTasks(bc, maxProcs))
expectedResultCount += maxProcs
const pids = await bc.pids()
const pids = bc.pids()
const iters = Math.floor(
maxProcs * opts.maxTasksPerProcess * 1.5,
)
Expand Down Expand Up @@ -437,13 +441,13 @@ describe("BatchCluster", function () {

// Expect no prior pids to remain, as long as there were before-pids:
if (pids.length > 0)
expect(await bc.pids()).to.not.include.members(pids)
expect(bc.pids()).to.not.include.members(pids)

expect(bc.meanTasksPerProc).to.be.within(
0.25, // because flaky
opts.maxTasksPerProcess,
)
expect((await bc.pids()).length).to.be.lte(maxProcs)
expect(bc.pids().length).to.be.lte(maxProcs)
expect((await currentTestPids()).length).to.be.lte(
bc.spawnedProcCount,
) // because flaky
Expand Down Expand Up @@ -648,7 +652,7 @@ describe("BatchCluster", function () {
maxProcs,
uniqPids: pid2count.size,
pid2count,
bcPids: await bc.pids(),
bcPids: bc.pids(),
})
for (const [, count] of pid2count.entries()) {
expect(count).to.be.within(expectTaskMin, expectedTaskMax)
Expand Down Expand Up @@ -833,7 +837,7 @@ describe("BatchCluster", function () {
await Promise.all(runTasks(bc, opts.maxProcs + 100)),
)
// 0 because we might get unlucky.
expect((await bc.pids()).length).to.be.within(0, opts.maxProcs)
expect(bc.pids().length).to.be.within(0, opts.maxProcs)
await delay(opts.maxProcAgeMillis + 100)
await bc.vacuumProcs()
console.log({
Expand All @@ -844,7 +848,7 @@ describe("BatchCluster", function () {
expect(bc.countEndedChildProcs("idle")).to.eql(0)
expect(bc.countEndedChildProcs("old")).to.be.gte(2)
// Calling .pids calls .procs(), which culls old procs
expect((await bc.pids()).length).to.be.within(0, opts.maxProcs)
expect(bc.pids().length).to.be.within(0, opts.maxProcs)
postAssertions()
})
})
Expand Down Expand Up @@ -874,7 +878,7 @@ describe("BatchCluster", function () {
it("culls idle child procs", async () => {
assertExpectedResults(await Promise.all(runTasks(bc, opts.maxProcs + 10)))
// 0 because we might get unlucky.
expect((await bc.pids()).length).to.be.within(0, opts.maxProcs)
expect(bc.pids().length).to.be.within(0, opts.maxProcs)
// wait long enough for at least 1 process to be idle and get reaped:
await delay(opts.maxIdleMsPerProcess + 100)
await bc.vacuumProcs()
Expand All @@ -887,10 +891,10 @@ describe("BatchCluster", function () {
expect(bc.countEndedChildProcs("old")).to.be.lte(1)
expect(bc.countEndedChildProcs("worn")).to.be.lte(2)
// Calling .pids calls .procs(), which culls old procs
if ((await bc.pids()).length > 0) {
if (bc.pids().length > 0) {
await delay(1000)
}
expect((await bc.pids()).length).to.eql(0)
expect(bc.pids().length).to.eql(0)
postAssertions()
})
})
Expand Down Expand Up @@ -939,10 +943,10 @@ describe("BatchCluster", function () {
}),
)
assertExpectedResults(await Promise.all(runTasks(bc, 2)))
const pidsBefore = await bc.pids()
const pidsBefore = bc.pids()
tk.freeze(start + 7000)
assertExpectedResults(await Promise.all(runTasks(bc, 2)))
const pidsAfter = await bc.pids()
const pidsAfter = bc.pids()
console.dir({ maxProcAgeMillis, pidsBefore, pidsAfter })
exp(pidsBefore, pidsAfter)
postAssertions()
Expand Down

0 comments on commit 797774e

Please sign in to comment.