Skip to content

Commit

Permalink
Store.waitUntilAllEventConsumed (#465)
Browse files Browse the repository at this point in the history
  • Loading branch information
muukii authored Mar 23, 2024
1 parent 4d479c7 commit 419af19
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 30 deletions.
7 changes: 6 additions & 1 deletion Sources/Verge/Library/EventEmitter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,12 @@ protocol EventEmitterType: AnyObject, Sendable {
func removeEventHandler(_ token: EventEmitterCancellable)
}

public protocol EventEmitterEventType {
func onComsume()
}

/// Instead of Combine
open class EventEmitter<Event>: EventEmitterType, @unchecked Sendable {
open class EventEmitter<Event: EventEmitterEventType>: EventEmitterType, @unchecked Sendable {

public var publisher: Publisher {
return .init(eventEmitter: self)
Expand Down Expand Up @@ -107,6 +111,7 @@ open class EventEmitter<Event>: EventEmitterType, @unchecked Sendable {

for subscriber in capturedSubscribers {
subscriber.1(event)
event.onComsume()
}
}

Expand Down
31 changes: 29 additions & 2 deletions Sources/Verge/Store/Store.swift
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,27 @@ public typealias NoActivityStoreBase<State: Equatable> = Store<State, Never>

private let sanitizerQueue = DispatchQueue.init(label: "org.vergegroup.verge.sanitizer")

public enum _StoreEvent<State: Equatable, Activity> {
public enum _StoreEvent<State: Equatable, Activity>: EventEmitterEventType {

public enum StateEvent {
case willUpdate
case didUpdate(Changes<State>)
}

case state(StateEvent)
case activity(Activity)
case waiter(() -> Void)

public func onComsume() {
switch self {
case .state:
break
case .activity:
break
case .waiter(let closure):
closure()
}
}
}

actor Writer {
Expand Down Expand Up @@ -219,6 +231,8 @@ open class Store<State: Equatable, Activity>: EventEmitter<_StoreEvent<State, Ac
}
case .activity:
break
case .waiter:
break
}
}

Expand Down Expand Up @@ -306,6 +320,19 @@ extension Store {

}

// MARK: - Wait
extension Store {

public func waitUntilAllEventConsumed() async {
await withUnsafeContinuation { c in
accept(.waiter({
c.resume()
}))
}
}

}

// MARK: - Middleware
extension Store {

Expand Down
6 changes: 5 additions & 1 deletion Sources/VergeClassic/Storage.swift
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ import Foundation

open class ReadonlyStorage<Value>: @unchecked Sendable, CustomReflectable {

public enum Event {
public enum Event: EventEmitterEventType {
case willUpdate
case didUpdate(Value)
case willDeinit

public func onComsume() {

}
}

private let eventEmitter = EventEmitter<Event>()
Expand Down
64 changes: 38 additions & 26 deletions Tests/VergeTests/EventEmitterTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,34 @@ import Combine

@available(iOS 13.0, *)
class EventEmitterTests: XCTestCase {


struct _Event: EventEmitterEventType {
let value: String

init(_ value: String) {
self.value = value
}

func onComsume() {
print("consume")
}
}

private var subscriptions = Set<AnyCancellable>()

@available(iOS 13, *)
func testPublisher() {

let emitter = EventEmitter<String>()
let emitter = EventEmitter<_Event>()

let waiter = XCTestExpectation()

emitter
.publisher
.handleEvents(receiveSubscription: { (sub) in
print(sub)
}, receiveOutput: { (value) in
XCTAssertEqual(value, "Hello")
XCTAssertEqual(value.value, "Hello")
waiter.fulfill()
}, receiveCompletion: { (completion) in

Expand All @@ -43,16 +55,16 @@ class EventEmitterTests: XCTestCase {
.connect()
.store(in: &subscriptions)

emitter.accept("Hello")
emitter.accept(.init("Hello"))

wait(for: [waiter], timeout: 10)
}

@available(iOS 13, *)
func testPublisherMultiple() {

let emitter = EventEmitter<String>()
let emitter = EventEmitter<_Event>()

let waiter1 = XCTestExpectation()
let waiter2 = XCTestExpectation()
let waiter3 = XCTestExpectation()
Expand All @@ -78,15 +90,15 @@ class EventEmitterTests: XCTestCase {
}
.store(in: &subscriptions)

emitter.accept("Hello")
emitter.accept(.init("Hello"))

wait(for: [waiter1, waiter2, waiter3], timeout: 10)
}


func testRegistrationPerformance() {

let emitter = EventEmitter<Void>()
let emitter = EventEmitter<_Event>()
measure(metrics: [XCTMemoryMetric(), XCTCPUMetric(), XCTClockMetric()]) {
for _ in 0..<1000 {
emitter.addEventHandler { _ in
Expand All @@ -99,61 +111,61 @@ class EventEmitterTests: XCTestCase {

func testEmittingPerformance() {

let emitter = EventEmitter<Void>()
let emitter = EventEmitter<_Event>()

measure(metrics: [XCTMemoryMetric(), XCTCPUMetric()]) {
for _ in 0..<10000 {
emitter.accept(())
emitter.accept(.init(""))
}
}

}

func testOrder() {

let emitter = EventEmitter<Int>()
let emitter = EventEmitter<_Event>()

var results_1 = [Int]()
var results_1 = [_Event]()
emitter.addEventHandler { value in
results_1.append(value)

if value == 1 {
emitter.accept(2)
if value.value == "1" {
emitter.accept(.init("2"))
}
}

var results_2 = [Int]()
var results_2 = [_Event]()
emitter.addEventHandler { value in
results_2.append(value)
}

emitter.accept(1)
emitter.accept(.init("1"))

XCTAssertEqual(results_1, [1, 2])
XCTAssertEqual(results_2, [1, 2])
XCTAssertEqual(results_1.map(\.value), ["1", "2"])
XCTAssertEqual(results_2.map(\.value), ["1", "2"])

}

func testEmitsAll() {

let emitter = EventEmitter<Int>()
let emitter = EventEmitter<_Event>()

emitter.addEventHandler { value in
}

let outputs = VergeConcurrency.UnfairLockAtomic.init([Int]())
let outputs = VergeConcurrency.UnfairLockAtomic.init([_Event]())
emitter.addEventHandler { value in
outputs.modify({
$0.append(value)
})
}

let inputs = VergeConcurrency.UnfairLockAtomic.init([Int]())
let inputs = VergeConcurrency.UnfairLockAtomic.init([_Event]())
DispatchQueue.concurrentPerform(iterations: 500) { i in
inputs.modify {
$0.append(i)
$0.append(.init("\(i)"))
}
emitter.accept(i)
emitter.accept(.init("\(i)"))
}

XCTAssertEqual(outputs.value.count, 500)
Expand Down

0 comments on commit 419af19

Please sign in to comment.