Skip to content

Commit

Permalink
Improve logging at Arrow stream shutdown; avoid the explicit Canceled…
Browse files Browse the repository at this point in the history
… message at stream lifetime (#170)

Fixes #160.
Fixes a test-flake discovered while preparing
open-telemetry/opentelemetry-collector-contrib#31996

For the test flake, rewrite the test. The test was relying on the
behavior of a single stream at shutdown, whereas the test wanted to test
the seamlessness of stream restart. The new test covers the intended
condition better -- it configures a short (half-second) stream lifetime
and runs the test long enough for roughly ten stream lifetimes, then
checks for no observed logs.

I carried out manual testing of the shutdown process to help with #160.
The problem was that the exporter receives an error from both its writer
and its reader goroutines, and it logs both. There is new, consistent
handling of the EOF and Canceled error states, which are very similar.
EOF is what happens when the server responds to the client's
CloseSend(). Canceled is gRPC-internally generated.

Both exporter and receiver have similar error-logging logic now. Both
consider EOF and Canceled the same condition, and will Debug-log a
"shutdown" message when this happens. All other error conditions are
logged as errors.

This removes the former use of the StatusCode called Canceled; it was no
longer necessary after switching the code to use a Status-wrapped Canceled
code instead. If receivers are updated ahead of exporters, good. If exporters
are upgraded ahead of receivers, they will log spurious errors at stream
shutdown (but as #160 shows they were already issuing spurious logs).
The code remains defined in the protocol, as it is mapped to the gRPC
code space, and remains handled by the code as an error condition.

---------

Co-authored-by: Laurent Quérel <[email protected]>
  • Loading branch information
jmacd and lquerel authored Apr 3, 2024
1 parent e955f56 commit 9551a08
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 220 deletions.
61 changes: 59 additions & 2 deletions collector/exporter/otelarrowexporter/internal/arrow/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"fmt"
"io"
"sync"
"testing"

arrowpb "github.com/open-telemetry/otel-arrow/api/experimental/arrow/v1"
Expand Down Expand Up @@ -37,6 +38,7 @@ type testChannel interface {
onRecv(context.Context) func() (*arrowpb.BatchStatus, error)
onSend(context.Context) func(*arrowpb.BatchArrowRecords) error
onConnect(context.Context) error
onCloseSend() func() error
}

type commonTestCase struct {
Expand Down Expand Up @@ -110,7 +112,7 @@ func (ctc *commonTestCase) newMockStream(ctx context.Context) *commonTestStream
gomock.Any(), // *arrowpb.BatchArrowRecords
).Times(0),
recvCall: client.EXPECT().Recv().Times(0),
closeSendCall: client.EXPECT().CloseSend().AnyTimes().Return(nil),
closeSendCall: client.EXPECT().CloseSend().Times(0),
}
return testStream
}
Expand All @@ -137,6 +139,7 @@ func (ctc *commonTestCase) returnNewStream(hs ...testChannel) func(context.Conte
str := ctc.newMockStream(ctx)
str.sendCall.AnyTimes().DoAndReturn(h.onSend(ctx))
str.recvCall.AnyTimes().DoAndReturn(h.onRecv(ctx))
str.closeSendCall.AnyTimes().DoAndReturn(h.onCloseSend())
return str.anyStreamClient, nil
}
}
Expand All @@ -158,12 +161,17 @@ func (ctc *commonTestCase) repeatedNewStream(nc func() testChannel) func(context
str := ctc.newMockStream(ctx)
str.sendCall.AnyTimes().DoAndReturn(h.onSend(ctx))
str.recvCall.AnyTimes().DoAndReturn(h.onRecv(ctx))
str.closeSendCall.AnyTimes().DoAndReturn(h.onCloseSend())
return str.anyStreamClient, nil
}
}

// healthyTestChannel accepts the connection and returns an OK status immediately.
type healthyTestChannel struct {
	// lock guards sent. CloseSend() now closes the sent channel,
	// which some former tests were doing manually; since CloseSend()
	// is always called, the lock prevents a double-close race, and
	// the closed channel serves to assist some tests.
	lock sync.Mutex
	// sent carries the batches passed to the mock Send();
	// set to nil by doClose() after the channel is closed.
	sent chan *arrowpb.BatchArrowRecords
	// recv carries the batch statuses the mock Recv() hands back
	// to the exporter.
	recv chan *arrowpb.BatchStatus
}
Expand All @@ -175,14 +183,33 @@ func newHealthyTestChannel() *healthyTestChannel {
}
}

// doClose closes the sent channel exactly once; any further calls
// are no-ops. The lock makes it safe to call from both the mocked
// CloseSend() and directly from tests.
func (tc *healthyTestChannel) doClose() {
	tc.lock.Lock()
	defer tc.lock.Unlock()
	if ch := tc.sent; ch != nil {
		tc.sent = nil
		close(ch)
	}
}

// onConnect implements testChannel: the healthy channel accepts
// every connection attempt.
func (tc *healthyTestChannel) onConnect(_ context.Context) error {
	return nil
}

