From a08d232348e7224ff5f9dc0ecbf7ee77c978f1c6 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Thu, 1 Nov 2018 15:00:47 -0400 Subject: [PATCH] storage/cmdq: create new signal type for cmd completion signaling signal is a type that can signal the completion of an operation. The type has three benefits over using a channel directly and closing the channel when the operation completes: 1. signaled() uses atomics to provide a fast-path for checking whether the operation has completed. It is ~75x faster than using a channel for this purpose. 2. the type's channel is lazily initialized when signalChan() is called, avoiding the allocation when one is not needed. 3. because of 2, the type's zero value can be used directly. Release note: None --- pkg/storage/cmdq/signal.go | 102 +++++++++++++++++ pkg/storage/cmdq/signal_test.go | 195 ++++++++++++++++++++++++++++++++ 2 files changed, 297 insertions(+) create mode 100644 pkg/storage/cmdq/signal.go create mode 100644 pkg/storage/cmdq/signal_test.go diff --git a/pkg/storage/cmdq/signal.go b/pkg/storage/cmdq/signal.go new file mode 100644 index 000000000000..e71a58948554 --- /dev/null +++ b/pkg/storage/cmdq/signal.go @@ -0,0 +1,102 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cmdq + +import ( + "sync/atomic" + "unsafe" +) + +const ( + // not yet signaled. + noSig int32 = iota + // signaled and the channel was not closed. + sig + // signaled and the channel was closed. + sigClosed +) + +// signal is a type that can signal the completion of an operation. +// +// The type has three benefits over using a channel directly and +// closing the channel when the operation completes: +// 1. signaled() uses atomics to provide a fast-path for checking +// whether the operation has completed. It is ~75x faster than +// using a channel for this purpose. +// 2. the receiver's channel is lazily initialized when signalChan() +// is called, avoiding the allocation when one is not needed. +// 3. because of 2, the type's zero value can be used directly. +// +type signal struct { + a int32 + c unsafe.Pointer // chan struct{}, lazily initialized +} + +func (s *signal) signal() { + if !atomic.CompareAndSwapInt32(&s.a, noSig, sig) { + panic("signaled twice") + } + // Close the channel if it was ever initialized. + if cPtr := atomic.LoadPointer(&s.c); cPtr != nil { + // Coordinate with signalChan to avoid double-closing. + if atomic.CompareAndSwapInt32(&s.a, sig, sigClosed) { + close(ptrToChan(cPtr)) + } + } +} + +func (s *signal) signaled() bool { + return atomic.LoadInt32(&s.a) > noSig +} + +func (s *signal) signalChan() <-chan struct{} { + // If the signal has already been signaled, return a closed channel. + if s.signaled() { + return closedC + } + + // If the signal's channel has already been lazily initialized, return it. + if cPtr := atomic.LoadPointer(&s.c); cPtr != nil { + return ptrToChan(cPtr) + } + + // Lazily initialize the channel. + c := make(chan struct{}) + if !atomic.CompareAndSwapPointer(&s.c, nil, chanToPtr(c)) { + // We raced with another initialization. + return ptrToChan(atomic.LoadPointer(&s.c)) + } + + // Coordinate with signal to close the new channel, if necessary. + if atomic.CompareAndSwapInt32(&s.a, sig, sigClosed) { + close(c) + } + return c +} + +func chanToPtr(c chan struct{}) unsafe.Pointer { + return *(*unsafe.Pointer)(unsafe.Pointer(&c)) +} + +func ptrToChan(p unsafe.Pointer) chan struct{} { + return *(*chan struct{})(unsafe.Pointer(&p)) +} + +var closedC chan struct{} + +func init() { + closedC = make(chan struct{}) + close(closedC) +} diff --git a/pkg/storage/cmdq/signal_test.go b/pkg/storage/cmdq/signal_test.go new file mode 100644 index 000000000000..58755fabd841 --- /dev/null +++ b/pkg/storage/cmdq/signal_test.go @@ -0,0 +1,195 @@ +// Copyright 2018 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package cmdq + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSignal(t *testing.T) { + var s signal + require.False(t, s.signaled()) + + s.signal() + require.True(t, s.signaled()) + require.Equal(t, struct{}{}, <-s.signalChan()) +} + +func TestSignalConcurrency(t *testing.T) { + const trials = 100 + for i := 0; i < trials; i++ { + var s signal + var wg sync.WaitGroup + wg.Add(3) + go func() { + defer wg.Done() + <-s.signalChan() + require.True(t, s.signaled()) + }() + go func() { + defer wg.Done() + require.False(t, s.signaled()) + s.signal() + require.True(t, s.signaled()) + }() + go func() { + defer wg.Done() + <-s.signalChan() + require.True(t, s.signaled()) + }() + wg.Wait() + require.True(t, s.signaled()) + } +} + +func BenchmarkSignaled(b *testing.B) { + var s signal + s.signal() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = s.signaled() + } +} + +func BenchmarkSignalBeforeChan(b *testing.B) { + var s signal + for i := 0; i < b.N; i++ { + s = signal{} // reset + s.signal() + } +} + +func BenchmarkSignalAfterChan(b *testing.B) { + var s signal + chans := make([]chan struct{}, b.N) + for i := range chans { + chans[i] = make(chan struct{}) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + s = signal{} // reset + s.c = chanToPtr(chans[i]) + s.signal() + } +} + +func BenchmarkInitialChanBeforeSignal(b *testing.B) { + var s signal + for i := 0; i < b.N; i++ { + s = signal{} // reset + _ = s.signalChan() + } +} + +func BenchmarkSecondChanBeforeSignal(b *testing.B) { + var s signal + _ = s.signalChan() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = s.signalChan() + } +} + +func BenchmarkInitialChanAfterSignal(b *testing.B) { + var s signal + s.signal() + b.ResetTimer() + for i := 0; i < b.N; i++ { + s.c = nil + _ = s.signalChan() + } +} + +func BenchmarkSecondChanAfterSignal(b *testing.B) { + var s signal + s.signal() + _ = s.signalChan() + b.ResetTimer() + for i := 0; i < b.N; i++ { + _ = s.signalChan() + } +} + +// The following is a series of benchmarks demonstrating the value of the signal +// type and the fast-path that it provides. Closing channels to signal +// completion of a task is convenient, but in performance critical code paths it +// is essential to have a way to efficiently check for completion before falling +// back to waiting for the channel to close and entering select blocks. The +// benchmarks demonstrate that a channel on its own cannot be used to perform an +// efficient completion check, which is why the signal type mixes channels with +// atomics. The reason for this is that channels are forced to acquire an +// internal mutex before determining that they are closed and can return a zero +// value. This will always be more expensive than a single atomic load. +// +// Results with go1.10.4 on a Mac with a 3.1 GHz Intel Core i7 processor: +// +// ReadClosedChan-4 24.2ns ± 3% +// SingleSelectClosedChan-4 24.9ns ± 2% +// DefaultSelectClosedChan-4 24.6ns ± 1% +// MultiSelectClosedChan-4 97.9ns ± 2% +// Signaled-4 0.35ns ±13% +// + +func BenchmarkReadClosedChan(b *testing.B) { + c := make(chan struct{}) + close(c) + for i := 0; i < b.N; i++ { + <-c + } +} + +func BenchmarkSingleSelectClosedChan(b *testing.B) { + c := make(chan struct{}) + close(c) + //lint:ignore S1000 we don't want this simplified + for i := 0; i < b.N; i++ { + select { + case <-c: + } + } +} + +func BenchmarkDefaultSelectClosedChan(b *testing.B) { + c := make(chan struct{}) + close(c) + for i := 0; i < b.N; i++ { + select { + case <-c: + default: + } + } +} + +func BenchmarkMultiSelectClosedChan(b *testing.B) { + c, c2 := make(chan struct{}), make(chan struct{}) + close(c) + for i := 0; i < b.N; i++ { + select { + case <-c: + case <-c2: + } + } +} + +func BenchmarkAtomicLoad(b *testing.B) { + a := int32(1) + for i := 0; i < b.N; i++ { + _ = atomic.LoadInt32(&a) + } +}