Use atomics for counting outstanding RPCs in the QPS benchmark (grpc#1298)

Motivation:

The QPS benchmark synchronises state (the number of outstanding RPCs and
whether RPCs should continue to be made) on an `EventLoop`. This incurs
unnecessary thread hops for every request.

Modifications:

- Use two atomics: one tracking how many RPCs are still active, and another
  tracking whether we should continue making RPCs

Result:

Less benchmark overhead.
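
As a rough illustration of the counting scheme described above (a hedged sketch only, not the benchmark code: `RPCDrainer`, `rpcCompleted` and `onFullyDrained` are invented names), `NIOAtomic` from SwiftNIO's NIOConcurrencyHelpers can hold both pieces of state. `sub(1)` returns the value before the subtraction, so the completion that observes `1` knows it was the last outstanding RPC:

// Hedged sketch of the scheme from the commit message; names are illustrative.
import NIOConcurrencyHelpers

final class RPCDrainer {
  // Whether new RPCs may still be started.
  private let running = NIOAtomic<Bool>.makeAtomic(value: true)
  // How many RPCs are currently in flight.
  private let outstanding: NIOAtomic<Int>

  init(maxOutstanding: Int) {
    self.outstanding = .makeAtomic(value: maxOutstanding)
  }

  /// Called when an in-flight RPC finishes. Returns true if the caller should
  /// start a replacement RPC, false if we are draining.
  func rpcCompleted(onFullyDrained: () -> Void) -> Bool {
    if self.running.load() {
      // Still running: keep the number of in-flight RPCs constant.
      return true
    }
    // Draining: `sub` returns the previous value, so seeing 1 means this was
    // the last outstanding RPC.
    if self.outstanding.sub(1) == 1 {
      onFullyDrained()
    }
    return false
  }

  /// Stop starting new RPCs; in-flight ones are left to drain.
  func stop() {
    self.running.store(false)
  }
}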
glbrntt authored and bimawa committed Nov 10, 2021
1 parent c89df03 commit 92c4b20
Showing 1 changed file with 33 additions and 59 deletions.
@@ -165,13 +165,11 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
 
   private let stats: StatsWithLock
 
-  /// Has a stop been requested - if it has don't submit any more
-  /// requests and when all existing requests are complete signal
-  /// using `stopComplete`
-  private var stopRequested = false
+  /// Succeeds after a stop has been requested and all outstanding requests have completed.
   private var stopComplete: EventLoopPromise<Void>
-  private var numberOfOutstandingRequests = 0
 
+  private let running: NIOAtomic<Bool> = .makeAtomic(value: false)
+  private let outstanding: NIOAtomic<Int> = .makeAtomic(value: 0)
 
   private var requestMaker: RequestMakerType
 
@@ -205,50 +203,39 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
     )
   }
 
-  /// Launch as many requests as allowed on the channel.
-  /// This must be called from the connection eventLoop.
+  /// Launch as many requests as allowed on the channel. Must only be called once.
   private func launchRequests() {
-    self.eventLoop.preconditionInEventLoop()
-
-    while self.canMakeRequest {
-      self.makeRequestAndRepeat()
-    }
-  }
-
-  /// Returns if it is permissible to make another request - ie we've not been asked to stop, and we're not at the limit of outstanding requests.
-  private var canMakeRequest: Bool {
-    self.eventLoop.assertInEventLoop()
-    return !self.stopRequested
-      && self.numberOfOutstandingRequests < self.maxPermittedOutstandingRequests
-  }
-
-  /// If there is spare permitted capacity make a request and repeat when it is done.
-  private func makeRequestAndRepeat() {
-    self.eventLoop.preconditionInEventLoop()
-    // Check for capacity.
-    if !self.canMakeRequest {
-      return
-    }
-    self.numberOfOutstandingRequests += 1
-    let resultStatus = self.requestMaker.makeRequest()
+    // The plan here is:
+    // - store the max number of outstanding requests in an atomic
+    // - start that many requests asynchronously
+    // - when a request finishes it will either start a new request or decrement the
+    //   the atomic counter (if we've been told to stop)
+    // - if the counter drops to zero we're finished.
+    let exchangedRunning = self.running.compareAndExchange(expected: false, desired: true)
+    precondition(exchangedRunning, "launchRequests should only be called once")
+
+    // We only decrement the outstanding count when running has been changed back to false.
+    let exchangedOutstanding = self.outstanding.compareAndExchange(
+      expected: 0,
+      desired: self.maxPermittedOutstandingRequests
+    )
+    precondition(exchangedOutstanding, "launchRequests should only be called once")
 
-    // Wait for the request to complete.
-    resultStatus.whenSuccess { status in
-      self.requestCompleted(status: status)
+    for _ in 0 ..< self.maxPermittedOutstandingRequests {
+      self.requestMaker.makeRequest().whenComplete { _ in
+        self.makeRequest()
+      }
     }
   }
 
-  /// Call when a request has completed.
-  /// Records stats and attempts to make more requests if there is available capacity.
-  private func requestCompleted(status: GRPCStatus) {
-    self.eventLoop.preconditionInEventLoop()
-    self.numberOfOutstandingRequests -= 1
-    if self.stopRequested, self.numberOfOutstandingRequests == 0 {
+  private func makeRequest() {
+    if self.running.load() {
+      self.requestMaker.makeRequest().whenComplete { _ in
+        self.makeRequest()
+      }
+    } else if self.outstanding.sub(1) == 1 {
       self.stopIsComplete()
-    } else {
-      // Try scheduling another request.
-      self.makeRequestAndRepeat()
-    }
+    } // else we're no longer running but not all RPCs have finished.
   }
 
   /// Get stats for sending to the driver.
@@ -261,32 +248,19 @@ final class AsyncQPSClient<RequestMakerType: RequestMaker>: QPSClient {
 
   /// Start sending requests to the server.
   func start() {
-    if self.eventLoop.inEventLoop {
-      self.launchRequests()
-    } else {
-      self.eventLoop.execute {
-        self.launchRequests()
-      }
-    }
+    self.launchRequests()
   }
 
   private func stopIsComplete() {
-    assert(self.stopRequested)
-    assert(self.numberOfOutstandingRequests == 0)
     // Close the connection then signal done.
     self.channel.close().cascade(to: self.stopComplete)
   }
 
   /// Stop sending requests to the server.
   /// - returns: A future which can be waited on to signal when all activity has ceased.
   func stop() -> EventLoopFuture<Void> {
-    self.eventLoop.execute {
-      self.stopRequested = true
-      self.requestMaker.requestStop()
-      if self.numberOfOutstandingRequests == 0 {
-        self.stopIsComplete()
-      }
-    }
+    self.requestMaker.requestStop()
+    self.running.store(false)
    return self.stopComplete.futureResult
   }
 }
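
The `compareAndExchange(expected:desired:)` calls in the new `launchRequests` double as one-shot guards: the exchange succeeds only for the first caller, so a second call trips the precondition. A minimal standalone sketch of that pattern (illustrative only; `startOnce` is not part of the benchmark):

import NIOConcurrencyHelpers

let started = NIOAtomic<Bool>.makeAtomic(value: false)

func startOnce() {
  // Atomically flips false -> true and reports whether the exchange happened;
  // any later call sees the flag already set and hits the precondition.
  let exchanged = started.compareAndExchange(expected: false, desired: true)
  precondition(exchanged, "startOnce should only be called once")
  // ... kick off the actual work here ...
}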
