From 892b81861e10613db9f8d2f01f8149b7bc205361 Mon Sep 17 00:00:00 2001 From: Thomas Mellenthin Date: Tue, 6 Sep 2022 14:14:44 +0200 Subject: [PATCH 1/2] Amb: forward cancel to the inner publishers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We noticed that cancelling an Amb publisher does not cancel any of the inner publishers. It looks like nil-ing firstSink / secondSink should do the job, but it does not work. Adding explicit cancel fixes the issue, but I’m not sure if this is the right solution. --- Sources/Operators/Amb.swift | 2 ++ Tests/AmbTests.swift | 31 +++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/Sources/Operators/Amb.swift b/Sources/Operators/Amb.swift index 5801e6c..5fe34cc 100644 --- a/Sources/Operators/Amb.swift +++ b/Sources/Operators/Amb.swift @@ -144,7 +144,9 @@ private extension Publishers.Amb { } func cancel() { + firstSink?.cancelUpstream() firstSink = nil + secondSink?.cancelUpstream() secondSink = nil } } diff --git a/Tests/AmbTests.swift b/Tests/AmbTests.swift index fa8177a..917cf54 100644 --- a/Tests/AmbTests.swift +++ b/Tests/AmbTests.swift @@ -47,6 +47,37 @@ class AmbTests: XCTestCase { XCTAssertEqual(completion, .finished) } + func testAmbCancelPreSubscription() { + enum CancelError: Swift.Error { + case cancelled + } + var ambPublisher: AnyCancellable? + + var firstCompletion: Subscribers.Completion? + let subject1 = PassthroughSubject() + let subject1Publisher = subject1 + .handleEvents(receiveCancel: { + firstCompletion = .failure(CancelError.cancelled) + }) + .eraseToAnyPublisher() + + var secondCompletion: Subscribers.Completion? + let subject2 = PassthroughSubject() + let subject2Publisher = subject2 + .handleEvents(receiveCancel: { + secondCompletion = .failure(CancelError.cancelled) + }) + .eraseToAnyPublisher() + + ambPublisher = Publishers.Amb(first: subject1Publisher, second: subject2Publisher) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in }) + ambPublisher?.cancel() + + XCTAssertEqual(firstCompletion, .failure(CancelError.cancelled)) + XCTAssertEqual(secondCompletion, .failure(CancelError.cancelled)) + } + func testAmbLimitedPreDemand() { let subject1 = PassthroughSubject() let subject2 = PassthroughSubject() From 1f30938a8ae83e3a5232eb0504e2d5c4e4c8ea33 Mon Sep 17 00:00:00 2001 From: Thomas Mellenthin Date: Wed, 7 Sep 2022 12:00:18 +0200 Subject: [PATCH 2/2] Amb: cancel the publisher that looses the race immediately --- Sources/Operators/Amb.swift | 2 ++ Tests/AmbTests.swift | 52 ++++++++++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 12 deletions(-) diff --git a/Sources/Operators/Amb.swift b/Sources/Operators/Amb.swift index 5fe34cc..71e692d 100644 --- a/Sources/Operators/Amb.swift +++ b/Sources/Operators/Amb.swift @@ -103,8 +103,10 @@ private extension Publishers.Amb { guard let decision = decision else { return } switch decision { case .first: + secondSink?.cancelUpstream() secondSink = nil case .second: + firstSink?.cancelUpstream() firstSink = nil } diff --git a/Tests/AmbTests.swift b/Tests/AmbTests.swift index 917cf54..dd5ceec 100644 --- a/Tests/AmbTests.swift +++ b/Tests/AmbTests.swift @@ -48,34 +48,62 @@ class AmbTests: XCTestCase { } func testAmbCancelPreSubscription() { - enum CancelError: Swift.Error { - case cancelled - } - var ambPublisher: AnyCancellable? + let ambPublisher: AnyCancellable? - var firstCompletion: Subscribers.Completion? - let subject1 = PassthroughSubject() + let subject1Cancelled = expectation(description: "first publisher cancelled") + let subject1 = PassthroughSubject() let subject1Publisher = subject1 .handleEvents(receiveCancel: { - firstCompletion = .failure(CancelError.cancelled) + subject1Cancelled.fulfill() }) .eraseToAnyPublisher() - var secondCompletion: Subscribers.Completion? - let subject2 = PassthroughSubject() + let subject2Cancelled = expectation(description: "second publisher cancelled") + let subject2 = PassthroughSubject() let subject2Publisher = subject2 .handleEvents(receiveCancel: { - secondCompletion = .failure(CancelError.cancelled) + subject2Cancelled.fulfill() }) .eraseToAnyPublisher() ambPublisher = Publishers.Amb(first: subject1Publisher, second: subject2Publisher) .sink(receiveCompletion: { _ in }, receiveValue: { _ in }) + + // cancelling amb should cancel the inner publishers ambPublisher?.cancel() - XCTAssertEqual(firstCompletion, .failure(CancelError.cancelled)) - XCTAssertEqual(secondCompletion, .failure(CancelError.cancelled)) + waitForExpectations(timeout: 0.01) + } + + func testAmbCancelPostSubscription() { + let subject1 = PassthroughSubject() + var subject1cancelCounter = 0 + let subject1Publisher = subject1 + .handleEvents(receiveCancel: { + subject1cancelCounter += 1 + }) + .eraseToAnyPublisher() + + let subject2 = PassthroughSubject() + var subject2cancelCounter = 0 + let subject2Publisher = subject2 + .handleEvents(receiveCancel: { + subject2cancelCounter += 1 + }) + .eraseToAnyPublisher() + + Publishers.Amb(first: subject1Publisher, second: subject2Publisher) + .sink(receiveCompletion: { _ in }, + receiveValue: { _ in }) + .store(in: &subscriptions) + + // subject1 wins the race, so 2 has to be cancelled + subject1.send(1) + + // At dealloc both publishes are cancelled, so we cannot use expectations here and count the cancel events instead + XCTAssertEqual(subject1cancelCounter, 0) + XCTAssertEqual(subject2cancelCounter, 1) } func testAmbLimitedPreDemand() {