// onCloseSend implements testChannel: close the sent channel (via
// doClose, which is idempotent) so consumers ranging over it stop.
func (tc *healthyTestChannel) onCloseSend() func() error {
	return func() error {
		tc.doClose()
		return nil
	}
}

func (tc *healthyTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
tc.lock.Lock()
sent := tc.sent
tc.lock.Unlock()
select {
case tc.sent <- req:
case sent <- req:
return nil
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -221,6 +248,12 @@ func (tc *unresponsiveTestChannel) onConnect(_ context.Context) error {
return nil
}

// onCloseSend implements testChannel: CloseSend succeeds with no
// side effects for the unresponsive channel.
func (tc *unresponsiveTestChannel) onCloseSend() func() error {
	return func() error {
		return nil
	}
}

func (tc *unresponsiveTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
select {
Expand Down Expand Up @@ -263,6 +296,12 @@ func (tc *arrowUnsupportedTestChannel) onConnect(_ context.Context) error {
return nil
}

// onCloseSend implements testChannel: CloseSend succeeds with no
// side effects for the arrow-unsupported channel.
func (tc *arrowUnsupportedTestChannel) onCloseSend() func() error {
	return func() error {
		return nil
	}
}

func (tc *arrowUnsupportedTestChannel) onSend(ctx context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
<-ctx.Done()
Expand Down Expand Up @@ -290,6 +329,12 @@ func (tc *disconnectedTestChannel) onConnect(ctx context.Context) error {
return ctx.Err()
}

// onCloseSend implements testChannel. The disconnected channel
// fails in onConnect, so no stream exists and CloseSend must never
// be reached.
func (tc *disconnectedTestChannel) onCloseSend() func() error {
	return func() error {
		panic("unreachable")
	}
}

func (tc *disconnectedTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(req *arrowpb.BatchArrowRecords) error {
panic("unreachable")
Expand Down Expand Up @@ -317,6 +362,12 @@ func (tc *sendErrorTestChannel) onConnect(_ context.Context) error {
return nil
}

// onCloseSend implements testChannel: CloseSend succeeds even though
// this channel's Send fails with io.EOF.
func (tc *sendErrorTestChannel) onCloseSend() func() error {
	return func() error {
		return nil
	}
}

func (tc *sendErrorTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(*arrowpb.BatchArrowRecords) error {
return io.EOF
Expand Down Expand Up @@ -346,6 +397,12 @@ func (tc *connectErrorTestChannel) onConnect(_ context.Context) error {
return fmt.Errorf("test connect error")
}

// onCloseSend implements testChannel. onConnect always fails for
// this channel, so no stream exists and CloseSend must never be
// reached.
func (tc *connectErrorTestChannel) onCloseSend() func() error {
	return func() error {
		panic("unreachable")
	}
}

func (tc *connectErrorTestChannel) onSend(_ context.Context) func(*arrowpb.BatchArrowRecords) error {
return func(*arrowpb.BatchArrowRecords) error {
panic("not reached")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type Exporter struct {
// Exporter, used for shutdown.
cancel context.CancelFunc

// wg counts one per active goroutine belonging to all strings
// wg counts one per active goroutine belonging to all streams
// of this exporter. The wait group has Add(1) called before
// starting goroutines so that they can be properly waited for
// in shutdown(), so the pattern is:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,20 @@ type exporterTestCase struct {
}

func newSingleStreamTestCase(t *testing.T) *exporterTestCase {
return newExporterTestCaseCommon(t, NotNoisy, 1, false, nil)
return newExporterTestCaseCommon(t, NotNoisy, defaultMaxStreamLifetime, 1, false, nil)
}

func newShortLifetimeStreamTestCase(t *testing.T) *exporterTestCase {
return newExporterTestCaseCommon(t, NotNoisy, time.Second/2, 1, false, nil)
}

func newSingleStreamDowngradeDisabledTestCase(t *testing.T) *exporterTestCase {
return newExporterTestCaseCommon(t, NotNoisy, 1, true, nil)
return newExporterTestCaseCommon(t, NotNoisy, defaultMaxStreamLifetime, 1, true, nil)
}

func newSingleStreamMetadataTestCase(t *testing.T) *exporterTestCase {
var count int
return newExporterTestCaseCommon(t, NotNoisy, 1, false, func(ctx context.Context) (map[string]string, error) {
return newExporterTestCaseCommon(t, NotNoisy, defaultMaxStreamLifetime, 1, false, func(ctx context.Context) (map[string]string, error) {
defer func() { count++ }()
if count%2 == 0 {
return nil, nil
Expand All @@ -82,7 +86,7 @@ func newSingleStreamMetadataTestCase(t *testing.T) *exporterTestCase {
}

func newExporterNoisyTestCase(t *testing.T, numStreams int) *exporterTestCase {
return newExporterTestCaseCommon(t, Noisy, numStreams, false, nil)
return newExporterTestCaseCommon(t, Noisy, defaultMaxStreamLifetime, numStreams, false, nil)
}

func copyBatch[T any](recordFunc func(T) (*arrowpb.BatchArrowRecords, error)) func(T) (*arrowpb.BatchArrowRecords, error) {
Expand Down Expand Up @@ -117,7 +121,7 @@ func copyBatch[T any](recordFunc func(T) (*arrowpb.BatchArrowRecords, error)) fu
}
}

func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, disableDowngrade bool, metadataFunc func(ctx context.Context) (map[string]string, error)) *exporterTestCase {
func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, maxLifetime time.Duration, numStreams int, disableDowngrade bool, metadataFunc func(ctx context.Context) (map[string]string, error)) *exporterTestCase {
ctc := newCommonTestCase(t, noisy)

if metadataFunc == nil {
Expand All @@ -128,7 +132,7 @@ func newExporterTestCaseCommon(t *testing.T, noisy noisyTest, numStreams int, di
})
}

exp := NewExporter(defaultMaxStreamLifetime, numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI {
exp := NewExporter(maxLifetime, numStreams, disableDowngrade, ctc.telset, nil, func() arrowRecord.ProducerAPI {
// Mock the close function, use a real producer for testing dataflow.
mock := arrowRecordMock.NewMockProducerAPI(ctc.ctrl)
prod := arrowRecord.NewProducer()
Expand Down Expand Up @@ -506,8 +510,8 @@ func TestArrowExporterStreaming(t *testing.T) {
expectOutput = append(expectOutput, input)
}
// Stop the test conduit started above. If the sender were
// still sending, it would panic on a closed channel.
close(channel.sent)
// still sending, the test would panic on a closed channel.
channel.doClose()
wg.Wait()

// As this equality check doesn't support out of order slices,
Expand Down Expand Up @@ -569,8 +573,8 @@ func TestArrowExporterHeaders(t *testing.T) {
require.True(t, sent)
}
// Stop the test conduit started above. If the sender were
// still sending, it would panic on a closed channel.
close(channel.sent)
// still sending, the test would panic on a closed channel.
channel.doClose()
wg.Wait()

require.Equal(t, expectOutput, actualOutput)
Expand Down Expand Up @@ -640,8 +644,8 @@ func TestArrowExporterIsTraced(t *testing.T) {
require.True(t, sent)
}
// Stop the test conduit started above. If the sender were
// still sending, it would panic on a closed channel.
close(channel.sent)
// still sending, the test would panic on a closed channel.
channel.doClose()
wg.Wait()

require.Equal(t, expectOutput, actualOutput)
Expand All @@ -658,3 +662,65 @@ func TestAddJitter(t *testing.T) {
require.Less(t, x, 20*time.Minute)
}
}

// TestArrowExporterStreamLifetimeAndShutdown exercises multiple
// stream lifetimes and then shuts down; it expects zero observed
// logs, since graceful stream restart/shutdown should be silent.
func TestArrowExporterStreamLifetimeAndShutdown(t *testing.T) {
	tc := newShortLifetimeStreamTestCase(t)

	var wg sync.WaitGroup

	var expectCount uint64
	var actualCount uint64

	tc.traceCall.AnyTimes().DoAndReturn(func(ctx context.Context, opts ...grpc.CallOption) (
		arrowpb.ArrowTracesService_ArrowTracesClient,
		error,
	) {
		wg.Add(1)
		channel := newHealthyTestChannel()

		// Capture the sent channel before the stream is handed to
		// the exporter. After that point CloseSend() may run and
		// doClose() nils the field; ranging over a nil channel
		// would block the goroutine forever.
		sent := channel.sent

		go func() {
			defer wg.Done()
			testCon := arrowRecord.NewConsumer()

			// Consume until the exporter's CloseSend() closes the
			// sent channel, acknowledging each batch with OK.
			for data := range sent {
				traces, err := testCon.TracesFrom(data)
				require.NoError(t, err)
				require.Equal(t, 1, len(traces))
				atomic.AddUint64(&actualCount, 1)
				channel.recv <- statusOKFor(data.BatchId)
			}

			// The send side is closed: no further responses will be
			// produced for this stream.
			close(channel.recv)
		}()

		return tc.returnNewStream(channel)(ctx, opts...)
	})

	bg := context.Background()
	require.NoError(t, tc.exporter.Start(bg))

	start := time.Now()
	// This is 10 stream lifetimes using the "ShortLifetime" test.
	for time.Since(start) < 5*time.Second {
		input := testdata.GenerateTraces(2)
		ctx := context.Background()

		sent, err := tc.exporter.SendAndWait(ctx, input)
		require.NoError(t, err)
		require.True(t, sent)

		expectCount++
	}

	require.NoError(t, tc.exporter.Shutdown(bg))

	// Wait for the consumer goroutines to finish before reading the
	// counter; comparing earlier would race with AddUint64 above.
	wg.Wait()

	require.Equal(t, expectCount, atomic.LoadUint64(&actualCount))

	require.Empty(t, tc.observedLogs.All())
}
Loading

0 comments on commit 9551a08

Please sign in to comment.