diff --git a/common/flusher/flusher.go b/common/flusher/flusher.go index c00e5fdfb7b..f918c946f3e 100644 --- a/common/flusher/flusher.go +++ b/common/flusher/flusher.go @@ -37,6 +37,7 @@ type ( Flusher[T any] interface { common.Daemon Buffer(item T) future.Future[struct{}] + Flush() } FlushItem[T any] struct { diff --git a/common/flusher/flusher_impl.go b/common/flusher/flusher_impl.go index 6e2668eb39a..cb9306274a8 100644 --- a/common/flusher/flusher_impl.go +++ b/common/flusher/flusher_impl.go @@ -239,6 +239,26 @@ func (f *flusherImpl[T]) Buffer(item T) future.Future[struct{}] { return flushItem.Future } +func (f *flusherImpl[T]) Flush() { + if f.shutdownChan.IsShutdown() { + return + } + + f.Lock() + defer f.Unlock() + + if f.shutdownChan.IsShutdown() { + return + } + + if len(f.flushBuffer) == 0 { + // nothing to flush + return + } + f.stopTimerLocked() + f.pushDirtyBufferLocked() +} + func (f *flusherImpl[T]) appendLocked(flushItem FlushItem[T]) { if len(f.flushBuffer) == 0 { // start timer if it's first Item insertion f.startTimerLocked() diff --git a/common/flusher/flusher_test.go b/common/flusher/flusher_test.go index 9fd3eee360b..c68d91a2565 100644 --- a/common/flusher/flusher_test.go +++ b/common/flusher/flusher_test.go @@ -141,10 +141,10 @@ func (s *flusherSuite) TestBuffer_Switch() { s.Equal([]*fakeTask{}, writer.Get()) } -func (s *flusherSuite) TestBuffer_Full() { - bufferCapacity := 1 +func (s *flusherSuite) TestBuffer_Timer() { + bufferCapacity := 2 numBuffer := 2 - flushTimeout := time.Minute + flushTimeout := time.Millisecond writer := &fakeWriter{} flushBuffer := NewFlusher[*fakeTask]( bufferCapacity, @@ -153,35 +153,28 @@ func (s *flusherSuite) TestBuffer_Full() { writer, log.NewTestLogger(), ) + flushBuffer.Start() + defer flushBuffer.Stop() - task0 := newFakeTask() - task1 := newFakeTask() - task2 := newFakeTask() - fut0 := flushBuffer.Buffer(task0) - fut1 := flushBuffer.Buffer(task1) - fut2 := flushBuffer.Buffer(task2) - s.False(fut0.Ready()) - s.False(fut1.Ready()) - _, err := fut2.Get(s.ctx) - s.Equal(ErrFull, err) + task := newFakeTask() + fut := flushBuffer.Buffer(task) + _, err := fut.Get(s.ctx) + s.NoError(err) flushBuffer.Lock() - defer flushBuffer.Unlock() s.Equal(0, len(flushBuffer.flushBuffer)) s.Nil(flushBuffer.flushBufferPointer) - s.Equal(0, len(flushBuffer.freeBufferChan)) - s.Equal(2, len(flushBuffer.fullBufferChan)) - buffer := <-flushBuffer.fullBufferChan - s.Equal(task0, buffer[0].Item) - buffer = <-flushBuffer.fullBufferChan - s.Equal(task1, buffer[0].Item) - s.Equal([]*fakeTask{}, writer.Get()) + s.Equal(1, len(flushBuffer.freeBufferChan)) + s.Equal(0, len(flushBuffer.fullBufferChan)) + flushBuffer.Unlock() + + s.Equal([]*fakeTask{task}, writer.Get()) } -func (s *flusherSuite) TestBuffer_Timer() { +func (s *flusherSuite) TestBuffer_Flush() { bufferCapacity := 2 numBuffer := 2 - flushTimeout := time.Millisecond + flushTimeout := time.Minute writer := &fakeWriter{} flushBuffer := NewFlusher[*fakeTask]( bufferCapacity, @@ -195,6 +188,7 @@ func (s *flusherSuite) TestBuffer_Timer() { task := newFakeTask() fut := flushBuffer.Buffer(task) + flushBuffer.Flush() _, err := fut.Get(s.ctx) s.NoError(err) @@ -208,6 +202,43 @@ func (s *flusherSuite) TestBuffer_Timer() { s.Equal([]*fakeTask{task}, writer.Get()) } +func (s *flusherSuite) TestBuffer_Full() { + bufferCapacity := 1 + numBuffer := 2 + flushTimeout := time.Minute + writer := &fakeWriter{} + flushBuffer := NewFlusher[*fakeTask]( + bufferCapacity, + numBuffer, + flushTimeout, + writer, + log.NewTestLogger(), + ) + + task0 := newFakeTask() + task1 := newFakeTask() + task2 := newFakeTask() + fut0 := flushBuffer.Buffer(task0) + fut1 := flushBuffer.Buffer(task1) + fut2 := flushBuffer.Buffer(task2) + s.False(fut0.Ready()) + s.False(fut1.Ready()) + _, err := fut2.Get(s.ctx) + s.Equal(ErrFull, err) + + flushBuffer.Lock() + defer flushBuffer.Unlock() + s.Equal(0, len(flushBuffer.flushBuffer)) + s.Nil(flushBuffer.flushBufferPointer) + s.Equal(0, len(flushBuffer.freeBufferChan)) + s.Equal(2, len(flushBuffer.fullBufferChan)) + buffer := <-flushBuffer.fullBufferChan + s.Equal(task0, buffer[0].Item) + buffer = <-flushBuffer.fullBufferChan + s.Equal(task1, buffer[0].Item) + s.Equal([]*fakeTask{}, writer.Get()) +} + func (s *flusherSuite) TestBuffer_Shutdown() { bufferCapacity := 2 numBuffer := 2