From 8bb6d197120cde43e6c024cac459e37b686ddf2f Mon Sep 17 00:00:00 2001
From: Alex Barganier
Date: Thu, 17 Aug 2023 15:33:32 -0400
Subject: [PATCH 1/2] pkg/util/log: alter bufferedSink to handle writes during
 sync flush

Previously, if the bufferedSink had a synchronous flush scheduled and an
additional write (via the `output()` function) arrived, the bufferedSink
would panic.

After some investigation and analysis of the code, this panic was found
to be unnecessary. We can handle this scenario gracefully, without
panicking, by buffering the message so that it is included in the
upcoming flush.

If an additional forceSync output() call arrives while a synchronous
flush is already scheduled, we cannot make that call synchronous as
well. Instead, we buffer the message into the imminent flush and return.
Because of this, the forceSync option is renamed to tryForceSync, to
indicate that it is best-effort rather than an ironclad guarantee.

Release note: none
---
 pkg/util/log/buffered_sink.go      | 136 ++++++++++++++++++-----------
 pkg/util/log/buffered_sink_test.go | 132 +++++++++++++++++++++++++---
 pkg/util/log/clog.go               |   2 +-
 pkg/util/log/sinks.go              |  16 +++-
 4 files changed, 219 insertions(+), 67 deletions(-)

