Skip to content

Commit

Permalink
add RcMap.invalidate api, for removing a resource from an RcMap (#4278)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored and effect-bot committed Jan 17, 2025
1 parent ce0e614 commit 12f4616
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 100 deletions.
5 changes: 5 additions & 0 deletions .changeset/new-numbers-visit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add RcMap.invalidate api, for removing a resource from an RcMap
13 changes: 1 addition & 12 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13836,15 +13836,4 @@ function fnApply(options: {
* @since 3.12.0
* @category Tracing
*/
export const fnUntraced: fn.Gen = (body: Function, ...pipeables: Array<any>) =>
pipeables.length === 0
? function(this: any, ...args: Array<any>) {
return core.fromIterator(() => body.apply(this, args))
}
: function(this: any, ...args: Array<any>) {
let effect = core.fromIterator(() => body.apply(this, args))
for (const x of pipeables) {
effect = x(effect)
}
return effect
}
export const fnUntraced: fn.Gen = core.fnUntraced
9 changes: 9 additions & 0 deletions packages/effect/src/RcMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,3 +109,12 @@ export const get: {
* @category combinators
*/
export const keys: <K, A, E>(self: RcMap<K, A, E>) => Effect.Effect<Array<K>, E> = internal.keys

/**
* @since 3.13.0
* @category combinators
*/
export const invalidate: {
<K>(key: K): <A, E>(self: RcMap<K, A, E>) => Effect.Effect<void>
<K, A, E>(self: RcMap<K, A, E>, key: K): Effect.Effect<void>
} = internal.invalidate
14 changes: 14 additions & 0 deletions packages/effect/src/internal/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,20 @@ export const gen: typeof Effect.gen = function() {
return fromIterator(() => f(pipe))
}

/** @internal */
export const fnUntraced: Effect.fn.Gen = (body: Function, ...pipeables: Array<any>) =>
pipeables.length === 0
? function(this: any, ...args: Array<any>) {
return fromIterator(() => body.apply(this, args))
}
: function(this: any, ...args: Array<any>) {
let effect = fromIterator(() => body.apply(this, args))
for (const x of pipeables) {
effect = x(effect)
}
return effect
}

/* @internal */
export const withConcurrency = dual<
(concurrency: number | "unbounded") => <A, E, R>(self: Effect.Effect<A, E, R>) => Effect.Effect<A, E, R>,
Expand Down
196 changes: 108 additions & 88 deletions packages/effect/src/internal/rcMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ declare namespace State {
interface Entry<A, E> {
readonly deferred: Deferred.Deferred<A, E>
readonly scope: Scope.CloseableScope
readonly finalizer: Effect<void>
fiber: RuntimeFiber<void, never> | undefined
refCount: number
}
Expand Down Expand Up @@ -121,96 +122,96 @@ export const make: {
export const get: {
<K>(key: K): <A, E>(self: RcMap.RcMap<K, A, E>) => Effect<A, E, Scope.Scope>
<K, A, E>(self: RcMap.RcMap<K, A, E>, key: K): Effect<A, E, Scope.Scope>
} = dual(
2,
<K, A, E>(self_: RcMap.RcMap<K, A, E>, key: K): Effect<A, E, Scope.Scope> => {
const self = self_ as RcMapImpl<K, A, E>
return core.uninterruptibleMask((restore) =>
core.suspend(() => {
if (self.state._tag === "Closed") {
return core.interrupt
}
const state = self.state
const o = MutableHashMap.get(state.map, key)
if (o._tag === "Some") {
const entry = o.value
entry.refCount++
return entry.fiber
? core.as(core.interruptFiber(entry.fiber), entry)
: core.succeed(entry)
} else if (Number.isFinite(self.capacity) && MutableHashMap.size(self.state.map) >= self.capacity) {
return core.fail(
new core.ExceededCapacityException(`RcMap attempted to exceed capacity of ${self.capacity}`)
) as Effect<never>
} = dual(2, <K, A, E>(self_: RcMap.RcMap<K, A, E>, key: K): Effect<A, E, Scope.Scope> => {
const self = self_ as RcMapImpl<K, A, E>
return core.uninterruptibleMask((restore) => getImpl(self, key, restore as any))
})

const getImpl = core.fnUntraced(function*<K, A, E>(self: RcMapImpl<K, A, E>, key: K, restore: <A>(a: A) => A) {
if (self.state._tag === "Closed") {
return yield* core.interrupt
}
const state = self.state
const o = MutableHashMap.get(state.map, key)
let entry: State.Entry<A, E>
if (o._tag === "Some") {
entry = o.value
entry.refCount++
if (entry.fiber) yield* core.interruptFiber(entry.fiber)
} else if (Number.isFinite(self.capacity) && MutableHashMap.size(self.state.map) >= self.capacity) {
return yield* core.fail(
new core.ExceededCapacityException(`RcMap attempted to exceed capacity of ${self.capacity}`)
) as Effect<never>
} else {
entry = yield* self.semaphore.withPermits(1)(acquire(self, key, restore))
}
const scope = yield* fiberRuntime.scopeTag
yield* scope.addFinalizer(() => entry.finalizer)
return yield* restore(core.deferredAwait(entry.deferred))
})

const acquire = core.fnUntraced(function*<K, A, E>(self: RcMapImpl<K, A, E>, key: K, restore: <A>(a: A) => A) {
const scope = yield* fiberRuntime.scopeMake()
const deferred = yield* core.deferredMake<A, E>()
const acquire = self.lookup(key)
yield* restore(core.fiberRefLocally(
acquire as Effect<A, E>,
core.currentContext,
Context.add(self.context, fiberRuntime.scopeTag, scope)
)).pipe(
core.exit,
core.flatMap((exit) => core.deferredDone(deferred, exit)),
circular.forkIn(scope)
)
const entry: State.Entry<A, E> = {
deferred,
scope,
finalizer: undefined as any,
fiber: undefined,
refCount: 1
}
;(entry as any).finalizer = release(self, key, entry)
if (self.state._tag === "Open") {
MutableHashMap.set(self.state.map, key, entry)
}
return entry
})

const release = <K, A, E>(self: RcMapImpl<K, A, E>, key: K, entry: State.Entry<A, E>) =>
core.suspend(() => {
entry.refCount--
if (entry.refCount > 0) {
return core.void
} else if (
self.state._tag === "Closed"
|| !MutableHashMap.has(self.state.map, key)
|| self.idleTimeToLive === undefined
) {
if (self.state._tag === "Open") {
MutableHashMap.remove(self.state.map, key)
}
return core.scopeClose(entry.scope, core.exitVoid)
}

return coreEffect.sleep(self.idleTimeToLive).pipe(
core.interruptible,
core.zipRight(core.suspend(() => {
if (self.state._tag === "Open" && entry.refCount === 0) {
MutableHashMap.remove(self.state.map, key)
return core.scopeClose(entry.scope, core.exitVoid)
}
const acquire = self.lookup(key)
return fiberRuntime.scopeMake().pipe(
coreEffect.bindTo("scope"),
coreEffect.bind("deferred", () => core.deferredMake<A, E>()),
core.tap(({ deferred, scope }) =>
restore(core.fiberRefLocally(
acquire as Effect<A, E>,
core.currentContext,
Context.add(self.context, fiberRuntime.scopeTag, scope)
)).pipe(
core.exit,
core.flatMap((exit) => core.deferredDone(deferred, exit)),
circular.forkIn(scope)
)
),
core.map(({ deferred, scope }) => {
const entry: State.Entry<A, E> = {
deferred,
scope,
fiber: undefined,
refCount: 1
}
MutableHashMap.set(state.map, key, entry)
return entry
})
)
}).pipe(
self.semaphore.withPermits(1),
coreEffect.bindTo("entry"),
coreEffect.bind("scope", () => fiberRuntime.scopeTag),
core.tap(({ entry, scope }) =>
scope.addFinalizer(() =>
core.suspend(() => {
entry.refCount--
if (entry.refCount > 0) {
return core.void
} else if (self.idleTimeToLive === undefined) {
if (self.state._tag === "Open") {
MutableHashMap.remove(self.state.map, key)
}
return core.scopeClose(entry.scope, core.exitVoid)
}
return coreEffect.sleep(self.idleTimeToLive).pipe(
core.interruptible,
core.zipRight(core.suspend(() => {
if (self.state._tag === "Open" && entry.refCount === 0) {
MutableHashMap.remove(self.state.map, key)
return core.scopeClose(entry.scope, core.exitVoid)
}
return core.void
})),
fiberRuntime.ensuring(core.sync(() => {
entry.fiber = undefined
})),
circular.forkIn(self.scope),
core.tap((fiber) => {
entry.fiber = fiber
}),
self.semaphore.withPermits(1)
)
})
)
),
core.flatMap(({ entry }) => restore(core.deferredAwait(entry.deferred)))
)
return core.void
})),
fiberRuntime.ensuring(core.sync(() => {
entry.fiber = undefined
})),
circular.forkIn(self.scope),
core.tap((fiber) => {
entry.fiber = fiber
}),
self.semaphore.withPermits(1)
)
}
)
})

/** @internal */
export const keys = <K, A, E>(self: RcMap.RcMap<K, A, E>): Effect<Array<K>> => {
Expand All @@ -219,3 +220,22 @@ export const keys = <K, A, E>(self: RcMap.RcMap<K, A, E>): Effect<Array<K>> => {
impl.state._tag === "Closed" ? core.interrupt : core.succeed(MutableHashMap.keys(impl.state.map))
)
}

/** @internal */
export const invalidate: {
<K>(key: K): <A, E>(self: RcMap.RcMap<K, A, E>) => Effect<void>
<K, A, E>(self: RcMap.RcMap<K, A, E>, key: K): Effect<void>
} = dual(
2,
core.fnUntraced(function*<K, A, E>(self_: RcMap.RcMap<K, A, E>, key: K) {
const self = self_ as RcMapImpl<K, A, E>
if (self.state._tag === "Closed") return
const o = MutableHashMap.get(self.state.map, key)
if (o._tag === "None") return
const entry = o.value
MutableHashMap.remove(self.state.map, key)
if (entry.refCount > 0) return
yield* core.scopeClose(entry.scope, core.exitVoid)
if (entry.fiber) yield* core.interruptFiber(entry.fiber)
})
)
6 changes: 6 additions & 0 deletions packages/effect/test/RcMap.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ describe("RcMap", () => {

yield* TestClock.adjust(1000)
assert.deepStrictEqual(released, ["foo", "bar"])

yield* Effect.scoped(RcMap.get(map, "baz"))
assert.deepStrictEqual(acquired, ["foo", "bar", "baz"])
yield* RcMap.invalidate(map, "baz")
assert.deepStrictEqual(acquired, ["foo", "bar", "baz"])
assert.deepStrictEqual(released, ["foo", "bar", "baz"])
}))

it.scoped("capacity", () =>
Expand Down

0 comments on commit 12f4616

Please sign in to comment.