[Spark] Unify cache key type in DeltaLog (#4172)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

There are currently two cache key types in `DeltaLog`, `DeltaLogCacheKey` and
`CacheKey`, both of which are aliases for `(Path, Map[String, String])` with no
difference between them. This PR unifies them into a single cache key type: a
`DeltaLogCacheKey` case class with named fields.
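
For illustration, here is a minimal sketch of the key-type change (simplified from the diff below; the `Example` object and the sample path/option values are made up for this description, and `Path` is `org.apache.hadoop.fs.Path`):

```scala
import org.apache.hadoop.fs.Path

// Before: two structurally identical tuple aliases.
//   private type DeltaLogCacheKey = (Path, Map[String, String])
//   type CacheKey = (Path, Map[String, String])

// After: one named case class. Case classes derive equals/hashCode from
// their fields, so cache lookups behave the same as with tuple keys,
// while call sites get descriptive field names instead of _1/_2.
case class DeltaLogCacheKey(
    path: Path,
    fsOptions: Map[String, String])

object Example extends App {
  val a = DeltaLogCacheKey(new Path("s3://bucket/table"), Map("fs.s3a.endpoint" -> "host"))
  val b = DeltaLogCacheKey(new Path("s3://bucket/table"), Map("fs.s3a.endpoint" -> "host"))
  assert(a == b)  // structural equality, as a Guava cache key requires
  println(a.path) // named access instead of a._1
}
```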

## How was this patch tested?

Existing unit tests

## Does this PR introduce _any_ user-facing changes?

No
ctring authored Feb 19, 2025
1 parent d814888 commit 3e3c01b
Showing 1 changed file with 24 additions and 15 deletions.
39 changes: 24 additions & 15 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -688,11 +688,15 @@ class DeltaLog private(
 object DeltaLog extends DeltaLogging {
 
   /**
-   * The key type of `DeltaLog` cache. It's a pair of the canonicalized table path and the file
-   * system options (options starting with "fs." or "dfs." prefix) passed into
-   * `DataFrameReader/Writer`
+   * The key type of `DeltaLog` cache. It consists of
+   * - The canonicalized table path
+   * - File system options (options starting with "fs." or "dfs." prefix) passed into
+   *   `DataFrameReader/Writer`
    */
-  private type DeltaLogCacheKey = (Path, Map[String, String])
+  case class DeltaLogCacheKey(
+      path: Path,
+      fsOptions: Map[String, String]
+  )
 
   /** The name of the subdirectory that holds Delta metadata files */
   private[delta] val LOG_DIR_NAME = "_delta_log"
@@ -705,9 +709,8 @@ object DeltaLog extends DeltaLogging {
    * We create only a single [[DeltaLog]] for any given `DeltaLogCacheKey` to avoid wasted work
    * in reconstructing the log.
    */
-  type CacheKey = (Path, Map[String, String])
   private[delta] def getOrCreateCache(conf: SQLConf):
-      Cache[CacheKey, DeltaLog] = synchronized {
+      Cache[DeltaLogCacheKey, DeltaLog] = synchronized {
     deltaLogCache match {
       case Some(c) => c
       case None =>
@@ -721,12 +724,12 @@
               // Various layers will throw null pointer if the RDD is already gone.
             }
           })
-        deltaLogCache = Some(builder.build[CacheKey, DeltaLog]())
+        deltaLogCache = Some(builder.build[DeltaLogCacheKey, DeltaLog]())
         deltaLogCache.get
     }
   }
 
-  private var deltaLogCache: Option[Cache[CacheKey, DeltaLog]] = None
+  private var deltaLogCache: Option[Cache[DeltaLogCacheKey, DeltaLog]] = None
 
   /**
    * Helper to create delta log caches
@@ -941,15 +944,19 @@ object DeltaLog extends DeltaLogging {
           )
         }
       }
-    def getDeltaLogFromCache(): DeltaLog = {
+    val cacheKey = DeltaLogCacheKey(
+      path,
+      fileSystemOptions)
+
+    def getDeltaLogFromCache: DeltaLog = {
       // The following cases will still create a new ActionLog even if there is a cached
       // ActionLog using a different format path:
       // - Different `scheme`
       // - Different `authority` (e.g., different user tokens in the path)
       // - Different mount point.
       try {
         getOrCreateCache(spark.sessionState.conf)
-          .get(path -> fileSystemOptions, () => {
+          .get(cacheKey, () => {
             createDeltaLog()
           }
         )
@@ -959,12 +966,12 @@
       }
     }
 
-    val deltaLog = getDeltaLogFromCache()
+    val deltaLog = getDeltaLogFromCache
     if (Option(deltaLog.sparkContext.get).map(_.isStopped).getOrElse(true)) {
       // Invalidate the cached `DeltaLog` and create a new one because the `SparkContext` of the
       // cached `DeltaLog` has been stopped.
-      getOrCreateCache(spark.sessionState.conf).invalidate(path -> fileSystemOptions)
-      getDeltaLogFromCache()
+      getOrCreateCache(spark.sessionState.conf).invalidate(cacheKey)
+      getDeltaLogFromCache
     } else {
       deltaLog
     }
@@ -990,13 +997,15 @@ object DeltaLog extends DeltaLogging {
         val iter = deltaLogCache.asMap().keySet().iterator()
         while (iter.hasNext) {
           val key = iter.next()
-          if (key._1 == path) {
+          if (key.path == path) {
             keysToBeRemoved += key
           }
         }
         deltaLogCache.invalidateAll(keysToBeRemoved.asJava)
       } else {
-        deltaLogCache.invalidate(path -> Map.empty)
+        deltaLogCache.invalidate(DeltaLogCacheKey(
+          path,
+          fsOptions = Map.empty))
       }
     } catch {
       case NonFatal(e) => logWarning(e.getMessage, e)
