From 21284a95e76496450c9b09152a4684a0014444a2 Mon Sep 17 00:00:00 2001 From: Anshuman Mishra Date: Wed, 18 Oct 2023 23:29:27 -0700 Subject: [PATCH] feat: Hot CAS Entries - Final Integration --- .../java/build/buildfarm/cas/MemoryCAS.java | 10 +++++-- .../build/buildfarm/cas/cfc/CASFileCache.java | 5 ++++ .../buildfarm/common/config/Backplane.java | 3 +++ .../worker/shard/ShardCASFileCache.java | 2 ++ .../build/buildfarm/worker/shard/Worker.java | 27 ++++++++++++++++++- .../buildfarm/cas/cfc/CASFileCacheTest.java | 3 +++ 6 files changed, 47 insertions(+), 3 deletions(-) diff --git a/src/main/java/build/buildfarm/cas/MemoryCAS.java b/src/main/java/build/buildfarm/cas/MemoryCAS.java index 08ac8540c2..74038543f3 100644 --- a/src/main/java/build/buildfarm/cas/MemoryCAS.java +++ b/src/main/java/build/buildfarm/cas/MemoryCAS.java @@ -49,6 +49,7 @@ public class MemoryCAS implements ContentAddressableStorage { private final long maxSizeInBytes; private final Consumer onPut; + private final Consumer onReadComplete; @GuardedBy("this") private final Map storage; @@ -63,13 +64,17 @@ public class MemoryCAS implements ContentAddressableStorage { private final Writes writes = new Writes(this); public MemoryCAS(long maxSizeInBytes) { - this(maxSizeInBytes, (digest) -> {}, /* delegate=*/ null); + this(maxSizeInBytes, (digest) -> {}, /* onReadComplete=*/ (digest) -> {}, /* delegate=*/ null); } public MemoryCAS( - long maxSizeInBytes, Consumer onPut, ContentAddressableStorage delegate) { + long maxSizeInBytes, + Consumer onPut, + Consumer onReadComplete, + ContentAddressableStorage delegate) { this.maxSizeInBytes = maxSizeInBytes; this.onPut = onPut; + this.onReadComplete = onReadComplete; this.delegate = delegate; sizeInBytes = 0; header.before = header.after = header; @@ -223,6 +228,7 @@ private synchronized Entry getEntry(Digest digest) { return null; } e.recordAccess(header); + onReadComplete.accept(digest); return e; } diff --git a/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java b/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java index aac1b28994..c13d297ff6 100644 --- a/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java +++ b/src/main/java/build/buildfarm/cas/cfc/CASFileCache.java @@ -176,6 +176,7 @@ public abstract class CASFileCache implements ContentAddressableStorage { private final DigestUtil digestUtil; private final ConcurrentMap storage; private final Consumer onPut; + private final Consumer onReadComplete; private final Consumer> onExpire; private final Executor accessRecorder; private final ExecutorService expireService; @@ -324,6 +325,7 @@ public CASFileCache( /* storage=*/ Maps.newConcurrentMap(), /* directoriesIndexDbName=*/ DEFAULT_DIRECTORIES_INDEX_NAME, /* onPut=*/ (digest) -> {}, + /* onReadComplete=*/ (digest) -> {}, /* onExpire=*/ (digests) -> {}, /* delegate=*/ null, /* delegateSkipLoad=*/ false); @@ -342,6 +344,7 @@ public CASFileCache( ConcurrentMap storage, String directoriesIndexDbName, Consumer onPut, + Consumer onReadComplete, Consumer> onExpire, @Nullable ContentAddressableStorage delegate, boolean delegateSkipLoad) { @@ -354,6 +357,7 @@ public CASFileCache( this.accessRecorder = accessRecorder; this.storage = storage; this.onPut = onPut; + this.onReadComplete = onReadComplete; this.onExpire = onExpire; this.delegate = delegate; this.delegateSkipLoad = delegateSkipLoad; @@ -679,6 +683,7 @@ void sendBuffer() throws IOException { if (len < 0) { in.close(); blobObserver.onCompleted(); + onReadComplete.accept(digest); } } } diff --git a/src/main/java/build/buildfarm/common/config/Backplane.java b/src/main/java/build/buildfarm/common/config/Backplane.java index 6376c99f1c..4a16ec4851 100644 --- a/src/main/java/build/buildfarm/common/config/Backplane.java +++ b/src/main/java/build/buildfarm/common/config/Backplane.java @@ -36,6 +36,9 @@ public enum BACKPLANE_TYPE { private String casPrefix = "ContentAddressableStorage"; private int casExpire = 604800; // 1 Week private String casReadCountSetName = "CasReadCount"; + private boolean enableCasAccessMetrics = false; + private int casReadCountWindow = 14400; // 4 hours + private int casReadCountUpdateInterval = 900; // 15 mins @Getter(AccessLevel.NONE) private boolean subscribeToBackplane = true; // deprecated diff --git a/src/main/java/build/buildfarm/worker/shard/ShardCASFileCache.java b/src/main/java/build/buildfarm/worker/shard/ShardCASFileCache.java index b03cbe6f32..52e265be47 100644 --- a/src/main/java/build/buildfarm/worker/shard/ShardCASFileCache.java +++ b/src/main/java/build/buildfarm/worker/shard/ShardCASFileCache.java @@ -43,6 +43,7 @@ class ShardCASFileCache extends CASFileCache { ExecutorService expireService, Executor accessRecorder, Consumer onPut, + Consumer onReadComplete, Consumer> onExpire, ContentAddressableStorage delegate, boolean delegateSkipLoad) { @@ -59,6 +60,7 @@ class ShardCASFileCache extends CASFileCache { /* storage=*/ Maps.newConcurrentMap(), DEFAULT_DIRECTORIES_INDEX_NAME, onPut, + onReadComplete, onExpire, delegate, delegateSkipLoad); diff --git a/src/main/java/build/buildfarm/worker/shard/Worker.java b/src/main/java/build/buildfarm/worker/shard/Worker.java index 7a0d4a60ed..ac0852e874 100644 --- a/src/main/java/build/buildfarm/worker/shard/Worker.java +++ b/src/main/java/build/buildfarm/worker/shard/Worker.java @@ -31,6 +31,7 @@ import build.buildfarm.cas.ContentAddressableStorage; import build.buildfarm.cas.ContentAddressableStorage.Blob; import build.buildfarm.cas.MemoryCAS; +import build.buildfarm.cas.cfc.CASAccessMetricsRecorder; import build.buildfarm.cas.cfc.CASFileCache; import build.buildfarm.common.BuildfarmExecutors; import build.buildfarm.common.DigestUtil; @@ -135,6 +136,7 @@ public final class Worker extends LoggingMain { private Backplane backplane; private LoadingCache workerStubs; private AtomicBoolean released = new AtomicBoolean(true); + @Nullable private CASAccessMetricsRecorder casAccessMetricsRecorder; private Worker() { super("BuildFarmShardWorker"); @@ -300,7 +302,8 @@ private ContentAddressableStorage createStorage( throw new IllegalArgumentException("Invalid cas type specified"); case MEMORY: case FUSE: // FIXME have FUSE refer to a name for storage backing, and topo - return new MemoryCAS(cas.getMaxSizeBytes(), this::onStoragePut, delegate); + return new MemoryCAS( + cas.getMaxSizeBytes(), this::onStoragePut, this::onReadComplete, delegate); case GRPC: checkState(delegate == null, "grpc cas cannot delegate"); return createGrpcCAS(cas); @@ -318,6 +321,7 @@ private ContentAddressableStorage createStorage( removeDirectoryService, accessRecorder, this::onStoragePut, + this::onReadComplete, delegate == null ? this::onStorageExpire : (digests) -> {}, delegate, delegateSkipLoad); @@ -347,6 +351,9 @@ private void onStoragePut(Digest digest) { if (configs.getWorker().getCapabilities().isCas()) { backplane.addBlobLocation(digest, configs.getWorker().getPublicName()); } + if (configs.getBackplane().isEnableCasAccessMetrics()) { + casAccessMetricsRecorder.recordWrite(digest); + } } catch (IOException e) { throw Status.fromThrowable(e).asRuntimeException(); } @@ -363,6 +370,12 @@ private void onStorageExpire(Iterable digests) { } } + private void onReadComplete(Digest digest) { + if (configs.getBackplane().isEnableCasAccessMetrics()) { + casAccessMetricsRecorder.recordRead(digest); + } + } + private void removeWorker(String name) { try { backplane.removeWorker(name, "removing self prior to initialization"); @@ -575,6 +588,15 @@ public void start() throws ConfigurationException, InterruptedException, IOExcep execFileSystem.start( (digests) -> addBlobsLocation(digests, configs.getWorker().getPublicName()), skipLoad); + if (configs.getBackplane().isEnableCasAccessMetrics()) { + casAccessMetricsRecorder = + new CASAccessMetricsRecorder( + backplane, + java.time.Duration.ofSeconds(configs.getBackplane().getCasReadCountWindow()), + java.time.Duration.ofSeconds(configs.getBackplane().getCasReadCountUpdateInterval())); + casAccessMetricsRecorder.start(); + } + server.start(); healthStatusManager.setStatus( HealthStatusManager.SERVICE_NAME_ALL_SERVICES, ServingStatus.SERVING); @@ -645,6 +667,9 @@ private void shutdown() throws InterruptedException { execFileSystem.stop(); execFileSystem = null; } + if (casAccessMetricsRecorder != null) { + casAccessMetricsRecorder.stop(); + } if (server != null) { log.info("Shutting down the server"); server.shutdown(); diff --git a/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java b/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java index a3d406e232..8f1f24e55c 100644 --- a/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java +++ b/src/test/java/build/buildfarm/cas/cfc/CASFileCacheTest.java @@ -154,6 +154,7 @@ public void setUp() throws IOException, InterruptedException { storage, /* directoriesIndexDbName=*/ ":memory:", onPut, + /* onReadComplete=*/ digest -> {}, onExpire, delegate, /* delegateSkipLoad=*/ false) { @@ -1118,6 +1119,7 @@ public void copyExternalInputRetries() throws Exception { storage, /* directoriesIndexDbName=*/ ":memory:", /* onPut=*/ digest -> {}, + /* onReadComplete=*/ digest -> {}, /* onExpire=*/ digests -> {}, /* delegate=*/ null, /* delegateSkipLoad=*/ false) { @@ -1181,6 +1183,7 @@ public void newInputThrowsNoSuchFileExceptionWithoutDelegate() throws Exception storage, /* directoriesIndexDbName=*/ ":memory:", /* onPut=*/ digest -> {}, + /* onReadComplete=*/ digest -> {}, /* onExpire=*/ digests -> {}, /* delegate=*/ null, /* delegateSkipLoad=*/ false) {