Skip to content

Commit

Permalink
A small refactor on the WorkerPool interface.
Browse files Browse the repository at this point in the history
It's mainly about the naming.

We removed the WorkerPoolImplLegacy file a while ago. This is a follow-up for that task to rename the interface into the style of the new WorkerPoolImpl.

PiperOrigin-RevId: 715795843
Change-Id: Ief13bb257b326c49208a1c05587202e0d15d1963
  • Loading branch information
bigelephant29 authored and copybara-github committed Jan 15, 2025
1 parent b35bf58 commit 15f9d85
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 148 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void invalidateAndClose(@Nullable Exception e) throws IOException, Interr
worker.getStatus().maybeUpdateStatus(Status.PENDING_KILL_DUE_TO_UNKNOWN);
}

manager.workerPool.invalidateObject(request.getResourceSet().getWorkerKey(), worker);
manager.workerPool.invalidateWorker(worker);
worker = null;
this.close();
}
Expand Down Expand Up @@ -441,7 +441,7 @@ private synchronized Worker incrementResources(ResourceRequest request)
windowEstimationCpu += resources.getResources().getOrDefault(ResourceSet.CPU, 0.0);
usedLocalTestCount += resources.getLocalTestCount();
if (resources.getWorkerKey() != null) {
return this.workerPool.borrowObject(resources.getWorkerKey());
return this.workerPool.borrowWorker(resources.getWorkerKey());
}

