Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed RD-15170: Reuse existing DASes when registering the same credential #17

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 69 additions & 45 deletions src/main/scala/com/rawlabs/das/server/DASSdkManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,24 @@

package com.rawlabs.das.server

import com.google.common.cache.{CacheBuilder, CacheLoader, RemovalNotification}
import com.rawlabs.das.sdk.{DASSdk, DASSdkBuilder}
import com.rawlabs.protocol.das.DASId
import com.rawlabs.utils.core.RawSettings
import com.typesafe.scalalogging.StrictLogging

import java.util.ServiceLoader
import scala.collection.JavaConverters._
import scala.collection.mutable
import java.util.ServiceLoader

object DASSdkManager {
private val BUILTIN_DAS = "raw.das.server.builtin"
}

// TODO (msb): Remove if NOT USED since M hours AND/OR does not exist in creds if it came from creds

private case class DaSDKInMemoryEntry(options: Map[String, String], dasSdk: DASSdk)
// In memory spec of a DAS configuration. Used to index running DASes.
private case class DASConfig(dasType: String, options: Map[String, String])

/**
* Manages the lifecycle of Data Access Services (DAS) in the server.
Expand All @@ -40,48 +42,59 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging {

private val dasSdkLoader = ServiceLoader.load(classOf[DASSdkBuilder]).asScala

private val dasSdksInMemory = mutable.HashMap[DASId, DaSDKInMemoryEntry]()
private val dasSdksInMemoryLock = new Object
private val dasSdkConfigCache = mutable.HashMap[DASId, DASConfig]()
private val dasSdkConfigCacheLock = new Object
private val dasSdkCache = CacheBuilder
.newBuilder()
.removalListener((notification: RemovalNotification[DASConfig, DASSdk]) => {
logger.debug(s"Removing DAS SDK for type: ${notification.getKey.dasType}")
// That's where a DAS instance should be closed (e.g. close connections, etc.)
})
.build(new CacheLoader[DASConfig, DASSdk] {
override def load(dasConfig: DASConfig): DASSdk = {
logger.debug(s"Loading DAS SDK for type: ${dasConfig.dasType}")
logger.trace(s"DAS Options: ${dasConfig.options}")
val dasType = dasConfig.dasType
dasSdkLoader
.find(_.dasType == dasType)
.getOrElse {
logger.error(s"DAS type '$dasType' not supported.")
throw new IllegalArgumentException(s"DAS type '$dasType' not supported")
}
.build(dasConfig.options)
}
})

// At startup, read any available DAS configurations from the local config file and register them.
registerDASFromConfig()

/**
* Registers a new DAS instance with the specified type and options.
*
* @param dasType The type of the DAS to register.
* @param options A map of options for configuring the DAS.
* @param dasType The type of the DAS to register.
* @param options A map of options for configuring the DAS.
* @param maybeDasId An optional DAS ID, if not provided a new one will be generated.
* @return The registered DAS ID.
*/
def registerDAS(dasType: String, options: Map[String, String], maybeDasId: Option[DASId] = None): DASId = {
dasSdksInMemoryLock.synchronized {
val dasId = maybeDasId.getOrElse(DASId.newBuilder().setId(java.util.UUID.randomUUID().toString).build())
dasSdksInMemory.get(dasId) match {
case Some(DaSDKInMemoryEntry(inMemoryOptions, _)) =>
if (compareOptions(inMemoryOptions, options)) {
logger.warn(s"DAS with ID $dasId is already registered with the same options.")
return dasId
} else {
// Start from the provided DAS ID, or create a new one.
val dasId = maybeDasId.getOrElse(DASId.newBuilder().setId(java.util.UUID.randomUUID().toString).build())
// Then make sure that the DAS is not already registered with a different config.
val config = DASConfig(dasType, stripOptions(options))
dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.get(dasId) match {
case Some(registeredConfig) => if (registeredConfig != config) {
logger.error(
s"DAS with ID $dasId is already registered. Registered options are: $inMemoryOptions and new options are: $options"
s"DAS with ID $dasId is already registered with a different configuration"
)
throw new IllegalArgumentException(s"DAS with id $dasId already registered")
}
case None =>
logger.debug(s"Registering DAS with ID: $dasId, Type: $dasType")
val dasSdk = dasSdkLoader
.find(_.dasType == dasType)
.getOrElse {
logger.error(s"DAS type '$dasType' not supported.")
throw new IllegalArgumentException(s"DAS type '$dasType' not supported")
}
.build(options)
dasSdksInMemory.put(dasId, DaSDKInMemoryEntry(options, dasSdk))
logger.debug(s"DAS registered successfully with ID: $dasId")
dasId
case None => dasSdkConfigCache.put(dasId, config)
}
}
// Everything is fine at dasId/config level. Create (or use previously cached instance) an SDK with the config.
dasSdkCache.get(config) // If the config didn't exist, that blocks until the new DAS is loaded
dasId
}

/**
Expand All @@ -90,13 +103,19 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging {
* @param dasId The DAS ID to unregister.
*/
def unregisterDAS(dasId: DASId): Unit = {
dasSdksInMemoryLock.synchronized {
if (dasSdksInMemory.contains(dasId)) {
logger.debug(s"Unregistering DAS with ID: $dasId")
dasSdksInMemory.remove(dasId)
logger.debug(s"DAS unregistered successfully with ID: $dasId")
} else {
logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.")
dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.get(dasId) match {
case Some(config) =>
logger.debug(s"Unregistering DAS with ID: $dasId")
dasSdkConfigCache.remove(dasId)
logger.debug(s"DAS unregistered successfully with ID: $dasId")
if (!dasSdkConfigCache.values.exists(_ == config)) {
// It was the last DAS with this config, so invalidate the cache
dasSdkCache.invalidate(config)
}
case None => {
logger.warn(s"Tried to unregister DAS with ID: $dasId, but it was not found.")
}
}
}
}
Expand All @@ -108,13 +127,15 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging {
* @return The DAS instance.
*/
def getDAS(dasId: DASId): DASSdk = {
dasSdksInMemoryLock.synchronized {
logger.debug(s"Fetching DAS with ID: $dasId")
dasSdksInMemory.getOrElseUpdate(
// Pick the known config
val config = dasSdkConfigCacheLock.synchronized {
dasSdkConfigCache.getOrElseUpdate(
dasId,
getDASFromRemote(dasId).getOrElse(throw new IllegalArgumentException(s"DAS not found: $dasId"))
)
}.dasSdk
}
// Get the matching DAS from the cache
dasSdkCache.get(DASConfig(config.dasType, config.options))
}

/**
Expand All @@ -138,7 +159,7 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging {
*/
private def readDASFromConfig(): Map[String, (String, Map[String, String])] = {
val ids = mutable.Map[String, (String, Map[String, String])]()
dasSdksInMemoryLock.synchronized {
dasSdkConfigCacheLock.synchronized {
try {
settings.config.getConfig(BUILTIN_DAS).root().entrySet().asScala.foreach { entry =>
val id = entry.getKey
Expand Down Expand Up @@ -180,14 +201,17 @@ class DASSdkManager(implicit settings: RawSettings) extends StrictLogging {
* @param dasId The DAS ID to retrieve.
* @return The DAS instance.
*/
private def getDASFromRemote(dasId: DASId): Option[DaSDKInMemoryEntry] = {
private def getDASFromRemote(dasId: DASId): Option[DASConfig] = {
None
}

// Compare options to determine if two DAS instances are the same.
// Ignore options that start with "das_" as they are internal to the DAS SDK.
private def compareOptions(options1: Map[String, String], options2: Map[String, String]): Boolean = {
options1.filterKeys(!_.startsWith("das_")) == options2.filterKeys(!_.startsWith("das_"))
/**
* Strips the DAS-specific options (das_*) from the provided options map. They aren't needed
* for the SDK instance creation and shouldn't be used as part of the cache key.
* @param options The options sent by the client.
* @return The same options with the DAS-specific entries removed.
*/
private def stripOptions(options: Map[String, String]): Map[String, String] = {
options.filterKeys(!_.startsWith("das_"))
}

}