diff --git a/pkg/util/log/buffered_sink.go b/pkg/util/log/buffered_sink.go
index dfbe3bdca9a6..b8f527ddb796 100644
--- a/pkg/util/log/buffered_sink.go
+++ b/pkg/util/log/buffered_sink.go
@@ -12,6 +12,7 @@ package log
 
 import (
 	"context"
+	"fmt"
 	"time"
 
 	"github.com/cockroachdb/cockroach/pkg/cli/exit"
@@ -23,16 +24,19 @@ import (
 // bufferedSink wraps a child sink to add buffering. Messages are accumulated
 // and passed to the child sink in bulk when the buffer is flushed. The buffer
 // is flushed periodically, and also when it reaches a configured size. Flushes
-// can also be requested manually through the extraFlush and forceSync output()
+// can also be requested manually through the extraFlush and tryForceSync output()
 // options.
 //
 // bufferedSink's output() method never blocks on the child (except when the
-// forceSync option is used). Instead, old messages are dropped if the buffer is
+// tryForceSync option is used). Instead, old messages are dropped if the buffer is
 // overflowing a configured limit.
 //
 // Should an error occur in the child sink, it's forwarded to the provided
-// onAsyncFlushErr (unless forceSync is requested, in which case the error is
+// onAsyncFlushErr (unless tryForceSync is requested, in which case the error is
 // returned synchronously, as it would for any other sink).
+//
+// Note that tryForceSync is best-effort, but there are scenarios where it can't
+// be honored. See output() documentation for additional details.
 type bufferedSink struct {
 	// child is the wrapped logSink.
 	child logSink
@@ -104,7 +108,7 @@ func newBufferFmtConfig(bufferFmt *logconfig.BufferFormat) *bufferFmtConfig {
 // automatically flushes its contents to the child sink. Zero values disable
 // these flush triggers. If all triggers are disabled, the buffer is only ever
 // flushed when a flush is explicitly requested through the extraFlush or
-// forceSync options passed to output().
+// tryForceSync options passed to output().
 //
 // maxBufferSize, if not zero, limits the size of the buffer. When a new message
 // is causing the buffer to overflow, old messages are dropped. The caller must
@@ -193,10 +197,17 @@ func (bs *bufferedSink) attachHints(b []byte) []byte {
 // sinks must not recursively call into logging when implementing
 // this method.
// -// If forceSync is set, the output() call blocks on the child sink flush and -// returns the child sink's error (which is otherwise handled via the -// bufferedSink's onAsyncFlushErr). If the bufferedSink drops this message instead of -// passing it to the child sink, errSyncMsgDropped is returned. +// If tryForceSync is set, the output() call attempts to block on the +// child sink flush and returns the child sink's error (which is otherwise +// handled via the bufferedSink's onAsyncFlushErr). However, if a previous +// tryForceSync is already scheduled on flushC, we do not block. In this case, +// the msg provided to this output() call will be included in the already +// scheduled tryForceSync's flush, which is imminent. This is an edge case +// that should rarely happen, but it's possible. +// +// If the bufferedSink drops this message instead of passing it to the child +// sink, errSyncMsgDropped is returned. This is generally due to buffer size +// limitations. func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error { // Make a copy to live in the async buffer. // We can't take ownership of the slice we're passed -- @@ -206,36 +217,70 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error { _, _ = msg.Write(b) var errC chan error - if opts.forceSync { - // We'll ask to be notified on errC when the flush is complete. - errC = make(chan error) - } - bs.mu.Lock() - // Append the message to the buffer. - if err := bs.mu.buf.appendMsg(msg, errC); err != nil { - bs.mu.Unlock() - return err - } + err := func() error { + bs.mu.Lock() + defer bs.mu.Unlock() + // Append the message to the buffer. + err := bs.mu.buf.appendMsg(msg) + if err != nil { + // Release the msg buffer, since our append failed. + putBuffer(msg) + return err + } - flush := opts.extraFlush || opts.forceSync || (bs.triggerSize > 0 && bs.mu.buf.size() >= bs.triggerSize) - if flush { - // Trigger a flush. The flush will take effect asynchronously (and can be - // arbitrarily delayed if there's another flush in progress). In the - // meantime, the current buffer continues accumulating messages until it - // hits its limit. - bs.flushAsyncLocked() - } else { - // Schedule a flush unless one is scheduled already. - if bs.mu.timer == nil && bs.maxStaleness > 0 { - bs.mu.timer = time.AfterFunc(bs.maxStaleness, func() { - bs.mu.Lock() - bs.flushAsyncLocked() - bs.mu.Unlock() - }) + // If the errC on the buffer is already set, then a synchronous + // flush must already be scheduled & waiting on flushC to be executed. + // We only support scheduling one single synchronous flush at a time + // in the bufferedSink, triggered via the tryForceSync option. + // + // Since b.errC is already set by the currently scheduled synchronous flush, + // we can't honor the tryForceSync option for this output() call. However, + // this output() call's msg will be picked up in the imminent flush anyway, + // since it's already been buffered. + // + // WARNING: Attempting to use the buffer's errC when a tryForceSync flush + // is already scheduled, such as overwriting the reference here with a new + // errC, will cause the goroutine already waiting on errC to deadlock! + // Don't do this! + syncFlushAlreadyScheduled := bs.mu.buf.errC != nil + if !syncFlushAlreadyScheduled && opts.tryForceSync { + // We'll ask to be notified on errC when the flush is complete. 
+ errC = make(chan error) + bs.mu.buf.errC = errC + } + if syncFlushAlreadyScheduled && opts.tryForceSync { + fmt.Printf( + "tryForceSync called on %T while one already scheduled. Msg will be included in imminent flush instead.\n", + bs.child) } + + // If a synchronous flush is already scheduled, then a flush is imminent, so don't bother + // scheduling another. Our msg will be included in the upcoming flush. + flush := !syncFlushAlreadyScheduled && + (opts.extraFlush || opts.tryForceSync || (bs.triggerSize > 0 && bs.mu.buf.size() >= bs.triggerSize)) + if flush { + // Trigger a flush. The flush will take effect asynchronously (and can be + // arbitrarily delayed if there's another flush in progress). In the + // meantime, the current buffer continues accumulating messages until it + // hits its limit. + bs.flushAsyncLocked() + } else { + // Schedule a flush for the future based on maxStaleness, unless + // one is scheduled already. + if bs.mu.timer == nil && bs.maxStaleness > 0 { + bs.mu.timer = time.AfterFunc(bs.maxStaleness, func() { + bs.mu.Lock() + bs.flushAsyncLocked() + bs.mu.Unlock() + }) + } + } + return nil + }() + if err != nil { + return err } - bs.mu.Unlock() // If this is a synchronous flush, wait for its completion. if errC != nil { @@ -299,7 +344,7 @@ func (bs *bufferedSink) runFlusher(stopC <-chan struct{}) { continue } - err := bs.child.output(msg.Bytes(), sinkOutputOptions{extraFlush: true, forceSync: errC != nil}) + err := bs.child.output(msg.Bytes(), sinkOutputOptions{extraFlush: true, tryForceSync: errC != nil}) if errC != nil { errC <- err } else if err != nil { @@ -347,9 +392,12 @@ func (b *msgBuf) size() uint64 { var errMsgTooLarge = errors.New("message dropped because it is too large") -// appendMsg appends msg to the buffer. If errC is not nil, then this channel -// will be signaled when the buffer is flushed. -func (b *msgBuf) appendMsg(msg *buffer, errC chan<- error) error { +// appendMsg appends msg to the buffer. If msg can't fit in the buffer, +// errMsgTooLarge is returned. +// +// If the buffer is full, then we drop older messages in the buffer +// until we have space for the new message. +func (b *msgBuf) appendMsg(msg *buffer) error { msgLen := uint64(msg.Len()) // Make room for the new message, potentially by dropping the oldest messages @@ -368,18 +416,6 @@ func (b *msgBuf) appendMsg(msg *buffer, errC chan<- error) error { b.messages = append(b.messages, msg) b.sizeBytes += msgLen - - // Assert that b.errC is not already set. It shouldn't be set - // because, if there was a previous message with errC set, that - // message must have had the forceSync flag set and thus acts as a barrier: - // no more messages are sent until the flush of that message completes. - // - // If b.errorCh were to be set, we wouldn't know what to do about it - // since we can't overwrite it in case m.errorCh is also set. - if b.errC != nil { - panic(errors.AssertionFailedf("unexpected errC already set")) - } - b.errC = errC return nil } diff --git a/pkg/util/log/buffered_sink_test.go b/pkg/util/log/buffered_sink_test.go index 4b4785c103fc..62f05d85bc9d 100644 --- a/pkg/util/log/buffered_sink_test.go +++ b/pkg/util/log/buffered_sink_test.go @@ -202,9 +202,9 @@ func TestBufferedSinkCrashOnAsyncFlushErr(t *testing.T) { } } -// Test that a call to output() with the forceSync option doesn't return until -// the flush is done. 
-func TestBufferedSinkForceSync(t *testing.T) { +// Test that a call to output() with the tryForceSync option doesn't return until +// the flush is done, in the case where a tryForceSync isn't already scheduled. +func TestBufferedSinkTryForceSync(t *testing.T) { defer leaktest.AfterTest(t)() sink, mock, cleanup := getMockBufferedSync(t, noMaxStaleness, noSizeTrigger, noMaxBufferSize, nil) defer cleanup() @@ -213,7 +213,7 @@ func TestBufferedSinkForceSync(t *testing.T) { message := []byte("test") // Make the child sink block until ch is closed. mock.EXPECT(). - output(gomock.Eq(message), sinkOutputOptionsMatcher{forceSync: gomock.Eq(true)}). + output(gomock.Eq(message), sinkOutputOptionsMatcher{tryForceSync: gomock.Eq(true)}). Do(addArgs(func() { <-ch })) var marker int32 @@ -226,12 +226,82 @@ func TestBufferedSinkForceSync(t *testing.T) { } close(ch) }() - require.NoError(t, sink.output(message, sinkOutputOptions{forceSync: true})) + require.NoError(t, sink.output(message, sinkOutputOptions{tryForceSync: true})) // Set marker to be non-zero. // This should happen quickly after the above call unblocks. atomic.StoreInt32(&marker, 1) } +// Test that a call to output() with the tryForceSync option doesn't block if a +// tryForceSync is already scheduled, and that the message is included as part +// of the already-scheduled flush. +func TestBufferedSinkTryForceSync_SyncFlushAlreadyScheduled(t *testing.T) { + defer leaktest.AfterTest(t)() + closer := newBufferedSinkCloser() + defer func() { require.NoError(t, closer.Close(defaultCloserTimeout)) }() + ctrl := gomock.NewController(t) + defer ctrl.Finish() + child := newTestWaitGroupSink(ctrl) + sink := newBufferedSink(child, noMaxStaleness, noSizeTrigger, noMaxBufferSize, false /* crashOnAsyncFlushErr */, nil) + sink.Start(closer) + + // We're scheduling 3 tryForceSync flushes: + // A: The first flush, which will hang until we signal the child sink WaitGroup. + // This will cause the next flush to wait in the queue to simulate "stacking" + // scheduled tryForceSync flushes. + // B: A second flush, which will be scheduled in flushC but not execute until the + // first flush completes (which it can't until we signal the child sink WaitGroup). + // C: A third flush, where tryForceSync can no longer be supported as a blocking + // operation. This is because the second flush is already scheduled & consuming + // the 1 extra space in flushC. The expectation is the message is buffered and we + // return instead. The message is expected to be included in the second flush, + // which is already scheduled. + // + // We use channels and the child sink WaitGroup to coordinate these sequentially across + // 3 separate goroutines, simulating 3 separate writers to the bufferedSink. + child.mock.EXPECT().output(gomock.Eq([]byte("a")), sinkOutputOptionsMatcher{tryForceSync: gomock.Eq(true)}) + child.mock.EXPECT().output(gomock.Eq([]byte("b\nc")), sinkOutputOptionsMatcher{tryForceSync: gomock.Eq(true)}) + + child.wg.Add(1) + firstFlushTriggered := make(chan struct{}) + secondFlushTriggered := make(chan struct{}) + secondFlushDone := make(chan struct{}) + // First flush, which blocks until the child sink WaitGroup is signaled later on. 
+ go func() { + time.AfterFunc(50*time.Millisecond, func() { + firstFlushTriggered <- struct{}{} + }) + require.NoError(t, sink.output([]byte("a"), sinkOutputOptions{tryForceSync: true})) + }() + // Second flush, which gets scheduled & blocks until the first flush completes, which + // won't do so until the child sink WaitGroup is signaled. + go func() { + <-firstFlushTriggered + time.AfterFunc(50*time.Millisecond, func() { + secondFlushTriggered <- struct{}{} + }) + require.NoError(t, sink.output([]byte("b"), sinkOutputOptions{tryForceSync: true})) + secondFlushDone <- struct{}{} + }() + // Third flush, which is unable to block because we are already at the max number of + // tryForceSync flushes scheduled. It doesn't block, but instead has its message buffered + // to be included in the second flush (already scheduled). + // + // This is where we signal the child sink's WaitGroup to finally unblock the first flush + // from completing, which will unblock the second flush and allow it to execute. + go func() { + <-secondFlushTriggered + require.NoError(t, sink.output([]byte("c"), sinkOutputOptions{tryForceSync: true})) + child.wg.Done() + }() + + select { + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for bufferedSink flushes to complete") + case <-secondFlushDone: + } +} + // Test that messages are buffered while a flush is in-flight. func TestBufferedSinkBlockedFlush(t *testing.T) { defer leaktest.AfterTest(t)() @@ -312,7 +382,7 @@ b9`), out) } } -// Test that multiple messages with the forceSync option work. +// Test that multiple messages with the tryForceSync option work. func TestBufferedSinkSyncFlush(t *testing.T) { defer leaktest.AfterTest(t)() closer := newBufferedSinkCloser() @@ -325,8 +395,8 @@ func TestBufferedSinkSyncFlush(t *testing.T) { mock.EXPECT().output(gomock.Eq([]byte("a")), gomock.Any()) mock.EXPECT().output(gomock.Eq([]byte("b")), gomock.Any()) - require.NoError(t, sink.output([]byte("a"), sinkOutputOptions{forceSync: true})) - require.NoError(t, sink.output([]byte("b"), sinkOutputOptions{forceSync: true})) + require.NoError(t, sink.output([]byte("a"), sinkOutputOptions{tryForceSync: true})) + require.NoError(t, sink.output([]byte("b"), sinkOutputOptions{tryForceSync: true})) } func TestBufferCtxDoneFlushesRemainingMsgs(t *testing.T) { @@ -436,7 +506,7 @@ func TestMsgBufFlushFormat(t *testing.T) { for _, strMsg := range tc.bufferContents { msg := getBuffer() msg.WriteString(strMsg) - require.NoError(t, buf.appendMsg(msg, nil)) + require.NoError(t, buf.appendMsg(msg)) } // Flush. 
@@ -450,7 +520,7 @@ func TestMsgBufFlushFormat(t *testing.T) { type sinkOutputOptionsMatcher struct { extraFlush gomock.Matcher ignoreErrors gomock.Matcher - forceSync gomock.Matcher + tryForceSync gomock.Matcher } func (m sinkOutputOptionsMatcher) Matches(x interface{}) bool { @@ -460,7 +530,7 @@ func (m sinkOutputOptionsMatcher) Matches(x interface{}) bool { } if m.extraFlush != nil && !m.extraFlush.Matches(opts.extraFlush) || m.ignoreErrors != nil && !m.ignoreErrors.Matches(opts.ignoreErrors) || - m.forceSync != nil && !m.forceSync.Matches(opts.forceSync) { + m.tryForceSync != nil && !m.tryForceSync.Matches(opts.tryForceSync) { return false } return true @@ -474,11 +544,47 @@ func (m sinkOutputOptionsMatcher) String() string { if m.ignoreErrors != nil { acc = append(acc, fmt.Sprintf("ignoreErrors %v", m.ignoreErrors.String())) } - if m.forceSync != nil { - acc = append(acc, fmt.Sprintf("forceSync %v", m.forceSync.String())) + if m.tryForceSync != nil { + acc = append(acc, fmt.Sprintf("tryForceSync %v", m.tryForceSync.String())) } if len(acc) == 0 { return "is anything" } return strings.Join(acc, ", ") } + +// testWaitGroupSink is a mock child sink that will wait on its sync.WaitGroup (wg) +// during calls to output(), before passing the call along to the underlying +// MockLogSink. This enables us to coordinate tests using concurrent writers by +// preventing the flusher goroutine in the bufferedSink from completing until +// we signal the sync.WaitGroup. +type testWaitGroupSink struct { + wg *sync.WaitGroup + mock *MockLogSink +} + +func newTestWaitGroupSink(ctrl *gomock.Controller) *testWaitGroupSink { + return &testWaitGroupSink{ + wg: &sync.WaitGroup{}, + mock: NewMockLogSink(ctrl), + } +} + +func (t *testWaitGroupSink) active() bool { + return true +} + +func (t *testWaitGroupSink) attachHints(i []byte) []byte { + return nil +} + +func (t *testWaitGroupSink) output(b []byte, opts sinkOutputOptions) error { + t.wg.Wait() + return t.mock.output(b, opts) +} + +func (t *testWaitGroupSink) exitCode() exit.Code { + return exit.Success() +} + +var _ logSink = (*testWaitGroupSink)(nil) diff --git a/pkg/util/log/clog.go b/pkg/util/log/clog.go index ea7c44bb474e..085b7911cf8b 100644 --- a/pkg/util/log/clog.go +++ b/pkg/util/log/clog.go @@ -370,7 +370,7 @@ func (l *loggerT) outputLogEntry(entry logEntry) { // The sink was not accepting entries at this level. Nothing to do. continue } - if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, forceSync: isFatal}); err != nil { + if err := s.sink.output(bufs.b[i].Bytes(), sinkOutputOptions{extraFlush: extraFlush, tryForceSync: isFatal}); err != nil { if !s.criticality { // An error on this sink is not critical. Just report // the error and move on. diff --git a/pkg/util/log/sinks.go b/pkg/util/log/sinks.go index 9ef1455f3679..01bff8e3dd3a 100644 --- a/pkg/util/log/sinks.go +++ b/pkg/util/log/sinks.go @@ -28,9 +28,19 @@ type sinkOutputOptions struct { extraFlush bool // ignoreErrors disables internal error handling (i.e. fail fast). ignoreErrors bool - // forceSync forces synchronous operation of this output operation. - // That is, it will block until the output has been handled. - forceSync bool + // tryForceSync attempts to force a synchronous operation of this + // output operation. That is, it will block until the output has been + // handled, so long as the underlying sink can support the operation at + // that moment. 
+ // + // This isn't an ironclad guarantee, but in the vast majority of + // scenarios, this option will be honored. + // + // If a sink can't support a synchronous flush, it should do its + // best to ensure a flush is imminent which will include the log + // message that accompanies the tryForceSync option. It should also + // give some indication that it was unable to do so. + tryForceSync bool } // logSink abstracts the destination of logging events, after all From 96acea6330e9c8c660f0312cf08061f427c589d0 Mon Sep 17 00:00:00 2001 From: Alex Barganier Date: Tue, 22 Aug 2023 14:38:29 -0400 Subject: [PATCH 2/2] pkg/util/log: clean up some unsafe lock usage in bufferedSink There are multiple undeferred lock usages in the bufferedSink beyond just the one that surrounds the call to `appendMsg`. This small patch cleans these usages up to use `defer` instead. Release note: none --- pkg/util/log/buffered_sink.go | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/pkg/util/log/buffered_sink.go b/pkg/util/log/buffered_sink.go index b8f527ddb796..84079b8fbb45 100644 --- a/pkg/util/log/buffered_sink.go +++ b/pkg/util/log/buffered_sink.go @@ -271,8 +271,8 @@ func (bs *bufferedSink) output(b []byte, opts sinkOutputOptions) error { if bs.mu.timer == nil && bs.maxStaleness > 0 { bs.mu.timer = time.AfterFunc(bs.maxStaleness, func() { bs.mu.Lock() + defer bs.mu.Unlock() bs.flushAsyncLocked() - bs.mu.Unlock() }) } } @@ -330,9 +330,11 @@ func (bs *bufferedSink) runFlusher(stopC <-chan struct{}) { // We'll return after flushing everything. done = true } - bs.mu.Lock() - msg, errC := buf.flush(bs.format.prefix, bs.format.suffix, bs.format.delimiter) - bs.mu.Unlock() + msg, errC := func() (*buffer, chan<- error) { + bs.mu.Lock() + defer bs.mu.Unlock() + return buf.flush(bs.format.prefix, bs.format.suffix, bs.format.delimiter) + }() if msg == nil { // Nothing to flush. // NOTE: This can happen in the done case, or if we get two flushC signals @@ -350,9 +352,11 @@ func (bs *bufferedSink) runFlusher(stopC <-chan struct{}) { } else if err != nil { Ops.Errorf(context.Background(), "logging error from %T: %v", bs.child, err) if bs.crashOnAsyncFlushFailure { - logging.mu.Lock() - f := logging.mu.exitOverride.f - logging.mu.Unlock() + f := func() func(exit.Code, error) { + logging.mu.Lock() + defer logging.mu.Unlock() + return logging.mu.exitOverride.f + }() code := bs.exitCode() if f != nil { f(code, err)