runningActions++;
Expand Down Expand Up @@ -532,7 +532,7 @@ private synchronized ResourceLatch acquire(ResourceRequest request)
private synchronized void release(ResourceRequest request, @Nullable Worker worker)
throws IOException, InterruptedException {
if (worker != null) {
this.workerPool.returnObject(worker.getWorkerKey(), worker);
this.workerPool.returnWorker(worker.getWorkerKey(), worker);
}

ResourceSet resources = request.getResourceSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,23 +72,22 @@ ImmutableSet<Integer> evictWorkers(ImmutableSet<Integer> workerIdsToEvict)
*
* @param key the worker key.
*/
Worker borrowObject(WorkerKey key) throws IOException, InterruptedException;
Worker borrowWorker(WorkerKey key) throws IOException, InterruptedException;

/**
* Returns an active worker back to the pool.
*
* @param key the worker key.
* @param obj the worker to be returned.
* @param worker the worker to be returned.
*/
void returnObject(WorkerKey key, Worker obj);
void returnWorker(WorkerKey key, Worker worker);

/**
* Invalidates the worker, thus destroying it.
*
* @param key the worker key.
* @param obj the worker to be invalidated.
* @param worker the worker to be invalidated.
*/
void invalidateObject(WorkerKey key, Worker obj) throws InterruptedException;
void invalidateWorker(Worker worker) throws InterruptedException;

void reset();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,27 +116,18 @@ public ImmutableSet<Integer> getIdleWorkers() throws InterruptedException {
}

@Override
public Worker borrowObject(WorkerKey key) throws IOException, InterruptedException {
public Worker borrowWorker(WorkerKey key) throws IOException, InterruptedException {
return getPool(key).borrowWorker(key);
}

@Override
public void returnObject(WorkerKey key, Worker obj) {
public void returnWorker(WorkerKey key, Worker obj) {
getPool(key).returnWorker(key, /* worker= */ obj);
}

@Override
public void invalidateObject(WorkerKey key, Worker obj) throws InterruptedException {
invalidateWorker(
/* worker= */ obj, /* shouldShrinkPool= */ obj.getStatus().isPendingEviction());
}

/**
* TODO(b/323880131): This should be the main interface once the we remove the legacy worker pool
* implementation.
*/
private void invalidateWorker(Worker worker, boolean shouldShrinkPool) {
getPool(worker.getWorkerKey()).invalidateWorker(worker, shouldShrinkPool);
public void invalidateWorker(Worker worker) throws InterruptedException {
getPool(worker.getWorkerKey()).invalidateWorker(worker, worker.getStatus().isPendingEviction());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ public void testEvictWorkers_doNothing_lowMemoryUsage() throws Exception {
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(createWorkerMetric(w1, PROCESS_ID_1, /* memoryInKb= */ 1000));
WorkerOptions options = new WorkerOptions();
Expand All @@ -138,8 +138,8 @@ public void testEvictWorkers_doNothing_zeroThreshold() throws Exception {
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(createWorkerMetric(w1, PROCESS_ID_1, /* memoryInKb= */ 1000));
Expand All @@ -165,8 +165,8 @@ public void testEvictWorkers_doNothing_emptyMetrics() throws Exception {
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);

ImmutableList<WorkerProcessMetrics> workerMetrics = ImmutableList.of();
WorkerOptions options = new WorkerOptions();
Expand All @@ -191,8 +191,8 @@ public void testGetEvictionCandidates_selectOnlyWorker() throws Exception {
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 1), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
Worker w1 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(createWorkerMetric(w1, PROCESS_ID_1, /* memoryInKb= */ 2000));
WorkerOptions options = new WorkerOptions();
Expand All @@ -218,12 +218,12 @@ public void testGetEvictionCandidates_evictLargestWorkers() throws Exception {
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
Worker w2 = workerPool.borrowObject(key);
Worker w3 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
workerPool.returnObject(key, w2);
workerPool.returnObject(key, w3);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Worker w3 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
workerPool.returnWorker(key, w2);
workerPool.returnWorker(key, w3);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
Expand Down Expand Up @@ -258,14 +258,14 @@ public void testGetEvictionCandidates_numberOfWorkersIsMoreThanDefaultNumTests()
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 4), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
Worker w2 = workerPool.borrowObject(key);
Worker w3 = workerPool.borrowObject(key);
Worker w4 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
workerPool.returnObject(key, w2);
workerPool.returnObject(key, w3);
workerPool.returnObject(key, w4);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Worker w3 = workerPool.borrowWorker(key);
Worker w4 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
workerPool.returnWorker(key, w2);
workerPool.returnWorker(key, w3);
workerPool.returnWorker(key, w4);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
Expand Down Expand Up @@ -300,12 +300,12 @@ public void testGetEvictionCandidates_evictWorkerWithSameMenmonicButDifferentKey
WorkerKey key1 = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
WorkerKey key2 = createWorkerKey(DUMMY_MNEMONIC, fileSystem, true);

Worker w1 = workerPool.borrowObject(key1);
Worker w2 = workerPool.borrowObject(key2);
Worker w3 = workerPool.borrowObject(key2);
workerPool.returnObject(key1, w1);
workerPool.returnObject(key2, w2);
workerPool.returnObject(key2, w3);
Worker w1 = workerPool.borrowWorker(key1);
Worker w2 = workerPool.borrowWorker(key2);
Worker w3 = workerPool.borrowWorker(key2);
workerPool.returnWorker(key1, w1);
workerPool.returnWorker(key2, w2);
workerPool.returnWorker(key2, w3);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
Expand Down Expand Up @@ -341,11 +341,11 @@ public void testGetEvictionCandidates_evictOnlyIdleWorkers() throws Exception {
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 3), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
Worker w2 = workerPool.borrowObject(key);
Worker w3 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
workerPool.returnObject(key, w2);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Worker w3 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
workerPool.returnWorker(key, w2);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
Expand Down Expand Up @@ -382,14 +382,14 @@ public void testGetEvictionCandidates_evictDifferentWorkerKeys() throws Exceptio
new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 2, "smart", 2), emptyEntryList()));
WorkerKey key1 = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
WorkerKey key2 = createWorkerKey("smart", fileSystem);
Worker w1 = workerPool.borrowObject(key1);
Worker w2 = workerPool.borrowObject(key1);
Worker w3 = workerPool.borrowObject(key2);
Worker w4 = workerPool.borrowObject(key2);
workerPool.returnObject(key1, w1);
workerPool.returnObject(key1, w2);
workerPool.returnObject(key2, w3);
workerPool.returnObject(key2, w4);
Worker w1 = workerPool.borrowWorker(key1);
Worker w2 = workerPool.borrowWorker(key1);
Worker w3 = workerPool.borrowWorker(key2);
Worker w4 = workerPool.borrowWorker(key2);
workerPool.returnWorker(key1, w1);
workerPool.returnWorker(key1, w2);
workerPool.returnWorker(key2, w3);
workerPool.returnWorker(key2, w4);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
Expand Down Expand Up @@ -417,8 +417,8 @@ public void testGetEvictionCandidates_evictDifferentWorkerKeys() throws Exceptio
assertThat(workerPool.getIdleWorkers()).containsExactly(w1.getWorkerId(), w4.getWorkerId());
assertThat(workerPool.getNumActive(key1)).isEqualTo(0);
assertThat(workerPool.getNumActive(key2)).isEqualTo(0);
assertThat(workerPool.borrowObject(key1).getWorkerId()).isEqualTo(w1.getWorkerId());
assertThat(workerPool.borrowObject(key2).getWorkerId()).isEqualTo(w4.getWorkerId());
assertThat(workerPool.borrowWorker(key1).getWorkerId()).isEqualTo(w1.getWorkerId());
assertThat(workerPool.borrowWorker(key2).getWorkerId()).isEqualTo(w4.getWorkerId());
assertThat(w1.getStatus().isValid()).isTrue();
assertThat(w2.getStatus().get()).isEqualTo(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
assertThat(w3.getStatus().get()).isEqualTo(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
Expand All @@ -431,8 +431,8 @@ public void testGetEvictionCandidates_testDoomedWorkers() throws Exception {
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 2), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
Worker w2 = workerPool.borrowObject(key);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
Expand All @@ -456,7 +456,7 @@ public void testGetEvictionCandidates_testDoomedWorkers() throws Exception {
assertThat(w2.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);

// Return only one worker.
workerPool.returnObject(key, w1);
workerPool.returnWorker(key, w1);

// w1 gets destroyed when it is returned, so there are 0 idle workers.
assertThat(workerPool.getIdleWorkers()).isEmpty();
Expand All @@ -467,7 +467,7 @@ public void testGetEvictionCandidates_testDoomedWorkers() throws Exception {
assertThat(w2.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);

// Return the remaining worker.
workerPool.returnObject(key, w2);
workerPool.returnWorker(key, w2);
assertThat(w2.getStatus().get()).isEqualTo(Status.KILLED_DUE_TO_MEMORY_PRESSURE);
}

Expand All @@ -477,13 +477,13 @@ public void testGetEvictionCandidates_testDoomedAndIdleWorkers() throws Exceptio
new WorkerPoolImpl(
factoryMock, new WorkerPoolConfig(entryList(DUMMY_MNEMONIC, 5), emptyEntryList()));
WorkerKey key = createWorkerKey(DUMMY_MNEMONIC, fileSystem);
Worker w1 = workerPool.borrowObject(key);
Worker w2 = workerPool.borrowObject(key);
Worker w3 = workerPool.borrowObject(key);
Worker w4 = workerPool.borrowObject(key);
Worker w5 = workerPool.borrowObject(key);
workerPool.returnObject(key, w1);
workerPool.returnObject(key, w2);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);
Worker w3 = workerPool.borrowWorker(key);
Worker w4 = workerPool.borrowWorker(key);
Worker w5 = workerPool.borrowWorker(key);
workerPool.returnWorker(key, w1);
workerPool.returnWorker(key, w2);

ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
Expand Down Expand Up @@ -527,14 +527,14 @@ public void evictWorkers_testMultiplexWorkers() throws Exception {
factoryMock, new WorkerPoolConfig(emptyEntryList(), entryList(DUMMY_MNEMONIC, 2)));
WorkerKey key =
createWorkerKey(DUMMY_MNEMONIC, fileSystem, /* multiplex= */ true, /* sandboxed= */ false);
Worker w1 = workerPool.borrowObject(key);
Worker w2 = workerPool.borrowObject(key);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);

// Multiplex workers should share the same status instance.
assertThat(w1.getStatus()).isSameInstanceAs(w2.getStatus());

workerPool.returnObject(key, w1);
workerPool.returnObject(key, w2);
workerPool.returnWorker(key, w1);
workerPool.returnWorker(key, w2);
ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
createMultiplexWorkerMetric(
Expand All @@ -558,13 +558,13 @@ public void evictWorkers_doomMultiplexWorker() throws Exception {
factoryMock, new WorkerPoolConfig(emptyEntryList(), entryList(DUMMY_MNEMONIC, 2)));
WorkerKey key =
createWorkerKey(DUMMY_MNEMONIC, fileSystem, /* multiplex= */ true, /* sandboxed= */ false);
Worker w1 = workerPool.borrowObject(key);
Worker w2 = workerPool.borrowObject(key);
Worker w1 = workerPool.borrowWorker(key);
Worker w2 = workerPool.borrowWorker(key);

// Multiplex workers should share the same status instance.
assertThat(w1.getStatus()).isSameInstanceAs(w2.getStatus());

workerPool.returnObject(key, w1);
workerPool.returnWorker(key, w1);
ImmutableList<WorkerProcessMetrics> workerMetrics =
ImmutableList.of(
createMultiplexWorkerMetric(
Expand All @@ -582,7 +582,7 @@ public void evictWorkers_doomMultiplexWorker() throws Exception {
// Not yet killed because w2 is still alive (and both share a WorkerProcessStatus).
assertThat(w1.getStatus().get()).isEqualTo(Status.PENDING_KILL_DUE_TO_MEMORY_PRESSURE);

workerPool.returnObject(key, w2);
workerPool.returnWorker(key, w2);
assertThat(workerPool.getIdleWorkers()).isEmpty();
assertThat(workerPool.getNumActive(key)).isEqualTo(0);
// Status is only set to killed after the last worker proxy is destroyed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public void buildStarting_createsPools()
assertThat(module.workerPool).isNotNull();

WorkerKey workerKey = WorkerTestUtils.createWorkerKey(JSON, fs);
Worker worker = module.workerPool.borrowObject(workerKey);
Worker worker = module.workerPool.borrowWorker(workerKey);

assertThat(worker.workerKey).isEqualTo(workerKey);
assertThat(fs.getPath("/outputRoot/outputBase/bazel-workers").exists()).isTrue();
Expand Down Expand Up @@ -260,7 +260,7 @@ public void buildStarting_survivesNoWorkerDir() throws Exception {

// But an actual worker cannot be created.
WorkerKey key = WorkerTestUtils.createWorkerKey(fs, "Work", /* proxied= */ false);
assertThrows(IOException.class, () -> module.workerPool.borrowObject(key));
assertThrows(IOException.class, () -> module.workerPool.borrowWorker(key));
}

@Test
Expand Down
Loading

0 comments on commit 15f9d85

Please sign in to comment.