Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
muukii committed Mar 4, 2024
1 parent 689029b commit 2455b28
Show file tree
Hide file tree
Showing 8 changed files with 266 additions and 55 deletions.
4 changes: 2 additions & 2 deletions Sources/Verge/Derived/Derived+Assign.swift
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ extension StoreDriverType {
public func assign(
queue: some TargetQueueType = .passthrough,
to binder: @escaping (Changes<TargetStore.State>) -> Void
) -> StoreSubscription {
) -> StoreStateSubscription {
store.asStore().sinkState(queue: queue, receive: binder)
}

Expand All @@ -41,7 +41,7 @@ extension StoreDriverType {
public func assign(
queue: MainActorTargetQueue,
to binder: @escaping (Changes<TargetStore.State>) -> Void
) -> StoreSubscription {
) -> StoreStateSubscription {
store.asStore().sinkState(queue: queue, receive: binder)
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/Verge/Derived/Derived.swift
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public class Derived<Value: Equatable>: Store<Value, Never>, DerivedType, @unche
dropsFirst: Bool = false,
queue: some TargetQueueType,
receive: @escaping (Changes<Value>) -> Void
) -> StoreSubscription {
) -> StoreStateSubscription {
_primitive_sinkState(
dropsFirst: dropsFirst,
queue: queue,
Expand Down Expand Up @@ -227,7 +227,7 @@ extension Derived where Value : Equatable {
dropsFirst: Bool = false,
queue: some TargetQueueType,
receive: @escaping (Value) -> Void
) -> StoreSubscription {
) -> StoreStateSubscription {
sinkState(dropsFirst: dropsFirst, queue: queue) { (changes) in
changes.ifChanged().do { value in
receive(value)
Expand All @@ -244,7 +244,7 @@ extension Derived where Value : Equatable {
dropsFirst: Bool = false,
queue: MainActorTargetQueue = .mainIsolated(),
receive: @escaping @MainActor (Value) -> Void
) -> StoreSubscription {
) -> StoreStateSubscription {
sinkState(dropsFirst: dropsFirst, queue: queue) { @MainActor changes in
changes.ifChanged().do { value in
receive(value)
Expand Down
147 changes: 141 additions & 6 deletions Sources/Verge/Library/StoreSubscription.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,158 @@ public final class StoreSubscription: Hashable, Cancellable, @unchecked Sendable
associatedStore = nil
}

func associate(store: some StoreType) -> StoreSubscription {
ensureAlive()
associatedStore = store
return self
}

func associate(object: AnyObject) -> StoreSubscription {
ensureAlive()
associatedReferences.append(object)
return self
}

/**
Make this subscription alive while the source is active.
the source means a root data store which is Store.

In case of Derived, the source will be Derived's upstream.
If the upstream invalidated, this subscription will stop.
*/
@discardableResult
public func storeWhileSourceActive() -> StoreSubscription {
ensureAlive()
assert(storeCancellable != nil)
storeCancellable?.associate(self)
return self
}

@inline(__always)
private func ensureAlive() {
assert(wasCancelled.load(ordering: .relaxed) == false)
}

/**
Converts to Combine.AnyCancellable to make it auto cancellable.
*/
public func asAny() -> AnyCancellable {
return .init { [self] in
self.cancel()
}
}

deinit {
cancel()
}
}

public final class StoreStateSubscription: Hashable, Cancellable, @unchecked Sendable {

enum Action {
case suspend
case resume
}

public static func == (lhs: StoreStateSubscription, rhs: StoreStateSubscription) -> Bool {
lhs === rhs
}

public func hash(into hasher: inout Hasher) {
ObjectIdentifier(self).hash(into: &hasher)
}

private let wasCancelled = ManagedAtomic(false)

private var entranceForSuspension: ManagedAtomic<Int8> = .init(0)

private var source: AtomicReferenceStorage<EventEmitterCancellable>

// TODO: can't be sendable
private weak var storeCancellable: VergeAnyCancellable?
private var associatedStore: (any StoreType)?
private var associatedReferences: [AnyObject] = []
private let onAction: (StoreStateSubscription, Action) -> Void

init(
_ eventEmitterCancellable: EventEmitterCancellable,
storeCancellable: VergeAnyCancellable,
onAction: @escaping (StoreStateSubscription, Action) -> Void
) {
self.source = .init(eventEmitterCancellable)
self.storeCancellable = storeCancellable
self.onAction = onAction
}

public func cancel() {

guard wasCancelled.compareExchange(expected: false, desired: true, ordering: .sequentiallyConsistent).exchanged else {
return
}

source.dispose().cancel()

associatedStore = nil
}

func cancelSubscription() {

guard wasCancelled.load(ordering: .sequentiallyConsistent) == false else {
return
}

source.dispose().cancel()

}

func replace(cancellable: consuming EventEmitterCancellable) {

guard wasCancelled.load(ordering: .sequentiallyConsistent) == false else {
return
}

AtomicReferenceStorage.atomicStore(cancellable, at: &source, ordering: .relaxed)
}

public func suspend() {

guard wasCancelled.load(ordering: .sequentiallyConsistent) == false else {
return
}

guard entranceForSuspension.loadThenWrappingIncrement(ordering: .sequentiallyConsistent) == 0 else {
assertionFailure("currently operating")
return
}

onAction(self, .suspend)

entranceForSuspension.wrappingDecrement(ordering: .sequentiallyConsistent)
}

public func resume() {

guard wasCancelled.load(ordering: .sequentiallyConsistent) == false else {
return
}

guard entranceForSuspension.loadThenWrappingIncrement(ordering: .sequentiallyConsistent) == 0 else {
assertionFailure("currently operating")
return
}

onAction(self, .resume)

entranceForSuspension.wrappingDecrement(ordering: .sequentiallyConsistent)
}

func associate(store: some StoreType) -> StoreSubscription {
func associate(store: some StoreType) -> StoreStateSubscription {
ensureAlive()
associatedStore = store
return self
}

func associate(object: AnyObject) -> StoreSubscription {
func associate(object: AnyObject) -> StoreStateSubscription {
ensureAlive()
associatedReferences.append(object)
return self
Expand All @@ -68,7 +205,7 @@ public final class StoreSubscription: Hashable, Cancellable, @unchecked Sendable
If the upstream invalidated, this subscription will stop.
*/
@discardableResult
public func storeWhileSourceActive() -> StoreSubscription {
public func storeWhileSourceActive() -> StoreStateSubscription {
ensureAlive()
assert(storeCancellable != nil)
storeCancellable?.associate(self)
Expand All @@ -77,7 +214,7 @@ public final class StoreSubscription: Hashable, Cancellable, @unchecked Sendable

@inline(__always)
private func ensureAlive() {
assert(wasCancelled.load(ordering: .relaxed) == false)
assert(wasCancelled.load(ordering: .sequentiallyConsistent) == false)
}

/**
Expand All @@ -93,5 +230,3 @@ public final class StoreSubscription: Hashable, Cancellable, @unchecked Sendable
cancel()
}
}


Loading

0 comments on commit 2455b28

Please sign in to comment.