Skip to content

Commit

Permalink
Use swift-atomics
Browse files Browse the repository at this point in the history
Motivation:

NIO deprecated its atomics in apple/swift-nio#2204.

Modifications:

- Use swift-atomics in the `QPSBenchmark` sub-package
- Use a lock in the one test which used `NIOAtomic`

Result:

Not using deprecated code.
  • Loading branch information
glbrntt committed Jul 11, 2022
1 parent d772b68 commit f0c4f80
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 13 deletions.
2 changes: 2 additions & 0 deletions Performance/QPSBenchmark/Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ let package = Package(
.package(url: "https://github.com/apple/swift-nio.git", from: "2.32.0"),
.package(url: "https://github.com/apple/swift-log.git", from: "1.4.0"),
.package(url: "https://github.com/apple/swift-argument-parser.git", from: "1.0.0"),
.package(url: "https://github.com/apple/swift-atomics.git", from: "1.0.2"),
.package(
url: "https://github.com/swift-server/swift-service-lifecycle.git",
from: "1.0.0-alpha"
Expand All @@ -42,6 +43,7 @@ let package = Package(
name: "QPSBenchmark",
dependencies: [
.product(name: "GRPC", package: "grpc-swift"),
.product(name: "Atomics", package: "swift-atomics"),
.product(name: "NIOCore", package: "swift-nio"),
.product(name: "NIOPosix", package: "swift-nio"),
.product(name: "NIOConcurrencyHelpers", package: "swift-nio"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/

import Atomics
import BenchmarkUtils
import Foundation
import GRPC
Expand Down Expand Up @@ -168,8 +169,8 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
/// Succeeds after a stop has been requested and all outstanding requests have completed.
private var stopComplete: EventLoopPromise<Void>

private let running: NIOAtomic<Bool> = .makeAtomic(value: false)
private let outstanding: NIOAtomic<Int> = .makeAtomic(value: 0)
private let running = ManagedAtomic<Bool>(false)
private let outstanding = ManagedAtomic<Int>(0)

private var requestMaker: RequestMakerType

Expand Down Expand Up @@ -211,15 +212,20 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
// - when a request finishes it will either start a new request or decrement the
// the atomic counter (if we've been told to stop)
// - if the counter drops to zero we're finished.
let exchangedRunning = self.running.compareAndExchange(expected: false, desired: true)
precondition(exchangedRunning, "launchRequests should only be called once")
let exchangedRunning = self.running.compareExchange(
expected: false,
desired: true,
ordering: .relaxed
)
precondition(exchangedRunning.exchanged, "launchRequests should only be called once")

// We only decrement the outstanding count when running has been changed back to false.
let exchangedOutstanding = self.outstanding.compareAndExchange(
let exchangedOutstanding = self.outstanding.compareExchange(
expected: 0,
desired: self.maxPermittedOutstandingRequests
desired: self.maxPermittedOutstandingRequests,
ordering: .relaxed
)
precondition(exchangedOutstanding, "launchRequests should only be called once")
precondition(exchangedOutstanding.exchanged, "launchRequests should only be called once")

for _ in 0 ..< self.maxPermittedOutstandingRequests {
self.requestMaker.makeRequest().whenComplete { _ in
Expand All @@ -229,11 +235,11 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
}

private func makeRequest() {
if self.running.load() {
if self.running.load(ordering: .relaxed) {
self.requestMaker.makeRequest().whenComplete { _ in
self.makeRequest()
}
} else if self.outstanding.sub(1) == 1 {
} else if self.outstanding.loadThenWrappingDecrement(ordering: .relaxed) == 1 {
self.stopIsComplete()
} // else we're no longer running but not all RPCs have finished.
}
Expand All @@ -260,7 +266,7 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
/// - returns: A future which can be waited on to signal when all activity has ceased.
func stop() -> EventLoopFuture<Void> {
self.requestMaker.requestStop()
self.running.store(false)
self.running.store(false, ordering: .relaxed)
return self.stopComplete.futureResult
}
}
Expand Down
15 changes: 12 additions & 3 deletions Tests/GRPCTests/StreamResponseHandlerRetainCycleTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,19 @@ final class StreamResponseHandlerRetainCycleTests: GRPCTestCase {

func testHandlerClosureIsReleasedOnceStreamEnds() {
final class Counter {
private let atomic = NIOAtomic.makeAtomic(value: 0)
func increment() { self.atomic.add(1) }
private let lock = Lock()
private var _value = 0

func increment() {
self.lock.withLockVoid {
self._value += 1
}
}

var value: Int {
self.atomic.load()
return self.lock.withLock {
self._value
}
}
}

Expand Down

0 comments on commit f0c4f80

Please sign in to comment.