Made the cache parameterized
bgaidioz committed Feb 17, 2025
1 parent b31ac0a commit 42d76ba
Showing 4 changed files with 23 additions and 27 deletions.
6 changes: 1 addition & 5 deletions src/main/resources/reference.conf
@@ -5,11 +5,7 @@ das {
batch-latency = 100 millis # how long we wait for more rows before we send a batch
}
cache {
sqlite-catalog-file = "/tmp/catalog.db" # the file where the catalog is stored
data-dir = "/tmp/cacheData" # the directory where the cache is stored
max-entries = 10 # max number of entries in the cache before we start GC'ing old entries
batch-size = 1000 # how many rows of data are produced per producerInterval tick
grace-period = 10 seconds # how long we keep an iterator alive (but paused) even if there are no readers currently consuming it
producer-interval = 5 millis # the interval at which the producer produces data. It produces batchSize rows per interval.
max-chunks-per-entry = 5 # max number of chunks per entry before we stop caching a particular entry
}
}
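For reference, the two keys that survive this commit can be read with plain Typesafe Config lookups. A minimal sketch using ConfigFactory directly (in the server itself the project's DASSettings wrapper is used, whose exact API is not shown in this diff):

import com.typesafe.config.{Config, ConfigFactory}

object CacheConfigSketch extends App {
  // Loads reference.conf (plus any application.conf overrides) from the classpath.
  val config: Config = ConfigFactory.load()

  // The two keys kept by this commit; the removed keys (sqlite-catalog-file,
  // data-dir, batch-size, grace-period, producer-interval) no longer exist.
  val maxEntries: Int = config.getInt("das.cache.max-entries")
  val maxChunksPerEntry: Int = config.getInt("das.cache.max-chunks-per-entry")

  println(s"cache limits: maxEntries=$maxEntries, maxChunksPerEntry=$maxChunksPerEntry")
}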
14 changes: 8 additions & 6 deletions src/main/scala/com/rawlabs/das/server/DASServer.scala
@@ -12,12 +12,11 @@

package com.rawlabs.das.server

import java.io.File

import scala.concurrent.ExecutionContext
import scala.jdk.DurationConverters.JavaDurationOps

import com.rawlabs.das.sdk.DASSettings
import com.rawlabs.das.server.cache.QueryResultCache
import com.rawlabs.das.server.grpc.{
HealthCheckServiceGrpcImpl,
RegistrationServiceGrpcImpl,
@@ -26,12 +25,10 @@ import com.rawlabs.das.server.grpc.{
}
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.das.server.webui.{DASWebUIServer, DebugAppService}
import com.rawlabs.protocol.das.v1.query.Qual
import com.rawlabs.protocol.das.v1.services.{HealthCheckServiceGrpc, RegistrationServiceGrpc, TablesServiceGrpc}
import com.rawlabs.protocol.das.v1.tables.Row

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Scheduler}
import akka.actor.typed.{ActorSystem, Scheduler}
import akka.stream.Materializer
import io.grpc.{Server, ServerBuilder}

@@ -50,10 +47,15 @@ class DASServer()(
private val registrationService = RegistrationServiceGrpc
.bindService(new RegistrationServiceGrpcImpl(dasSdkManager))

private val resultCache =
new QueryResultCache(
maxEntries = settings.getInt("das.cache.max-entries"),
maxChunksPerEntry = settings.getInt("das.cache.max-chunks-per-entry"))

private val tablesService = {
val batchLatency = settings.getDuration("das.server.batch-latency").toScala
TablesServiceGrpc
.bindService(new TableServiceGrpcImpl(dasSdkManager, batchLatency))
.bindService(new TableServiceGrpcImpl(dasSdkManager, resultCache, batchLatency))
}
// private val functionsService - FunctionsServiceGrpc.bindService(new FunctionsServiceGrpcImpl(dasSdkManager))

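Because the limits are now constructor arguments rather than hard-coded constants, callers other than DASServer can choose their own values. A small sketch of what a test might do (the tiny numbers are made up for illustration):

import com.rawlabs.das.server.cache.QueryResultCache

object TestWiringSketch {
  // Deliberately tiny limits, e.g. to exercise eviction and overflow paths in a test.
  val testCache = new QueryResultCache(maxEntries = 2, maxChunksPerEntry = 3)
}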
19 changes: 9 additions & 10 deletions src/main/scala/com/rawlabs/das/server/cache/QueryResultCache.scala
@@ -26,7 +26,7 @@ final case class QueryCacheKey(request: ExecuteTableRequest)
* limit is exceeded, it marks the result as overflowed. When the stream completes, if the result has not overflowed,
* the accumulated chunks are registered into the cache.
*/
final class ResultBuffer(key: QueryCacheKey, maxSize: Int) {
final class ResultBuffer(resultCache: QueryResultCache, key: QueryCacheKey, maxSize: Int) {

private val rows = mutable.Buffer.empty[Rows]
// A flag indicating that the buffer has overflowed.
@@ -49,24 +49,23 @@ final class ResultBuffer(key: QueryCacheKey, maxSize: Int) {
* Called when the stream is finished. If the result did not overflow, register the accumulated chunks in the cache.
*/
def register(): Unit = {
if (!full) QueryResultCache.put(key, rows.toSeq)
if (!full) resultCache.put(key, rows.toSeq)
}

}

/**
* QueryResultCache is a simple cache for query results using Guava Cache. It stores up to MAX_CACHES entries. Each
* entry is a sequence of Rows (i.e. row chunks). A removal listener logs when an entry is discarded.
* QueryResultCache is a cache for query results using Guava Cache. It stores up to maxEntries entries. Each entry is a
* sequence of Rows (i.e. row chunks), whose size is limited to maxChunksPerEntry chunks. Chunk size isn't determined by
* the cache. Row streams are split at the table level when the client's buffer size or a timeout is hit. A removal
* listener logs when an entry is discarded.
*/
object QueryResultCache extends StrictLogging {

private val MAX_CACHES = 5
private val MAX_CHUNKS_PER_CACHE = 10
class QueryResultCache(maxEntries: Int, maxChunksPerEntry: Int) extends StrictLogging {

// Create a Guava cache with a maximum size and a removal listener to log evictions.
private val cache: Cache[String, Seq[Rows]] = CacheBuilder
.newBuilder()
.maximumSize(MAX_CACHES)
.maximumSize(maxEntries)
.removalListener((notification: RemovalNotification[String, Seq[Rows]]) => {
logger.info(s"Entry for key [${notification.getKey}] removed due to ${notification.getCause}")
})
@@ -75,7 +74,7 @@ object QueryResultCache extends StrictLogging {
/**
* Creates a new ResultBuffer for a given query key. The buffer will accumulate up to MAX_CHUNKS_PER_CACHE chunks.
*/
def newBuffer(key: QueryCacheKey): ResultBuffer = new ResultBuffer(key, MAX_CHUNKS_PER_CACHE)
def newBuffer(key: QueryCacheKey): ResultBuffer = new ResultBuffer(this, key, maxChunksPerEntry)

/**
* Retrieves a cached result for the given key, if present. Returns an Iterator over the cached row chunks.
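The fourth changed file, TableServiceGrpcImpl (diff below), is the consumer of this API: it builds a QueryCacheKey from the request, replays the cached chunks on a hit, and on a miss tees the live stream through a fresh ResultBuffer, calling register() when the stream completes. A rough sketch under a few assumptions: addChunk is a hypothetical name for the buffer's chunk-append method (not shown in this diff), produceChunks and serve are hypothetical stand-ins for the real execution and delivery code, and ExecuteTableRequest is assumed to come from the tables package wildcard-imported below:

import com.rawlabs.das.server.cache.{QueryCacheKey, QueryResultCache}
import com.rawlabs.protocol.das.v1.tables._

object CacheFlowSketch {
  def run(
      request: ExecuteTableRequest,
      cache: QueryResultCache,
      produceChunks: ExecuteTableRequest => Iterator[Rows], // hypothetical: the live query execution
      serve: Rows => Unit // hypothetical: push one chunk to the client
  ): Unit = {
    val key = QueryCacheKey(request)
    cache.get(key) match {
      case Some(cached) =>
        cached.foreach(serve) // cache hit: replay the stored chunks
      case None =>
        val buffer = cache.newBuffer(key) // capped at maxChunksPerEntry chunks
        produceChunks(request).foreach { chunk =>
          buffer.addChunk(chunk) // hypothetical method name; past the cap the buffer marks itself overflowed
          serve(chunk)
        }
        buffer.register() // stores the chunks only if the buffer never overflowed
    }
  }
}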
@@ -27,7 +27,6 @@ import com.rawlabs.protocol.das.v1.tables._
import com.typesafe.scalalogging.StrictLogging

import akka.NotUsed
import akka.actor.typed.Scheduler
import akka.stream.scaladsl.{Keep, Sink, Source}
import akka.stream.{KillSwitches, Materializer, UniqueKillSwitch}
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
@@ -39,10 +38,10 @@ import io.grpc.{Status, StatusRuntimeException}
* @param provider Provides access to DAS (Data Access Service) instances.
* @param batchLatency Time delay for batching rows (used in groupingWeightedWithin).
*/
class TableServiceGrpcImpl(provider: DASSdkManager, batchLatency: FiniteDuration = 500.millis)(implicit
val ec: ExecutionContext,
materializer: Materializer,
scheduler: Scheduler)
class TableServiceGrpcImpl(
provider: DASSdkManager,
resultCache: QueryResultCache,
batchLatency: FiniteDuration = 500.millis)(implicit val ec: ExecutionContext, materializer: Materializer)
extends TablesServiceGrpc.TablesServiceImplBase
with StrictLogging {

@@ -227,7 +226,7 @@ class TableServiceGrpcImpl(provider: DASSdkManager, batchLatency: FiniteDuration
try {
val key = QueryCacheKey(request)
// Check if we have a cached result for this query
val source: Source[Rows, NotUsed] = QueryResultCache.get(key) match {
val source: Source[Rows, NotUsed] = resultCache.get(key) match {
case Some(iterator) =>
// We do. Use the iterator to build the Source.
logger.debug(s"Using cached result for $request.")
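The cache-miss branch of the hunk above is cut off in this view. A sketch of how the two branches might each yield a Source[Rows, NotUsed], with liveSource standing in for the real execution pipeline (which is not part of this hunk):

import akka.NotUsed
import akka.stream.scaladsl.Source
import com.rawlabs.das.server.cache.{QueryCacheKey, QueryResultCache}
import com.rawlabs.protocol.das.v1.tables._

object SourceSelectionSketch {
  def rowsSource(
      request: ExecuteTableRequest,
      resultCache: QueryResultCache,
      liveSource: ExecuteTableRequest => Source[Rows, NotUsed]): Source[Rows, NotUsed] = {
    val key = QueryCacheKey(request)
    resultCache.get(key) match {
      case Some(iterator) =>
        // Cache hit: replay the stored chunks as a Source.
        Source.fromIterator(() => iterator)
      case None =>
        // Cache miss: fall back to live execution, which elsewhere tees into a ResultBuffer.
        liveSource(request)
    }
  }
}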
