diff --git a/.changeset/new-numbers-visit.md b/.changeset/new-numbers-visit.md new file mode 100644 index 00000000000..dc0a423ab99 --- /dev/null +++ b/.changeset/new-numbers-visit.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add RcMap.invalidate api, for removing a resource from an RcMap diff --git a/packages/effect/src/Effect.ts b/packages/effect/src/Effect.ts index 0ab26aa80cc..031a5680585 100644 --- a/packages/effect/src/Effect.ts +++ b/packages/effect/src/Effect.ts @@ -13853,15 +13853,4 @@ function fnApply(options: { * @since 3.12.0 * @category Tracing */ -export const fnUntraced: fn.Gen = (body: Function, ...pipeables: Array) => - pipeables.length === 0 - ? function(this: any, ...args: Array) { - return core.fromIterator(() => body.apply(this, args)) - } - : function(this: any, ...args: Array) { - let effect = core.fromIterator(() => body.apply(this, args)) - for (const x of pipeables) { - effect = x(effect) - } - return effect - } +export const fnUntraced: fn.Gen = core.fnUntraced diff --git a/packages/effect/src/RcMap.ts b/packages/effect/src/RcMap.ts index 1c33b4fcb66..df99d901517 100644 --- a/packages/effect/src/RcMap.ts +++ b/packages/effect/src/RcMap.ts @@ -109,3 +109,12 @@ export const get: { * @category combinators */ export const keys: (self: RcMap) => Effect.Effect, E> = internal.keys + +/** + * @since 3.13.0 + * @category combinators + */ +export const invalidate: { + (key: K): (self: RcMap) => Effect.Effect + (self: RcMap, key: K): Effect.Effect +} = internal.invalidate diff --git a/packages/effect/src/internal/core.ts b/packages/effect/src/internal/core.ts index d934997ca9f..e62ae54fecf 100644 --- a/packages/effect/src/internal/core.ts +++ b/packages/effect/src/internal/core.ts @@ -1421,6 +1421,20 @@ export const gen: typeof Effect.gen = function() { return fromIterator(() => f(pipe)) } +/** @internal */ +export const fnUntraced: Effect.fn.Gen = (body: Function, ...pipeables: Array) => + pipeables.length === 0 + ? function(this: any, ...args: Array) { + return fromIterator(() => body.apply(this, args)) + } + : function(this: any, ...args: Array) { + let effect = fromIterator(() => body.apply(this, args)) + for (const x of pipeables) { + effect = x(effect) + } + return effect + } + /* @internal */ export const withConcurrency = dual< (concurrency: number | "unbounded") => (self: Effect.Effect) => Effect.Effect, diff --git a/packages/effect/src/internal/rcMap.ts b/packages/effect/src/internal/rcMap.ts index d1fece6c776..34de2fbe14b 100644 --- a/packages/effect/src/internal/rcMap.ts +++ b/packages/effect/src/internal/rcMap.ts @@ -32,6 +32,7 @@ declare namespace State { interface Entry { readonly deferred: Deferred.Deferred readonly scope: Scope.CloseableScope + readonly finalizer: Effect fiber: RuntimeFiber | undefined refCount: number } @@ -121,96 +122,96 @@ export const make: { export const get: { (key: K): (self: RcMap.RcMap) => Effect (self: RcMap.RcMap, key: K): Effect -} = dual( - 2, - (self_: RcMap.RcMap, key: K): Effect => { - const self = self_ as RcMapImpl - return core.uninterruptibleMask((restore) => - core.suspend(() => { - if (self.state._tag === "Closed") { - return core.interrupt - } - const state = self.state - const o = MutableHashMap.get(state.map, key) - if (o._tag === "Some") { - const entry = o.value - entry.refCount++ - return entry.fiber - ? core.as(core.interruptFiber(entry.fiber), entry) - : core.succeed(entry) - } else if (Number.isFinite(self.capacity) && MutableHashMap.size(self.state.map) >= self.capacity) { - return core.fail( - new core.ExceededCapacityException(`RcMap attempted to exceed capacity of ${self.capacity}`) - ) as Effect +} = dual(2, (self_: RcMap.RcMap, key: K): Effect => { + const self = self_ as RcMapImpl + return core.uninterruptibleMask((restore) => getImpl(self, key, restore as any)) +}) + +const getImpl = core.fnUntraced(function*(self: RcMapImpl, key: K, restore: (a: A) => A) { + if (self.state._tag === "Closed") { + return yield* core.interrupt + } + const state = self.state + const o = MutableHashMap.get(state.map, key) + let entry: State.Entry + if (o._tag === "Some") { + entry = o.value + entry.refCount++ + if (entry.fiber) yield* core.interruptFiber(entry.fiber) + } else if (Number.isFinite(self.capacity) && MutableHashMap.size(self.state.map) >= self.capacity) { + return yield* core.fail( + new core.ExceededCapacityException(`RcMap attempted to exceed capacity of ${self.capacity}`) + ) as Effect + } else { + entry = yield* self.semaphore.withPermits(1)(acquire(self, key, restore)) + } + const scope = yield* fiberRuntime.scopeTag + yield* scope.addFinalizer(() => entry.finalizer) + return yield* restore(core.deferredAwait(entry.deferred)) +}) + +const acquire = core.fnUntraced(function*(self: RcMapImpl, key: K, restore: (a: A) => A) { + const scope = yield* fiberRuntime.scopeMake() + const deferred = yield* core.deferredMake() + const acquire = self.lookup(key) + yield* restore(core.fiberRefLocally( + acquire as Effect, + core.currentContext, + Context.add(self.context, fiberRuntime.scopeTag, scope) + )).pipe( + core.exit, + core.flatMap((exit) => core.deferredDone(deferred, exit)), + circular.forkIn(scope) + ) + const entry: State.Entry = { + deferred, + scope, + finalizer: undefined as any, + fiber: undefined, + refCount: 1 + } + ;(entry as any).finalizer = release(self, key, entry) + if (self.state._tag === "Open") { + MutableHashMap.set(self.state.map, key, entry) + } + return entry +}) + +const release = (self: RcMapImpl, key: K, entry: State.Entry) => + core.suspend(() => { + entry.refCount-- + if (entry.refCount > 0) { + return core.void + } else if ( + self.state._tag === "Closed" + || !MutableHashMap.has(self.state.map, key) + || self.idleTimeToLive === undefined + ) { + if (self.state._tag === "Open") { + MutableHashMap.remove(self.state.map, key) + } + return core.scopeClose(entry.scope, core.exitVoid) + } + + return coreEffect.sleep(self.idleTimeToLive).pipe( + core.interruptible, + core.zipRight(core.suspend(() => { + if (self.state._tag === "Open" && entry.refCount === 0) { + MutableHashMap.remove(self.state.map, key) + return core.scopeClose(entry.scope, core.exitVoid) } - const acquire = self.lookup(key) - return fiberRuntime.scopeMake().pipe( - coreEffect.bindTo("scope"), - coreEffect.bind("deferred", () => core.deferredMake()), - core.tap(({ deferred, scope }) => - restore(core.fiberRefLocally( - acquire as Effect, - core.currentContext, - Context.add(self.context, fiberRuntime.scopeTag, scope) - )).pipe( - core.exit, - core.flatMap((exit) => core.deferredDone(deferred, exit)), - circular.forkIn(scope) - ) - ), - core.map(({ deferred, scope }) => { - const entry: State.Entry = { - deferred, - scope, - fiber: undefined, - refCount: 1 - } - MutableHashMap.set(state.map, key, entry) - return entry - }) - ) - }).pipe( - self.semaphore.withPermits(1), - coreEffect.bindTo("entry"), - coreEffect.bind("scope", () => fiberRuntime.scopeTag), - core.tap(({ entry, scope }) => - scope.addFinalizer(() => - core.suspend(() => { - entry.refCount-- - if (entry.refCount > 0) { - return core.void - } else if (self.idleTimeToLive === undefined) { - if (self.state._tag === "Open") { - MutableHashMap.remove(self.state.map, key) - } - return core.scopeClose(entry.scope, core.exitVoid) - } - return coreEffect.sleep(self.idleTimeToLive).pipe( - core.interruptible, - core.zipRight(core.suspend(() => { - if (self.state._tag === "Open" && entry.refCount === 0) { - MutableHashMap.remove(self.state.map, key) - return core.scopeClose(entry.scope, core.exitVoid) - } - return core.void - })), - fiberRuntime.ensuring(core.sync(() => { - entry.fiber = undefined - })), - circular.forkIn(self.scope), - core.tap((fiber) => { - entry.fiber = fiber - }), - self.semaphore.withPermits(1) - ) - }) - ) - ), - core.flatMap(({ entry }) => restore(core.deferredAwait(entry.deferred))) - ) + return core.void + })), + fiberRuntime.ensuring(core.sync(() => { + entry.fiber = undefined + })), + circular.forkIn(self.scope), + core.tap((fiber) => { + entry.fiber = fiber + }), + self.semaphore.withPermits(1) ) - } -) + }) /** @internal */ export const keys = (self: RcMap.RcMap): Effect> => { @@ -219,3 +220,22 @@ export const keys = (self: RcMap.RcMap): Effect> => { impl.state._tag === "Closed" ? core.interrupt : core.succeed(MutableHashMap.keys(impl.state.map)) ) } + +/** @internal */ +export const invalidate: { + (key: K): (self: RcMap.RcMap) => Effect + (self: RcMap.RcMap, key: K): Effect +} = dual( + 2, + core.fnUntraced(function*(self_: RcMap.RcMap, key: K) { + const self = self_ as RcMapImpl + if (self.state._tag === "Closed") return + const o = MutableHashMap.get(self.state.map, key) + if (o._tag === "None") return + const entry = o.value + MutableHashMap.remove(self.state.map, key) + if (entry.refCount > 0) return + yield* core.scopeClose(entry.scope, core.exitVoid) + if (entry.fiber) yield* core.interruptFiber(entry.fiber) + }) +) diff --git a/packages/effect/test/RcMap.test.ts b/packages/effect/test/RcMap.test.ts index 2c52b1c2627..b45074a155c 100644 --- a/packages/effect/test/RcMap.test.ts +++ b/packages/effect/test/RcMap.test.ts @@ -89,6 +89,12 @@ describe("RcMap", () => { yield* TestClock.adjust(1000) deepStrictEqual(released, ["foo", "bar"]) + + yield* Effect.scoped(RcMap.get(map, "baz")) + deepStrictEqual(acquired, ["foo", "bar", "baz"]) + yield* RcMap.invalidate(map, "baz") + deepStrictEqual(acquired, ["foo", "bar", "baz"]) + deepStrictEqual(released, ["foo", "bar", "baz"]) })) it.scoped("capacity", () =>