Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
muukii committed Mar 22, 2024
1 parent 4ca9c46 commit 42ef2e4
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 75 deletions.
77 changes: 77 additions & 0 deletions Sources/Verge/Library/StoreActivitySubscription.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import Combine
@_implementationOnly import Atomics

/**
A subscription that is compatible with Combine’s Cancellable.
You can manage asynchronous tasks either call the ``cancel()`` to halt the subscription, or allow it to terminate upon instance deallocation, and by implementing the ``storeWhileSourceActive()`` technique, the subscription’s active status is maintained until the source store is released.
*/
public final class StoreActivitySubscription: Hashable, Cancellable, @unchecked Sendable {

public static func == (lhs: StoreActivitySubscription, rhs: StoreActivitySubscription) -> Bool {
lhs === rhs
}

public func hash(into hasher: inout Hasher) {
ObjectIdentifier(self).hash(into: &hasher)
}

private let wasCancelled = ManagedAtomic(false)

private let source: EventEmitterCancellable

// TODO: can't be sendable
private weak var storeCancellable: VergeAnyCancellable?
private var associatedStore: (any StoreType)?
private var associatedReferences: [AnyObject] = []

init(
_ eventEmitterCancellable: EventEmitterCancellable,
storeCancellable: VergeAnyCancellable
) {
self.source = eventEmitterCancellable
self.storeCancellable = storeCancellable
}

public func cancel() {

guard wasCancelled.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged else { return }

storeCancellable?.dissociate(self)
source.cancel()
associatedStore = nil
}

/**
Make this subscription alive while the source is active.
the source means a root data store which is Store.

In case of Derived, the source will be Derived's upstream.
If the upstream invalidated, this subscription will stop.
*/
@discardableResult
public func storeWhileSourceActive() -> StoreActivitySubscription {
ensureAlive()
assert(storeCancellable != nil)
storeCancellable?.associate(self)
return self
}

@inline(__always)
private func ensureAlive() {
assert(wasCancelled.load(ordering: .relaxed) == false)
}

/**
Converts to Combine.AnyCancellable to make it auto cancellable.
*/
public func asAny() -> AnyCancellable {
return .init { [self] in
self.cancel()
}
}

deinit {
cancel()
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,6 @@ import Combine
A subscription that is compatible with Combine’s Cancellable.
You can manage asynchronous tasks either call the ``cancel()`` to halt the subscription, or allow it to terminate upon instance deallocation, and by implementing the ``storeWhileSourceActive()`` technique, the subscription’s active status is maintained until the source store is released.
*/
public final class StoreSubscription: Hashable, Cancellable, @unchecked Sendable {

public static func == (lhs: StoreSubscription, rhs: StoreSubscription) -> Bool {
lhs === rhs
}

public func hash(into hasher: inout Hasher) {
ObjectIdentifier(self).hash(into: &hasher)
}

private let wasCancelled = ManagedAtomic(false)

private let source: EventEmitterCancellable

// TODO: can't be sendable
private weak var storeCancellable: VergeAnyCancellable?
private var associatedStore: (any StoreType)?
private var associatedReferences: [AnyObject] = []

init(
_ eventEmitterCancellable: EventEmitterCancellable,
storeCancellable: VergeAnyCancellable
) {
self.source = eventEmitterCancellable
self.storeCancellable = storeCancellable
}

public func cancel() {

guard wasCancelled.compareExchange(expected: false, desired: true, ordering: .relaxed).exchanged else { return }

storeCancellable?.dissociate(self)
source.cancel()
associatedStore = nil
}

/**
Make this subscription alive while the source is active.
the source means a root data store which is Store.

In case of Derived, the source will be Derived's upstream.
If the upstream invalidated, this subscription will stop.
*/
@discardableResult
public func storeWhileSourceActive() -> StoreSubscription {
ensureAlive()
assert(storeCancellable != nil)
storeCancellable?.associate(self)
return self
}

@inline(__always)
private func ensureAlive() {
assert(wasCancelled.load(ordering: .relaxed) == false)
}

/**
Converts to Combine.AnyCancellable to make it auto cancellable.
*/
public func asAny() -> AnyCancellable {
return .init { [self] in
self.cancel()
}
}

deinit {
cancel()
}
}

public final class StoreStateSubscription: Hashable, Cancellable, @unchecked Sendable {

enum Action {
Expand Down Expand Up @@ -161,7 +91,7 @@ public final class StoreStateSubscription: Hashable, Cancellable, @unchecked Sen
entranceForSuspension.wrappingDecrement(ordering: .sequentiallyConsistent)
}

guard isSuspending.compareExchange(expected: false, desired: true, ordering: .sequentiallyConsistent).exchanged else {
guard isSuspending.compareExchange(expected: false, desired: true, ordering: .sequentiallyConsistent).exchanged else {
return
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/Verge/Store/Store.swift
Original file line number Diff line number Diff line change
Expand Up @@ -744,14 +744,14 @@ Latest Version (%d): (%@)
func _mainActor_sinkActivity(
queue: MainActorTargetQueue,
receive: @escaping @MainActor (Activity) -> Void
) -> StoreSubscription {
) -> StoreActivitySubscription {
return _primitive_sinkActivity(queue: Queues.MainActor(queue), receive: receive)
}

func _primitive_sinkActivity(
queue: some TargetQueueType,
receive: @escaping (Activity) -> Void
) -> StoreSubscription {
) -> StoreActivitySubscription {

let execute = queue.execute
let cancellable = self._sinkActivityEvent { activity in
Expand Down
4 changes: 2 additions & 2 deletions Sources/Verge/Store/StoreDriverType.swift
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ extension StoreDriverType where Scope == TargetStore.State {
public func sinkActivity(
queue: some TargetQueueType,
receive: @escaping (TargetStore.Activity) -> Void
) -> StoreSubscription {
) -> StoreActivitySubscription {

store.asStore()._primitive_sinkActivity(queue: queue, receive: receive)

Expand All @@ -184,7 +184,7 @@ extension StoreDriverType where Scope == TargetStore.State {
public func sinkActivity(
queue: MainActorTargetQueue = .mainIsolated(),
receive: @escaping @MainActor (TargetStore.Activity) -> Void
) -> StoreSubscription {
) -> StoreActivitySubscription {

store.asStore()._mainActor_sinkActivity(queue: queue) { activity in
thunkToMainActor {
Expand Down

0 comments on commit 42ef2e4

Please sign in to comment.