Skip to content

Commit

Permalink
Handle optional arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
bgaidioz committed Jan 28, 2025
1 parent 92694f4 commit 00b4d0f
Showing 1 changed file with 23 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.rawlabs.das.server.cache.catalog.CacheDefinition
import com.rawlabs.das.server.cache.iterator.QueryProcessorFlow
import com.rawlabs.das.server.cache.manager.CacheManager
import com.rawlabs.das.server.cache.manager.CacheManager.{GetIterator, WrappedGetIterator}
import com.rawlabs.das.server.cache.queue.ChronicleDataSource.ConsumerTerminated
import com.rawlabs.das.server.cache.queue.{CloseableIterator, DataProducingTask}
import com.rawlabs.das.server.manager.DASSdkManager
import com.rawlabs.protocol.das.v1.services._
Expand Down Expand Up @@ -225,8 +224,18 @@ class TableServiceGrpcImpl(
val quals = request.getQuery.getQualsList.asScala.toSeq
val columns = request.getQuery.getColumnsList.asScala.toSeq
val sortKeys = request.getQuery.getSortKeysList.asScala.toSeq
val maxCacheAge = request.getMaxCacheAgeSeconds.seconds
assert(maxCacheAge >= 0.seconds, "maxCacheAge must be non-negative")
val minCreationDate = {
request.getMaxBatchSizeBytes
if (request.hasMaxCacheAge) {
val maxCacheAge = request.getMaxCacheAge
val nanos = maxCacheAge.getSeconds * 1_000_000_000 + maxCacheAge.getNanos
assert(nanos >= 0, "maxCacheAge must be non-negative")
Some(Instant.now().minusNanos(nanos))
} else {
// No cache age specified. Don't use caches.
None
}
}

// Build a data-producing task for the table, if the cache manager needs to create a new cache
val makeTask = () =>
Expand Down Expand Up @@ -260,7 +269,7 @@ class TableServiceGrpcImpl(
quals = quals,
columns = columns,
sortKeys = sortKeys),
minCreationDate = Some(Instant.now().minusMillis(maxCacheAge.toMillis)),
minCreationDate = minCreationDate,
makeTask = makeTask,
codec = new RowCodec,
replyTo = replyTo))
Expand Down Expand Up @@ -327,8 +336,16 @@ class TableServiceGrpcImpl(
maybeServerCallObs: Option[ServerCallStreamObserver[Rows]]) = {

// Define the maximum bytes per chunk
val clientMaxBytes = request.getMaxBatchSizeBytes * 3 / 4;
assert(clientMaxBytes > 0, "clientMaxBytes must be positive")
val clientMaxBytes = {
if (request.hasMaxBatchSizeBytes) {
// We multiply by 3/4 to leave some room for gRPC overhead
request.getMaxBatchSizeBytes * 3 / 4
} else {
// Default to 2M
2_000_000
}

}

// Build a stream that splits the rows by the client's max byte size
val rowBatches = source
Expand Down

0 comments on commit 00b4d0f

Please sign in to comment.