From 4854aa3a3287c5cf068fcb8c85a1e4a02c255340 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Wed, 6 Jan 2021 16:23:45 -0800 Subject: [PATCH 01/15] feat(pubsub): add publish flow control --- pubsub/flow_controller.go | 72 +++++++++++++++++++++++++++++++--- pubsub/flow_controller_test.go | 16 ++++---- pubsub/subscription.go | 2 +- pubsub/topic.go | 37 ++++++++++++++--- 4 files changed, 107 insertions(+), 20 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 3f165a0ac18c..cbf005064e58 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -16,11 +16,26 @@ package pubsub import ( "context" + "errors" + "fmt" "sync/atomic" "golang.org/x/sync/semaphore" ) +// LimitExceededBehavior configures the behavior that flowController can use in case +// the flow control limits are exceeded. +type LimitExceededBehavior int + +const ( + // Ignore disables flow control. + Ignore LimitExceededBehavior = iota + // Block signals to wait until the request can be made without exceeding the limit. + Block + // SignalError signals an error to the caller of acquire. + SignalError +) + // flowController implements flow control for Subscription.Receive. type flowController struct { maxCount int @@ -31,18 +46,20 @@ type flowController struct { // small releases. // Atomic. countRemaining int64 + limitBehavior LimitExceededBehavior } // newFlowController creates a new flowController that ensures no more than // maxCount messages or maxSize bytes are outstanding at once. If maxCount or // maxSize is < 1, then an unlimited number of messages or bytes is permitted, // respectively. -func newFlowController(maxCount, maxSize int) *flowController { +func newFlowController(maxCount, maxSize int, behavior LimitExceededBehavior) *flowController { fc := &flowController{ - maxCount: maxCount, - maxSize: maxSize, - semCount: nil, - semSize: nil, + maxCount: maxCount, + maxSize: maxSize, + semCount: nil, + semSize: nil, + limitBehavior: behavior, } if maxCount > 0 { fc.semCount = semaphore.NewWeighted(int64(maxCount)) @@ -59,6 +76,9 @@ func newFlowController(maxCount, maxSize int) *flowController { // acquire allows large messages to proceed by treating a size greater than maxSize // as if it were equal to maxSize. func (f *flowController) acquire(ctx context.Context, size int) error { + if f.limitBehavior == Ignore { + return nil + } if f.semCount != nil { if err := f.semCount.Acquire(ctx, 1); err != nil { return err @@ -99,8 +119,50 @@ func (f *flowController) tryAcquire(size int) bool { return true } +func (f *flowController) newAcquire(ctx context.Context, size int) error { + switch f.limitBehavior { + case Ignore: + return nil + case Block: + return fmt.Errorf("blah") + if f.semCount != nil { + if err := f.semCount.Acquire(ctx, 1); err != nil { + return err + } + } + if f.semSize != nil { + if err := f.semSize.Acquire(ctx, f.bound(size)); err != nil { + if f.semCount != nil { + f.semCount.Release(1) + } + return err + } + } + atomic.AddInt64(&f.countRemaining, 1) + case SignalError: + if f.semCount != nil { + if !f.semCount.TryAcquire(1) { + return errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded") + } + } + if f.semSize != nil { + if !f.semSize.TryAcquire(f.bound(size)) { + if f.semCount != nil { + f.semCount.Release(1) + } + return errors.New("pubsub: MaxOutstandingBytes flow controller limit exceeded") + } + } + atomic.AddInt64(&f.countRemaining, 1) + } + return nil +} + // release notes that one message of size bytes is no longer outstanding. func (f *flowController) release(size int) { + if f.limitBehavior == Ignore { + return + } atomic.AddInt64(&f.countRemaining, -1) if f.semCount != nil { f.semCount.Release(1) diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index 71c41cbf165c..af7c8f3b2637 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -28,7 +28,7 @@ import ( func TestFlowControllerCancel(t *testing.T) { // Test canceling a flow controller's context. t.Parallel() - fc := newFlowController(3, 10) + fc := newFlowController(3, 10, Block) if err := fc.acquire(context.Background(), 5); err != nil { t.Fatal(err) } @@ -51,7 +51,7 @@ func TestFlowControllerCancel(t *testing.T) { func TestFlowControllerLargeRequest(t *testing.T) { // Large requests succeed, consuming the entire allotment. t.Parallel() - fc := newFlowController(3, 10) + fc := newFlowController(3, 10, Block) err := fc.acquire(context.Background(), 11) if err != nil { t.Fatal(err) @@ -64,7 +64,7 @@ func TestFlowControllerNoStarve(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - fc := newFlowController(10, 10) + fc := newFlowController(10, 10, Block) first := make(chan int) for i := 0; i < 20; i++ { go func() { @@ -120,7 +120,7 @@ func TestFlowControllerSaturation(t *testing.T) { wantSize: 9, }, } { - fc := newFlowController(maxCount, maxSize) + fc := newFlowController(maxCount, maxSize, Block) // Atomically track flow controller state. // The flowController itself tracks count. var curSize int64 @@ -176,7 +176,7 @@ func TestFlowControllerSaturation(t *testing.T) { func TestFlowControllerTryAcquire(t *testing.T) { t.Parallel() - fc := newFlowController(3, 10) + fc := newFlowController(3, 10, Block) // Successfully tryAcquire 4 bytes. if !fc.tryAcquire(4) { @@ -197,7 +197,7 @@ func TestFlowControllerTryAcquire(t *testing.T) { func TestFlowControllerUnboundedCount(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(0, 10) + fc := newFlowController(0, 10, Block) // Successfully acquire 4 bytes. if err := fc.acquire(ctx, 4); err != nil { @@ -218,7 +218,7 @@ func TestFlowControllerUnboundedCount(t *testing.T) { func TestFlowControllerUnboundedCount2(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(0, 0) + fc := newFlowController(0, 0, Block) // Successfully acquire 4 bytes. if err := fc.acquire(ctx, 4); err != nil { t.Errorf("got %v, wanted no error", err) @@ -236,7 +236,7 @@ func TestFlowControllerUnboundedCount2(t *testing.T) { func TestFlowControllerUnboundedBytes(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(2, 0) + fc := newFlowController(2, 0, Block) // Successfully acquire 4GB. if err := fc.acquire(ctx, 4e9); err != nil { diff --git a/pubsub/subscription.go b/pubsub/subscription.go index d1dee3ca9a1b..31aa54beb904 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -834,7 +834,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes maxOutstandingBytes: maxBytes, useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } - fc := newFlowController(maxCount, maxBytes) + fc := newFlowController(maxCount, maxBytes, Block) sched := scheduler.NewReceiveScheduler(maxCount) diff --git a/pubsub/topic.go b/pubsub/topic.go index 91bcaf934283..c0d8741ddcac 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -68,6 +68,8 @@ type Topic struct { stopped bool scheduler *scheduler.PublishScheduler + fc *flowController + // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool } @@ -100,6 +102,19 @@ type PublishSettings struct { // // Defaults to DefaultPublishSettings.BufferedByteLimit. BufferedByteLimit int + + // MaxOutstandingMessages is the maximum number of unsent messages + MaxOutstandingMessages int + + // MaxOutstandingBytes is the maximum size of messages waiting + // to be published. + MaxOutstandingBytes int + + // LimitExceededBehavior configures the behavior when trying to publish + // additional messages while the flow controller is full. The available options + // include Block (default), Ignore (disable), and SignalError (publish + // results will return an error). + LimitExceededBehavior LimitExceededBehavior } // DefaultPublishSettings holds the default values for topics' PublishSettings. @@ -450,9 +465,12 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { return r } - // TODO(jba) [from bcmills] consider using a shared channel per bundle - // (requires Bundler API changes; would reduce allocations) - err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r}, msgSize) + if err := t.fc.newAcquire(ctx, msgSize); err != nil { + t.scheduler.Pause(msg.OrderingKey) + ipubsub.SetPublishResult(r, "", err) + return r + } + err := t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) if err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) @@ -475,8 +493,9 @@ func (t *Topic) Stop() { } type bundledMessage struct { - msg *Message - res *PublishResult + msg *Message + res *PublishResult + size int } func (t *Topic) initBundler() { @@ -503,6 +522,8 @@ func (t *Topic) initBundler() { workers = 25 * runtime.GOMAXPROCS(0) } + t.fc = newFlowController(t.PublishSettings.MaxOutstandingMessages, t.PublishSettings.MaxOutstandingBytes, t.PublishSettings.LimitExceededBehavior) + t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) { // TODO(jba): use a context detached from the one passed to NewClient. ctx := context.TODO() @@ -538,11 +559,14 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) var orderingKey string for i, bm := range bms { orderingKey = bm.msg.OrderingKey - pbMsgs[i] = &pb.PubsubMessage{ + msg := &pb.PubsubMessage{ Data: bm.msg.Data, Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } + pbMsgs[i] = msg + // Calculate size of message for flow control release. + bm.size = proto.Size(msg) bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse @@ -567,6 +591,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) PublishLatency.M(float64(end.Sub(start)/time.Millisecond)), PublishedMessages.M(int64(len(bms)))) for i, bm := range bms { + t.fc.release(bm.size) if err != nil { ipubsub.SetPublishResult(bm.res, "", err) } else { From b4789caaed5ae3b21ddfa85c9c0cca4362640906 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 11 Jun 2021 16:46:44 -0700 Subject: [PATCH 02/15] add SignalError logic to flow controller --- pubsub/flow_controller.go | 75 +++++++++++++++------------ pubsub/flow_controller_test.go | 16 +++--- pubsub/pstest/fake.go | 83 ++++++++++++++++++++++++------ pubsub/pstest/fake_test.go | 32 ++++++++++++ pubsub/subscription.go | 4 +- pubsub/topic.go | 5 +- pubsub/topic_test.go | 92 ++++++++++++++++++++++++++++++++++ 7 files changed, 247 insertions(+), 60 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index cbf005064e58..e43853040551 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -17,7 +17,6 @@ package pubsub import ( "context" "errors" - "fmt" "sync/atomic" "golang.org/x/sync/semaphore" @@ -28,12 +27,17 @@ import ( type LimitExceededBehavior int const ( - // Ignore disables flow control. - Ignore LimitExceededBehavior = iota - // Block signals to wait until the request can be made without exceeding the limit. - Block - // SignalError signals an error to the caller of acquire. - SignalError + // FlowControlIgnore disables flow control. + FlowControlIgnore LimitExceededBehavior = iota + // FlowControlBlock signals to wait until the request can be made without exceeding the limit. + FlowControlBlock + // FlowControlSignalError signals an error to the caller of acquire. + FlowControlSignalError +) + +var ( + ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded") + ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded") ) // flowController implements flow control for Subscription.Receive. @@ -76,7 +80,7 @@ func newFlowController(maxCount, maxSize int, behavior LimitExceededBehavior) *f // acquire allows large messages to proceed by treating a size greater than maxSize // as if it were equal to maxSize. func (f *flowController) acquire(ctx context.Context, size int) error { - if f.limitBehavior == Ignore { + if f.limitBehavior == FlowControlIgnore { return nil } if f.semCount != nil { @@ -101,30 +105,34 @@ func (f *flowController) acquire(ctx context.Context, size int) error { // // tryAcquire allows large messages to proceed by treating a size greater than // maxSize as if it were equal to maxSize. -func (f *flowController) tryAcquire(size int) bool { - if f.semCount != nil { - if !f.semCount.TryAcquire(1) { - return false - } - } - if f.semSize != nil { - if !f.semSize.TryAcquire(f.bound(size)) { - if f.semCount != nil { - f.semCount.Release(1) - } - return false - } - } - atomic.AddInt64(&f.countRemaining, 1) - return true -} +// func (f *flowController) tryAcquire(size int) bool { +// if f.semCount != nil { +// if !f.semCount.TryAcquire(1) { +// return false +// } +// } +// if f.semSize != nil { +// if !f.semSize.TryAcquire(f.bound(size)) { +// if f.semCount != nil { +// f.semCount.Release(1) +// } +// return false +// } +// } +// atomic.AddInt64(&f.countRemaining, 1) +// return true +// } +// newAcquire acquires space for a message: the message count and its size. +// +// In FlowControlSignalError mode, large messages greater than maxSize +// will be result in an error. In other modes, large messages will be treated +// as if it were equal to maxSize. func (f *flowController) newAcquire(ctx context.Context, size int) error { switch f.limitBehavior { - case Ignore: + case FlowControlIgnore: return nil - case Block: - return fmt.Errorf("blah") + case FlowControlBlock: if f.semCount != nil { if err := f.semCount.Acquire(ctx, 1); err != nil { return err @@ -139,18 +147,19 @@ func (f *flowController) newAcquire(ctx context.Context, size int) error { } } atomic.AddInt64(&f.countRemaining, 1) - case SignalError: + case FlowControlSignalError: if f.semCount != nil { if !f.semCount.TryAcquire(1) { - return errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded") + return ErrFlowControllerMaxOutstandingMessages } } if f.semSize != nil { - if !f.semSize.TryAcquire(f.bound(size)) { + // Try to acquire the full size of the message here. + if !f.semSize.TryAcquire(int64(size)) { if f.semCount != nil { f.semCount.Release(1) } - return errors.New("pubsub: MaxOutstandingBytes flow controller limit exceeded") + return ErrFlowControllerMaxOutstandingBytes } } atomic.AddInt64(&f.countRemaining, 1) @@ -160,7 +169,7 @@ func (f *flowController) newAcquire(ctx context.Context, size int) error { // release notes that one message of size bytes is no longer outstanding. func (f *flowController) release(size int) { - if f.limitBehavior == Ignore { + if f.limitBehavior == FlowControlIgnore { return } atomic.AddInt64(&f.countRemaining, -1) diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index af7c8f3b2637..ba272b23e234 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -28,7 +28,7 @@ import ( func TestFlowControllerCancel(t *testing.T) { // Test canceling a flow controller's context. t.Parallel() - fc := newFlowController(3, 10, Block) + fc := newFlowController(3, 10, FlowControlBlock) if err := fc.acquire(context.Background(), 5); err != nil { t.Fatal(err) } @@ -51,7 +51,7 @@ func TestFlowControllerCancel(t *testing.T) { func TestFlowControllerLargeRequest(t *testing.T) { // Large requests succeed, consuming the entire allotment. t.Parallel() - fc := newFlowController(3, 10, Block) + fc := newFlowController(3, 10, FlowControlBlock) err := fc.acquire(context.Background(), 11) if err != nil { t.Fatal(err) @@ -64,7 +64,7 @@ func TestFlowControllerNoStarve(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - fc := newFlowController(10, 10, Block) + fc := newFlowController(10, 10, FlowControlBlock) first := make(chan int) for i := 0; i < 20; i++ { go func() { @@ -120,7 +120,7 @@ func TestFlowControllerSaturation(t *testing.T) { wantSize: 9, }, } { - fc := newFlowController(maxCount, maxSize, Block) + fc := newFlowController(maxCount, maxSize, FlowControlBlock) // Atomically track flow controller state. // The flowController itself tracks count. var curSize int64 @@ -176,7 +176,7 @@ func TestFlowControllerSaturation(t *testing.T) { func TestFlowControllerTryAcquire(t *testing.T) { t.Parallel() - fc := newFlowController(3, 10, Block) + fc := newFlowController(3, 10, FlowControlBlock) // Successfully tryAcquire 4 bytes. if !fc.tryAcquire(4) { @@ -197,7 +197,7 @@ func TestFlowControllerTryAcquire(t *testing.T) { func TestFlowControllerUnboundedCount(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(0, 10, Block) + fc := newFlowController(0, 10, FlowControlBlock) // Successfully acquire 4 bytes. if err := fc.acquire(ctx, 4); err != nil { @@ -218,7 +218,7 @@ func TestFlowControllerUnboundedCount(t *testing.T) { func TestFlowControllerUnboundedCount2(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(0, 0, Block) + fc := newFlowController(0, 0, FlowControlBlock) // Successfully acquire 4 bytes. if err := fc.acquire(ctx, 4); err != nil { t.Errorf("got %v, wanted no error", err) @@ -236,7 +236,7 @@ func TestFlowControllerUnboundedCount2(t *testing.T) { func TestFlowControllerUnboundedBytes(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(2, 0, Block) + fc := newFlowController(2, 0, FlowControlBlock) // Successfully acquire 4GB. if err := fc.acquire(ctx, 4e9); err != nil { diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 9b330874fbdd..975a557eb0f5 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -60,6 +60,11 @@ type ServerReactorOption struct { Reactor Reactor } +type publishResponse struct { + resp *pb.PublishResponse + err error +} + // For testing. Note that even though changes to the now variable are atomic, a call // to the stored function can race with a change to that function. This could be a // problem if tests are run in parallel, or even if concurrent parts of the same test @@ -88,16 +93,24 @@ type GServer struct { pb.PublisherServer pb.SubscriberServer - mu sync.Mutex - topics map[string]*topic - subs map[string]*subscription - msgs []*Message // all messages ever published - msgsByID map[string]*Message - wg sync.WaitGroup - nextID int - streamTimeout time.Duration - timeNowFunc func() time.Time + mu sync.Mutex + topics map[string]*topic + subs map[string]*subscription + msgs []*Message // all messages ever published + msgsByID map[string]*Message + wg sync.WaitGroup + nextID int + streamTimeout time.Duration + timeNowFunc func() time.Time + + // reactorOptions allow configuring the reaction function for calls. reactorOptions ReactorOptions + // PublishResponses is a channel of responses to use for Publish. + publishResponses chan *publishResponse + // autoPublishResponse enables the server to automatically generate + // PublishResponse when publish is called. Otherwise, responses + // are generated from the publishResponses channel. + autoPublishResponse bool } // NewServer creates a new fake server running in the current process. @@ -114,11 +127,13 @@ func NewServer(opts ...ServerReactorOption) *Server { srv: srv, Addr: srv.Addr, GServer: GServer{ - topics: map[string]*topic{}, - subs: map[string]*subscription{}, - msgsByID: map[string]*Message{}, - timeNowFunc: timeNow, - reactorOptions: reactorOptions, + topics: map[string]*topic{}, + subs: map[string]*subscription{}, + msgsByID: map[string]*Message{}, + timeNowFunc: timeNow, + reactorOptions: reactorOptions, + publishResponses: make(chan *publishResponse, 100), + autoPublishResponse: true, }, } pb.RegisterPublisherServer(srv.Gsrv, &s.GServer) @@ -168,6 +183,37 @@ func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]stri return res.MessageIds[0] } +// AddPublishResponse adds a new publish response to the channel used for +// responding to publish requests. +func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error) { + s.GServer.mu.Lock() + defer s.GServer.mu.Unlock() + pr := &publishResponse{} + if err != nil { + pr.err = err + } else { + pr.resp = pbr + } + s.GServer.publishResponses <- pr +} + +// SetAutoPublishResponse controls whether to automatically respond +// to messages published or to use user-added responses from the +// publishResponses channel. +func (s *Server) SetAutoPublishResponse(autoPublishResponse bool) { + s.GServer.mu.Lock() + defer s.GServer.mu.Unlock() + s.GServer.autoPublishResponse = autoPublishResponse +} + +// ResetPublishResponses resets the buffered publishResponses channel +// with a new buffered channel with the given size. +func (s *Server) ResetPublishResponses(size int) { + s.GServer.mu.Lock() + defer s.GServer.mu.Unlock() + s.GServer.publishResponses = make(chan *publishResponse, size) +} + // SetStreamTimeout sets the amount of time a stream will be active before it shuts // itself down. This mimics the real service's behavior of closing streams after 30 // minutes. If SetStreamTimeout is never called or is passed zero, streams never shut @@ -613,6 +659,15 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis if top == nil { return nil, status.Errorf(codes.NotFound, "topic %q", req.Topic) } + + if !s.autoPublishResponse { + r := <-s.publishResponses + if r.err != nil { + return nil, r.err + } + return r.resp, nil + } + var ids []string for _, pm := range req.Messages { id := fmt.Sprintf("m%d", s.nextID) diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 1ca043342587..285ab483208d 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -1047,3 +1047,35 @@ func TestErrorInjection(t *testing.T) { } } } + +func TestPublishResponse(t *testing.T) { + ctx := context.Background() + _, _, srv, cleanup := newFake(ctx, t) + defer cleanup() + + // By default, autoPublishResponse is true so this should succeed immediately. + got := srv.Publish("projects/p/topics/t", []byte("msg1"), nil) + if want := "m0"; got != want { + t.Fatalf("srv.Publish(): got %v, want %v", got, want) + } + + // After disabling autoPublishResponse, publish() operations + // will read from the channel instead of auto generating messages. + srv.SetAutoPublishResponse(false) + + srv.AddPublishResponse(&pb.PublishResponse{ + MessageIds: []string{"1"}, + }, nil) + got = srv.Publish("projects/p/topics/t", []byte("msg2"), nil) + if want := "1"; got != want { + t.Fatalf("srv.Publish(): got %v, want %v", got, want) + } + + srv.AddPublishResponse(&pb.PublishResponse{ + MessageIds: []string{"2"}, + }, nil) + got = srv.Publish("projects/p/topics/t", []byte("msg3"), nil) + if want := "2"; got != want { + t.Fatalf("srv.Publish(): got %v, want %v", got, want) + } +} diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 31aa54beb904..84d2acd6bda0 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -834,7 +834,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes maxOutstandingBytes: maxBytes, useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } - fc := newFlowController(maxCount, maxBytes, Block) + fc := newFlowController(maxCount, maxBytes, FlowControlBlock) sched := scheduler.NewReceiveScheduler(maxCount) @@ -959,7 +959,7 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes type pullOptions struct { maxExtension time.Duration // the maximum time to extend a message's ack deadline in tota maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc - maxPrefetch int32 + maxPrefetch int32 // the max number of outstanding messages, used to calculate maxToPull // If true, use unary Pull instead of StreamingPull, and never pull more // than maxPrefetch messages. synchronous bool diff --git a/pubsub/topic.go b/pubsub/topic.go index c0d8741ddcac..eef8d32fcc48 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -103,11 +103,10 @@ type PublishSettings struct { // Defaults to DefaultPublishSettings.BufferedByteLimit. BufferedByteLimit int - // MaxOutstandingMessages is the maximum number of unsent messages + // MaxOutstandingMessages is the maximum number of bufered messages to be published. MaxOutstandingMessages int - // MaxOutstandingBytes is the maximum size of messages waiting - // to be published. + // MaxOutstandingBytes is the maximum size of buffered messages to be published. MaxOutstandingBytes int // LimitExceededBehavior configures the behavior when trying to publish diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 2736fb2e9c5b..5a4e3782c75f 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -307,3 +307,95 @@ func TestDetachSubscription(t *testing.T) { t.Errorf("DetachSubscription failed: %v", err) } } + +func TestPublishFlowControl_SignalError(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + topic, err := c.CreateTopic(ctx, "some-topic") + if err != nil { + t.Fatal(err) + } + topic.PublishSettings.MaxOutstandingMessages = 1 + topic.PublishSettings.MaxOutstandingBytes = 10 + topic.PublishSettings.LimitExceededBehavior = FlowControlSignalError + + // Sending a message that is too large results in an error in SignalError mode. + r := publishSingleMessage(ctx, topic, "AAAAAAAAAAA") + if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingBytes { + t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingBytes) + } + + // Sending a second message succeeds. + r = publishSingleMessage(ctx, topic, "AAAA") + if _, err := r.Get(ctx); err != nil { + t.Fatalf("publishResult.Get(): got %v, want nil", err) + } + + // Sending a third message fails because of the outstanding message. + r = publishSingleMessage(ctx, topic, "AA") + if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages { + t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingMessages) + } + + // Publish 11 messages, nothing should fail. + var res []*PublishResult + for i := 0; i < 11; i++ { + r := topic.Publish(ctx, &Message{ + Data: []byte("test"), + }) + res = append(res, r) + } + for _, r := range res { + if _, err := r.Get(ctx); err != nil { + t.Fatalf("got err: %v", err) + } + } + + topic.scheduler = nil + topic.PublishSettings.LimitExceededBehavior = FlowControlBlock + // Publish 11 messages, nothing should fail. + res = []*PublishResult{} + for i := 0; i < 11; i++ { + r := topic.Publish(ctx, &Message{ + Data: []byte("test"), + }) + res = append(res, r) + } + for _, r := range res { + if _, err := r.Get(ctx); err != nil { + t.Fatalf("got err: %v", err) + } + } + + topic.scheduler = nil + topic.PublishSettings.LimitExceededBehavior = FlowControlSignalError + // Publish 11 messages, should signal error on the 11th message. + res = []*PublishResult{} + for i := 0; i < 11; i++ { + r := topic.Publish(ctx, &Message{ + Data: []byte("test"), + }) + res = append(res, r) + } + for i, r := range res { + if i == 10 { + if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages { + t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingMessages) + } + } else { + if _, err := r.Get(ctx); err != nil { + t.Fatalf("got err: %v", err) + } + } + } +} + +// publishSingleMessage published a single message to a topic. +func publishSingleMessage(ctx context.Context, t *Topic, data string) *PublishResult { + return t.Publish(ctx, &Message{ + Data: []byte(data), + }) +} From 4cf48d880fc79bb49393f22f8b087ecb24a59b89 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 11 Jun 2021 16:48:47 -0700 Subject: [PATCH 03/15] feat(pubsub/pstest): add channel to support user-defined publish responses --- pubsub/pstest/fake.go | 111 +++++++++++++++++++++++++++++-------- pubsub/pstest/fake_test.go | 56 +++++++++++++++---- 2 files changed, 133 insertions(+), 34 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 2f50f97908f4..975a557eb0f5 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -34,12 +34,12 @@ import ( "time" "cloud.google.com/go/internal/testutil" + "github.com/golang/protobuf/ptypes" + durpb "github.com/golang/protobuf/ptypes/duration" + emptypb "github.com/golang/protobuf/ptypes/empty" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - durpb "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/emptypb" - "google.golang.org/protobuf/types/known/timestamppb" ) // ReactorOptions is a map that Server uses to look up reactors. @@ -60,6 +60,11 @@ type ServerReactorOption struct { Reactor Reactor } +type publishResponse struct { + resp *pb.PublishResponse + err error +} + // For testing. Note that even though changes to the now variable are atomic, a call // to the stored function can race with a change to that function. This could be a // problem if tests are run in parallel, or even if concurrent parts of the same test @@ -88,16 +93,24 @@ type GServer struct { pb.PublisherServer pb.SubscriberServer - mu sync.Mutex - topics map[string]*topic - subs map[string]*subscription - msgs []*Message // all messages ever published - msgsByID map[string]*Message - wg sync.WaitGroup - nextID int - streamTimeout time.Duration - timeNowFunc func() time.Time + mu sync.Mutex + topics map[string]*topic + subs map[string]*subscription + msgs []*Message // all messages ever published + msgsByID map[string]*Message + wg sync.WaitGroup + nextID int + streamTimeout time.Duration + timeNowFunc func() time.Time + + // reactorOptions allow configuring the reaction function for calls. reactorOptions ReactorOptions + // PublishResponses is a channel of responses to use for Publish. + publishResponses chan *publishResponse + // autoPublishResponse enables the server to automatically generate + // PublishResponse when publish is called. Otherwise, responses + // are generated from the publishResponses channel. + autoPublishResponse bool } // NewServer creates a new fake server running in the current process. @@ -114,11 +127,13 @@ func NewServer(opts ...ServerReactorOption) *Server { srv: srv, Addr: srv.Addr, GServer: GServer{ - topics: map[string]*topic{}, - subs: map[string]*subscription{}, - msgsByID: map[string]*Message{}, - timeNowFunc: timeNow, - reactorOptions: reactorOptions, + topics: map[string]*topic{}, + subs: map[string]*subscription{}, + msgsByID: map[string]*Message{}, + timeNowFunc: timeNow, + reactorOptions: reactorOptions, + publishResponses: make(chan *publishResponse, 100), + autoPublishResponse: true, }, } pb.RegisterPublisherServer(srv.Gsrv, &s.GServer) @@ -168,6 +183,37 @@ func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]stri return res.MessageIds[0] } +// AddPublishResponse adds a new publish response to the channel used for +// responding to publish requests. +func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error) { + s.GServer.mu.Lock() + defer s.GServer.mu.Unlock() + pr := &publishResponse{} + if err != nil { + pr.err = err + } else { + pr.resp = pbr + } + s.GServer.publishResponses <- pr +} + +// SetAutoPublishResponse controls whether to automatically respond +// to messages published or to use user-added responses from the +// publishResponses channel. +func (s *Server) SetAutoPublishResponse(autoPublishResponse bool) { + s.GServer.mu.Lock() + defer s.GServer.mu.Unlock() + s.GServer.autoPublishResponse = autoPublishResponse +} + +// ResetPublishResponses resets the buffered publishResponses channel +// with a new buffered channel with the given size. +func (s *Server) ResetPublishResponses(size int) { + s.GServer.mu.Lock() + defer s.GServer.mu.Unlock() + s.GServer.publishResponses = make(chan *publishResponse, size) +} + // SetStreamTimeout sets the amount of time a stream will be active before it shuts // itself down. This mimics the real service's behavior of closing streams after 30 // minutes. If SetStreamTimeout is never called or is passed zero, streams never shut @@ -455,11 +501,11 @@ const ( maxMessageRetentionDuration = 168 * time.Hour ) -var defaultMessageRetentionDuration = durpb.New(maxMessageRetentionDuration) +var defaultMessageRetentionDuration = ptypes.DurationProto(maxMessageRetentionDuration) func checkMRD(pmrd *durpb.Duration) error { - mrd := pmrd.AsDuration() - if mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration { + mrd, err := ptypes.Duration(pmrd) + if err != nil || mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration { return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd) } return nil @@ -613,13 +659,25 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis if top == nil { return nil, status.Errorf(codes.NotFound, "topic %q", req.Topic) } + + if !s.autoPublishResponse { + r := <-s.publishResponses + if r.err != nil { + return nil, r.err + } + return r.resp, nil + } + var ids []string for _, pm := range req.Messages { id := fmt.Sprintf("m%d", s.nextID) s.nextID++ pm.MessageId = id pubTime := s.timeNowFunc() - tsPubTime := timestamppb.New(pubTime) + tsPubTime, err := ptypes.TimestampProto(pubTime) + if err != nil { + return nil, status.Errorf(codes.Internal, err.Error()) + } pm.PublishTime = tsPubTime m := &Message{ ID: id, @@ -830,7 +888,11 @@ func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekRespon case nil: return nil, status.Errorf(codes.InvalidArgument, "missing Seek target type") case *pb.SeekRequest_Time: - target = v.Time.AsTime() + var err error + target, err = ptypes.Timestamp(v.Time) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "bad Time target: %v", err) + } default: return nil, status.Errorf(codes.Unimplemented, "unhandled Seek target type %T", v) } @@ -978,7 +1040,10 @@ func (s *subscription) maintainMessages(now time.Time) { if m.outstanding() && now.After(m.ackDeadline) { m.makeAvailable() } - pubTime := m.proto.Message.PublishTime.AsTime() + pubTime, err := ptypes.Timestamp(m.proto.Message.PublishTime) + if err != nil { + panic(err) + } // Remove messages that have been undelivered for a long time. if !m.outstanding() && now.Sub(pubTime) > retentionDuration { delete(s.msgs, id) diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 6b75c2fd4a38..285ab483208d 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -25,13 +25,12 @@ import ( "time" "cloud.google.com/go/internal/testutil" + "github.com/golang/protobuf/ptypes" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/genproto/protobuf/field_mask" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" ) func TestTopics(t *testing.T) { @@ -186,7 +185,7 @@ func TestSubscriptionErrors(t *testing.T) { checkCode(err, codes.NotFound) _, err = sclient.Seek(ctx, &pb.SeekRequest{}) checkCode(err, codes.InvalidArgument) - srt := &pb.SeekRequest_Time{Time: timestamppb.Now()} + srt := &pb.SeekRequest_Time{Time: ptypes.TimestampNow()} _, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt}) checkCode(err, codes.InvalidArgument) _, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt, Subscription: "s"}) @@ -279,7 +278,10 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages if err != nil { t.Fatal(err) } - tsPubTime := timestamppb.New(pubTime) + tsPubTime, err := ptypes.TimestampProto(pubTime) + if err != nil { + t.Fatal(err) + } want := map[string]*pb.PubsubMessage{} for i, id := range res.MessageIds { want[id] = &pb.PubsubMessage{ @@ -637,7 +639,7 @@ func TestSeek(t *testing.T) { Topic: top.Name, AckDeadlineSeconds: 10, }) - ts := timestamppb.Now() + ts := ptypes.TimestampNow() _, err := sclient.Seek(context.Background(), &pb.SeekRequest{ Subscription: sub.Name, Target: &pb.SeekRequest_Time{Time: ts}, @@ -698,7 +700,7 @@ func TestTimeNowFunc(t *testing.T) { m := s.Message(id) if m == nil { - t.Fatalf("got nil, want a message") + t.Error("got nil, want a message") } if got, want := m.PublishTime, timeFunc(); got != want { t.Fatalf("got %v, want %v", got, want) @@ -795,8 +797,8 @@ func TestUpdateRetryPolicy(t *testing.T) { Name: "projects/P/subscriptions/S", Topic: top.Name, RetryPolicy: &pb.RetryPolicy{ - MinimumBackoff: durationpb.New(10 * time.Second), - MaximumBackoff: durationpb.New(60 * time.Second), + MinimumBackoff: ptypes.DurationProto(10 * time.Second), + MaximumBackoff: ptypes.DurationProto(60 * time.Second), }, }) @@ -805,8 +807,8 @@ func TestUpdateRetryPolicy(t *testing.T) { Name: sub.Name, Topic: top.Name, RetryPolicy: &pb.RetryPolicy{ - MinimumBackoff: durationpb.New(20 * time.Second), - MaximumBackoff: durationpb.New(100 * time.Second), + MinimumBackoff: ptypes.DurationProto(20 * time.Second), + MaximumBackoff: ptypes.DurationProto(100 * time.Second), }, } @@ -1005,7 +1007,7 @@ func TestErrorInjection(t *testing.T) { }, { funcName: "Seek", - param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: timestamppb.Now()}}, + param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: ptypes.TimestampNow()}}, }, } @@ -1045,3 +1047,35 @@ func TestErrorInjection(t *testing.T) { } } } + +func TestPublishResponse(t *testing.T) { + ctx := context.Background() + _, _, srv, cleanup := newFake(ctx, t) + defer cleanup() + + // By default, autoPublishResponse is true so this should succeed immediately. + got := srv.Publish("projects/p/topics/t", []byte("msg1"), nil) + if want := "m0"; got != want { + t.Fatalf("srv.Publish(): got %v, want %v", got, want) + } + + // After disabling autoPublishResponse, publish() operations + // will read from the channel instead of auto generating messages. + srv.SetAutoPublishResponse(false) + + srv.AddPublishResponse(&pb.PublishResponse{ + MessageIds: []string{"1"}, + }, nil) + got = srv.Publish("projects/p/topics/t", []byte("msg2"), nil) + if want := "1"; got != want { + t.Fatalf("srv.Publish(): got %v, want %v", got, want) + } + + srv.AddPublishResponse(&pb.PublishResponse{ + MessageIds: []string{"2"}, + }, nil) + got = srv.Publish("projects/p/topics/t", []byte("msg3"), nil) + if want := "2"; got != want { + t.Fatalf("srv.Publish(): got %v, want %v", got, want) + } +} From 5db7ba76ce0686aece9a30b9300bf1a8c4b1c92a Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 11 Jun 2021 17:03:36 -0700 Subject: [PATCH 04/15] resolve bad merge for proto library migration --- pubsub/pstest/fake.go | 49 +++++++++++++++----------------------- pubsub/pstest/fake_test.go | 24 +++++++++---------- 2 files changed, 30 insertions(+), 43 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index 975a557eb0f5..fcc546bee846 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -34,12 +34,12 @@ import ( "time" "cloud.google.com/go/internal/testutil" - "github.com/golang/protobuf/ptypes" - durpb "github.com/golang/protobuf/ptypes/duration" - emptypb "github.com/golang/protobuf/ptypes/empty" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + durpb "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/protobuf/types/known/timestamppb" ) // ReactorOptions is a map that Server uses to look up reactors. @@ -93,18 +93,17 @@ type GServer struct { pb.PublisherServer pb.SubscriberServer - mu sync.Mutex - topics map[string]*topic - subs map[string]*subscription - msgs []*Message // all messages ever published - msgsByID map[string]*Message - wg sync.WaitGroup - nextID int - streamTimeout time.Duration - timeNowFunc func() time.Time - - // reactorOptions allow configuring the reaction function for calls. + mu sync.Mutex + topics map[string]*topic + subs map[string]*subscription + msgs []*Message // all messages ever published + msgsByID map[string]*Message + wg sync.WaitGroup + nextID int + streamTimeout time.Duration + timeNowFunc func() time.Time reactorOptions ReactorOptions + // PublishResponses is a channel of responses to use for Publish. publishResponses chan *publishResponse // autoPublishResponse enables the server to automatically generate @@ -501,11 +500,11 @@ const ( maxMessageRetentionDuration = 168 * time.Hour ) -var defaultMessageRetentionDuration = ptypes.DurationProto(maxMessageRetentionDuration) +var defaultMessageRetentionDuration = durpb.New(maxMessageRetentionDuration) func checkMRD(pmrd *durpb.Duration) error { - mrd, err := ptypes.Duration(pmrd) - if err != nil || mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration { + mrd := pmrd.AsDuration() + if mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration { return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd) } return nil @@ -674,10 +673,7 @@ func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.Publis s.nextID++ pm.MessageId = id pubTime := s.timeNowFunc() - tsPubTime, err := ptypes.TimestampProto(pubTime) - if err != nil { - return nil, status.Errorf(codes.Internal, err.Error()) - } + tsPubTime := timestamppb.New(pubTime) pm.PublishTime = tsPubTime m := &Message{ ID: id, @@ -888,11 +884,7 @@ func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekRespon case nil: return nil, status.Errorf(codes.InvalidArgument, "missing Seek target type") case *pb.SeekRequest_Time: - var err error - target, err = ptypes.Timestamp(v.Time) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "bad Time target: %v", err) - } + target = v.Time.AsTime() default: return nil, status.Errorf(codes.Unimplemented, "unhandled Seek target type %T", v) } @@ -1040,10 +1032,7 @@ func (s *subscription) maintainMessages(now time.Time) { if m.outstanding() && now.After(m.ackDeadline) { m.makeAvailable() } - pubTime, err := ptypes.Timestamp(m.proto.Message.PublishTime) - if err != nil { - panic(err) - } + pubTime := m.proto.Message.PublishTime.AsTime() // Remove messages that have been undelivered for a long time. if !m.outstanding() && now.Sub(pubTime) > retentionDuration { delete(s.msgs, id) diff --git a/pubsub/pstest/fake_test.go b/pubsub/pstest/fake_test.go index 285ab483208d..4546f46368d7 100644 --- a/pubsub/pstest/fake_test.go +++ b/pubsub/pstest/fake_test.go @@ -25,12 +25,13 @@ import ( "time" "cloud.google.com/go/internal/testutil" - "github.com/golang/protobuf/ptypes" pb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/genproto/protobuf/field_mask" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" ) func TestTopics(t *testing.T) { @@ -185,7 +186,7 @@ func TestSubscriptionErrors(t *testing.T) { checkCode(err, codes.NotFound) _, err = sclient.Seek(ctx, &pb.SeekRequest{}) checkCode(err, codes.InvalidArgument) - srt := &pb.SeekRequest_Time{Time: ptypes.TimestampNow()} + srt := &pb.SeekRequest_Time{Time: timestamppb.Now()} _, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt}) checkCode(err, codes.InvalidArgument) _, err = sclient.Seek(ctx, &pb.SeekRequest{Target: srt, Subscription: "s"}) @@ -278,10 +279,7 @@ func publish(t *testing.T, pclient pb.PublisherClient, topic *pb.Topic, messages if err != nil { t.Fatal(err) } - tsPubTime, err := ptypes.TimestampProto(pubTime) - if err != nil { - t.Fatal(err) - } + tsPubTime := timestamppb.New(pubTime) want := map[string]*pb.PubsubMessage{} for i, id := range res.MessageIds { want[id] = &pb.PubsubMessage{ @@ -639,7 +637,7 @@ func TestSeek(t *testing.T) { Topic: top.Name, AckDeadlineSeconds: 10, }) - ts := ptypes.TimestampNow() + ts := timestamppb.Now() _, err := sclient.Seek(context.Background(), &pb.SeekRequest{ Subscription: sub.Name, Target: &pb.SeekRequest_Time{Time: ts}, @@ -700,7 +698,7 @@ func TestTimeNowFunc(t *testing.T) { m := s.Message(id) if m == nil { - t.Error("got nil, want a message") + t.Fatalf("got nil, want a message") } if got, want := m.PublishTime, timeFunc(); got != want { t.Fatalf("got %v, want %v", got, want) @@ -797,8 +795,8 @@ func TestUpdateRetryPolicy(t *testing.T) { Name: "projects/P/subscriptions/S", Topic: top.Name, RetryPolicy: &pb.RetryPolicy{ - MinimumBackoff: ptypes.DurationProto(10 * time.Second), - MaximumBackoff: ptypes.DurationProto(60 * time.Second), + MinimumBackoff: durationpb.New(10 * time.Second), + MaximumBackoff: durationpb.New(60 * time.Second), }, }) @@ -807,8 +805,8 @@ func TestUpdateRetryPolicy(t *testing.T) { Name: sub.Name, Topic: top.Name, RetryPolicy: &pb.RetryPolicy{ - MinimumBackoff: ptypes.DurationProto(20 * time.Second), - MaximumBackoff: ptypes.DurationProto(100 * time.Second), + MinimumBackoff: durationpb.New(20 * time.Second), + MaximumBackoff: durationpb.New(100 * time.Second), }, } @@ -1007,7 +1005,7 @@ func TestErrorInjection(t *testing.T) { }, { funcName: "Seek", - param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: ptypes.TimestampNow()}}, + param: &pb.SeekRequest{Target: &pb.SeekRequest_Time{Time: timestamppb.Now()}}, }, } From 8bfe09eea2894a9b9208675a06887a1ab76d4bc9 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Tue, 15 Jun 2021 12:19:47 -0700 Subject: [PATCH 05/15] move flow control settings to struct --- pubsub/flow_controller.go | 34 ++++++++++++++++++++++++---------- pubsub/flow_controller_test.go | 24 ++++++++++++++++-------- pubsub/subscription.go | 6 +++++- pubsub/topic.go | 24 +++++++++++------------- pubsub/topic_test.go | 13 ++++++++----- 5 files changed, 64 insertions(+), 37 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 61fef347ad15..893ee7588d2a 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -35,6 +35,20 @@ const ( FlowControlSignalError ) +type FlowControlSettings struct { + // MaxOutstandingMessages is the maximum number of bufered messages to be published. + MaxOutstandingMessages int + + // MaxOutstandingBytes is the maximum size of buffered messages to be published. + MaxOutstandingBytes int + + // LimitExceededBehavior configures the behavior when trying to publish + // additional messages while the flow controller is full. The available options + // include Block (default), Ignore (disable), and SignalError (publish + // results will return an error). + LimitExceededBehavior LimitExceededBehavior +} + var ( ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded") ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded") @@ -59,21 +73,21 @@ type flowController struct { // maxCount messages or maxSize bytes are outstanding at once. If maxCount or // maxSize is < 1, then an unlimited number of messages or bytes is permitted, // respectively. -func newFlowController(maxCount, maxSize int, behavior LimitExceededBehavior) *flowController { - fc := &flowController{ - maxCount: maxCount, - maxSize: maxSize, +func newFlowController(fc FlowControlSettings) *flowController { + f := &flowController{ + maxCount: fc.MaxOutstandingMessages, + maxSize: fc.MaxOutstandingBytes, semCount: nil, semSize: nil, - limitBehavior: behavior, + limitBehavior: fc.LimitExceededBehavior, } - if maxCount > 0 { - fc.semCount = semaphore.NewWeighted(int64(maxCount)) + if fc.MaxOutstandingMessages > 0 { + f.semCount = semaphore.NewWeighted(int64(fc.MaxOutstandingMessages)) } - if maxSize > 0 { - fc.semSize = semaphore.NewWeighted(int64(maxSize)) + if fc.MaxOutstandingBytes > 0 { + f.semSize = semaphore.NewWeighted(int64(fc.MaxOutstandingBytes)) } - return fc + return f } // acquire allocates space for a message: the message count and its size. diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index c6474367d071..0cf9038e0633 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -25,10 +25,18 @@ import ( "golang.org/x/sync/errgroup" ) +func fcSettings(c int, s int, l LimitExceededBehavior) FlowControlSettings { + return FlowControlSettings{ + MaxOutstandingMessages: c, + MaxOutstandingBytes: s, + LimitExceededBehavior: l, + } +} + func TestFlowControllerCancel(t *testing.T) { // Test canceling a flow controller's context. t.Parallel() - fc := newFlowController(3, 10, FlowControlBlock) + fc := newFlowController(fcSettings(3, 10, FlowControlBlock)) if err := fc.acquire(context.Background(), 5); err != nil { t.Fatal(err) } @@ -51,7 +59,7 @@ func TestFlowControllerCancel(t *testing.T) { func TestFlowControllerLargeRequest(t *testing.T) { // Large requests succeed, consuming the entire allotment. t.Parallel() - fc := newFlowController(3, 10, FlowControlBlock) + fc := newFlowController(fcSettings(3, 10, FlowControlBlock)) err := fc.acquire(context.Background(), 11) if err != nil { t.Fatal(err) @@ -64,7 +72,7 @@ func TestFlowControllerNoStarve(t *testing.T) { t.Parallel() ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) defer cancel() - fc := newFlowController(10, 10, FlowControlBlock) + fc := newFlowController(fcSettings(10, 10, FlowControlBlock)) first := make(chan int) for i := 0; i < 20; i++ { go func() { @@ -120,7 +128,7 @@ func TestFlowControllerSaturation(t *testing.T) { wantSize: 9, }, } { - fc := newFlowController(maxCount, maxSize, FlowControlBlock) + fc := newFlowController(fcSettings(maxCount, maxSize, FlowControlBlock)) // Atomically track flow controller state. // The flowController itself tracks count. var curSize int64 @@ -176,7 +184,7 @@ func TestFlowControllerSaturation(t *testing.T) { func TestFlowControllerTryAcquire(t *testing.T) { t.Parallel() - fc := newFlowController(3, 10, FlowControlSignalError) + fc := newFlowController(fcSettings(3, 10, FlowControlSignalError)) ctx := context.Background() // Successfully newAcquire 4 bytes. @@ -198,7 +206,7 @@ func TestFlowControllerTryAcquire(t *testing.T) { func TestFlowControllerUnboundedCount(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(0, 10, FlowControlSignalError) + fc := newFlowController(fcSettings(0, 10, FlowControlSignalError)) // Successfully acquire 4 bytes. if err := fc.acquire(ctx, 4); err != nil { @@ -219,7 +227,7 @@ func TestFlowControllerUnboundedCount(t *testing.T) { func TestFlowControllerUnboundedCount2(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(0, 0, FlowControlSignalError) + fc := newFlowController(fcSettings(0, 0, FlowControlSignalError)) // Successfully acquire 4 bytes. if err := fc.acquire(ctx, 4); err != nil { t.Errorf("got %v, wanted no error", err) @@ -237,7 +245,7 @@ func TestFlowControllerUnboundedCount2(t *testing.T) { func TestFlowControllerUnboundedBytes(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(2, 0, FlowControlBlock) + fc := newFlowController(fcSettings(2, 0, FlowControlBlock)) // Successfully acquire 4GB. if err := fc.acquire(ctx, 4e9); err != nil { diff --git a/pubsub/subscription.go b/pubsub/subscription.go index 340c0e36dbb5..3e0a05e32f16 100644 --- a/pubsub/subscription.go +++ b/pubsub/subscription.go @@ -824,7 +824,11 @@ func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Mes maxOutstandingBytes: maxBytes, useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, } - fc := newFlowController(maxCount, maxBytes, FlowControlBlock) + fc := newFlowController(FlowControlSettings{ + MaxOutstandingMessages: maxCount, + MaxOutstandingBytes: maxBytes, + LimitExceededBehavior: FlowControlBlock, + }) sched := scheduler.NewReceiveScheduler(maxCount) diff --git a/pubsub/topic.go b/pubsub/topic.go index af2e8166153d..9c73cfdcfd95 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -98,22 +98,15 @@ type PublishSettings struct { Timeout time.Duration // The maximum number of bytes that the Bundler will keep in memory before - // returning ErrOverflow. + // returning ErrOverflow. This is now superseded by FlowControlSettings.MaxOutstandingBytes. + // If both are set, this value will be used for both settings. // // Defaults to DefaultPublishSettings.BufferedByteLimit. + // Deprecated. BufferedByteLimit int - // MaxOutstandingMessages is the maximum number of bufered messages to be published. - MaxOutstandingMessages int - - // MaxOutstandingBytes is the maximum size of buffered messages to be published. - MaxOutstandingBytes int - - // LimitExceededBehavior configures the behavior when trying to publish - // additional messages while the flow controller is full. The available options - // include Block (default), Ignore (disable), and SignalError (publish - // results will return an error). - LimitExceededBehavior LimitExceededBehavior + // FlowControlSettings defines publisher flow control settings. + FlowControlSettings FlowControlSettings } // DefaultPublishSettings holds the default values for topics' PublishSettings. @@ -126,6 +119,11 @@ var DefaultPublishSettings = PublishSettings{ // chosen as a reasonable amount of messages in the worst case whilst still // capping the number to a low enough value to not OOM users. BufferedByteLimit: 10 * MaxPublishRequestBytes, + FlowControlSettings: FlowControlSettings{ + MaxOutstandingMessages: 1000, + MaxOutstandingBytes: 10 * 1024 * 1024, + LimitExceededBehavior: FlowControlBlock, + }, } // CreateTopic creates a new topic. @@ -529,7 +527,7 @@ func (t *Topic) initBundler() { workers = 25 * runtime.GOMAXPROCS(0) } - t.fc = newFlowController(t.PublishSettings.MaxOutstandingMessages, t.PublishSettings.MaxOutstandingBytes, t.PublishSettings.LimitExceededBehavior) + t.fc = newFlowController(t.PublishSettings.FlowControlSettings) t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) { // TODO(jba): use a context detached from the one passed to NewClient. diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 653167befa78..e4d69b9445de 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -360,9 +360,12 @@ func TestPublishFlowControl_SignalError(t *testing.T) { if err != nil { t.Fatal(err) } - topic.PublishSettings.MaxOutstandingMessages = 1 - topic.PublishSettings.MaxOutstandingBytes = 10 - topic.PublishSettings.LimitExceededBehavior = FlowControlSignalError + fc := FlowControlSettings{ + MaxOutstandingMessages: 1, + MaxOutstandingBytes: 10, + LimitExceededBehavior: FlowControlSignalError, + } + topic.PublishSettings.FlowControlSettings = fc // Sending a message that is too large results in an error in SignalError mode. r := publishSingleMessage(ctx, topic, "AAAAAAAAAAA") @@ -397,7 +400,7 @@ func TestPublishFlowControl_SignalError(t *testing.T) { } topic.scheduler = nil - topic.PublishSettings.LimitExceededBehavior = FlowControlBlock + topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlBlock // Publish 11 messages, nothing should fail. res = []*PublishResult{} for i := 0; i < 11; i++ { @@ -413,7 +416,7 @@ func TestPublishFlowControl_SignalError(t *testing.T) { } topic.scheduler = nil - topic.PublishSettings.LimitExceededBehavior = FlowControlSignalError + topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError // Publish 11 messages, should signal error on the 11th message. res = []*PublishResult{} for i := 0; i < 11; i++ { From 7cadb33e42e78b9a1c5350b3e96350223075e66d Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Thu, 17 Jun 2021 10:13:07 -0700 Subject: [PATCH 06/15] fix issue with max bundler size calculation --- pubsub/flow_controller.go | 10 +-- pubsub/integration_test.go | 168 +++++++++++++++++++------------------ pubsub/topic.go | 58 ++++++++----- 3 files changed, 130 insertions(+), 106 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 893ee7588d2a..00f60e1d2415 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -27,10 +27,10 @@ import ( type LimitExceededBehavior int const ( - // FlowControlIgnore disables flow control. - FlowControlIgnore LimitExceededBehavior = iota // FlowControlBlock signals to wait until the request can be made without exceeding the limit. - FlowControlBlock + FlowControlBlock LimitExceededBehavior = iota + // FlowControlIgnore disables flow control. + FlowControlIgnore // FlowControlSignalError signals an error to the caller of acquire. FlowControlSignalError ) @@ -144,9 +144,9 @@ func (f *flowController) release(ctx context.Context, size int) { return } atomic.AddInt64(&f.countRemaining, -1) - outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) + outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) recordStat(ctx, OutstandingMessages, outstandingMessages) - outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) + outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) recordStat(ctx, OutstandingBytes, outstandingBytes) if f.semCount != nil { diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index 9a9452257b95..d19e56e96c35 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -224,92 +224,95 @@ func withGoogleClientInfo(ctx context.Context) context.Context { } func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) { - ctx := context.Background() - topic, err := client.CreateTopic(ctx, topicIDs.New()) - if err != nil { - t.Errorf("CreateTopic error: %v", err) - } - defer topic.Stop() - exists, err := topic.Exists(ctx) - if err != nil { - t.Fatalf("TopicExists error: %v", err) - } - if !exists { - t.Errorf("topic %v should exist, but it doesn't", topic) - } - - var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { - t.Errorf("CreateSub error: %v", err) - } - exists, err = sub.Exists(ctx) - if err != nil { - t.Fatalf("SubExists error: %v", err) - } - if !exists { - t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) - } - var msgs []*Message - for i := 0; i < numMsgs; i++ { - text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) - attrs := make(map[string]string) - attrs["foo"] = "bar" - msgs = append(msgs, &Message{ - Data: []byte(text), - Attributes: attrs, - }) - } + t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,numMsgs:%d", maxMsgs, synchronous, numMsgs), + func(t *testing.T) { + ctx := context.Background() + topic, err := client.CreateTopic(ctx, topicIDs.New()) + if err != nil { + t.Errorf("CreateTopic error: %v", err) + } + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatalf("TopicExists error: %v", err) + } + if !exists { + t.Errorf("topic %v should exist, but it doesn't", topic) + } - // Publish some messages. - type pubResult struct { - m *Message - r *PublishResult - } - var rs []pubResult - for _, m := range msgs { - r := topic.Publish(ctx, m) - rs = append(rs, pubResult{m, r}) - } - want := make(map[string]messageData) - for _, res := range rs { - id, err := res.r.Get(ctx) - if err != nil { - t.Fatal(err) - } - md := extractMessageData(res.m) - md.ID = id - want[md.ID] = md - } + var sub *Subscription + if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { + t.Errorf("CreateSub error: %v", err) + } + exists, err = sub.Exists(ctx) + if err != nil { + t.Fatalf("SubExists error: %v", err) + } + if !exists { + t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) + } + var msgs []*Message + for i := 0; i < numMsgs; i++ { + text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) + attrs := make(map[string]string) + attrs["foo"] = "bar" + msgs = append(msgs, &Message{ + Data: []byte(text), + Attributes: attrs, + }) + } - sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs - sub.ReceiveSettings.Synchronous = synchronous + // Publish some messages. + type pubResult struct { + m *Message + r *PublishResult + } + var rs []pubResult + for _, m := range msgs { + r := topic.Publish(ctx, m) + rs = append(rs, pubResult{m, r}) + } + want := make(map[string]messageData) + for _, res := range rs { + id, err := res.r.Get(ctx) + if err != nil { + t.Fatal(err) + } + md := extractMessageData(res.m) + md.ID = id + want[md.ID] = md + } - // Use a timeout to ensure that Pull does not block indefinitely if there are - // unexpectedly few messages available. - now := time.Now() - timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { - m.Ack() - }) - if err != nil { - if c := status.Convert(err); c.Code() == codes.Canceled { - if time.Since(now) >= time.Minute { - t.Fatal("pullN took too long") + sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs + sub.ReceiveSettings.Synchronous = synchronous + + // Use a timeout to ensure that Pull does not block indefinitely if there are + // unexpectedly few messages available. + now := time.Now() + timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { + m.Ack() + }) + if err != nil { + if c := status.Convert(err); c.Code() == codes.Canceled { + if time.Since(now) >= time.Minute { + t.Fatal("pullN took too long") + } + } else { + t.Fatalf("Pull: %v", err) + } } - } else { - t.Fatalf("Pull: %v", err) - } - } - got := make(map[string]messageData) - for _, m := range gotMsgs { - md := extractMessageData(m) - got[md.ID] = md - } - if !testutil.Equal(got, want) { - t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", - maxMsgs, synchronous, got, want) - } + got := make(map[string]messageData) + for _, m := range gotMsgs { + md := extractMessageData(m) + got[md.ID] = md + } + if !testutil.Equal(got, want) { + t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", + maxMsgs, synchronous, got, want) + } + }) } // IAM tests. @@ -413,6 +416,7 @@ func TestIntegration_LargePublishSize(t *testing.T) { msg := &Message{ Data: bytes.Repeat([]byte{'A'}, maxLengthSingleMessage), } + topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError r := topic.Publish(ctx, msg) if _, err := r.Get(ctx); err != nil { t.Fatalf("Failed to publish max length message: %v", err) diff --git a/pubsub/topic.go b/pubsub/topic.go index 9c73cfdcfd95..33ac77c61f35 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -121,7 +121,7 @@ var DefaultPublishSettings = PublishSettings{ BufferedByteLimit: 10 * MaxPublishRequestBytes, FlowControlSettings: FlowControlSettings{ MaxOutstandingMessages: 1000, - MaxOutstandingBytes: 10 * 1024 * 1024, + MaxOutstandingBytes: 10 * MaxPublishRequestBytes, LimitExceededBehavior: FlowControlBlock, }, } @@ -439,20 +439,14 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { return r } - // Use a PublishRequest with only the Messages field to calculate the size - // of an individual message. This accurately calculates the size of the - // encoded proto message by accounting for the length of an individual - // PubSubMessage and Data/Attributes field. - // TODO(hongalex): if this turns out to take significant time, try to approximate it. - msgSize := proto.Size(&pb.PublishRequest{ - Messages: []*pb.PubsubMessage{ - { - Data: msg.Data, - Attributes: msg.Attributes, - OrderingKey: msg.OrderingKey, - }, - }, + // Calculate the size of the encoded proto message by accounting + // for the length of an individual PubSubMessage and Data/Attributes field. + msgSize := proto.Size(&pb.PubsubMessage{ + Data: msg.Data, + Attributes: msg.Attributes, + OrderingKey: msg.OrderingKey, }) + t.initBundler() t.mu.RLock() defer t.mu.RUnlock() @@ -527,8 +521,6 @@ func (t *Topic) initBundler() { workers = 25 * runtime.GOMAXPROCS(0) } - t.fc = newFlowController(t.PublishSettings.FlowControlSettings) - t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) { // TODO(jba): use a context detached from the one passed to NewClient. ctx := context.TODO() @@ -546,13 +538,43 @@ func (t *Topic) initBundler() { } t.scheduler.BundleByteThreshold = t.PublishSettings.ByteThreshold + fcs := DefaultPublishSettings.FlowControlSettings + // FlowControlSettings.MaxOutstandingBytes and BufferedByteLimit have generally + // the same behavior, with the latter always returning and error. BufferedByteLimit + // is deprecated in favor of MaxOutstandingBytes. + // + // While we continue to support both, check if either has been set directly, and use + // that value. If both have been set, we will respect MaxOutstandingBytes first. + var mo, bb bool + if t.PublishSettings.BufferedByteLimit > 0 { + bb = true + } + if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 { + mo = true + } + if bb && !mo { + fcs.MaxOutstandingBytes = t.PublishSettings.BufferedByteLimit + } else if mo { + fcs.MaxOutstandingBytes = t.PublishSettings.FlowControlSettings.MaxOutstandingBytes + } + if t.PublishSettings.FlowControlSettings.LimitExceededBehavior != FlowControlBlock { + fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior + } + if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 { + fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages + } + + t.fc = newFlowController(fcs) + bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit if t.PublishSettings.BufferedByteLimit > 0 { bufferedByteLimit = t.PublishSettings.BufferedByteLimit } t.scheduler.BufferedByteLimit = bufferedByteLimit - t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) + t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5 + fmt.Printf("calcFieldSizeString is: %d\n", calcFieldSizeString(t.name)) + fmt.Printf("limit is: %d\n", t.scheduler.BundleByteLimit) } func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { @@ -570,8 +592,6 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) OrderingKey: bm.msg.OrderingKey, } pbMsgs[i] = msg - // Calculate size of message for flow control release. - bm.size = proto.Size(msg) bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse From df39d408b6f62372060ca2140c1dba61a4dbcd2e Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 21 Jun 2021 22:19:53 -0700 Subject: [PATCH 07/15] add integration tests for publish flow control --- pubsub/integration_test.go | 1 + pubsub/topic.go | 4 +- pubsub/topic_test.go | 198 +++++++++++++++++++++++++++---------- 3 files changed, 148 insertions(+), 55 deletions(-) diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index d19e56e96c35..dc2cf60b336c 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -226,6 +226,7 @@ func withGoogleClientInfo(ctx context.Context) context.Context { func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) { t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,numMsgs:%d", maxMsgs, synchronous, numMsgs), func(t *testing.T) { + t.Parallel() ctx := context.Background() topic, err := client.CreateTopic(ctx, topicIDs.New()) if err != nil { diff --git a/pubsub/topic.go b/pubsub/topic.go index 33ac77c61f35..7a7d6ebb31a1 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -572,9 +572,9 @@ func (t *Topic) initBundler() { } t.scheduler.BufferedByteLimit = bufferedByteLimit + // Calculate the max limit of a single bundle. 5 comes from the number of bytes + // needed to be reserved for encoding the PubsubMessage repeated field. t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5 - fmt.Printf("calcFieldSizeString is: %d\n", calcFieldSizeString(t.name)) - fmt.Printf("limit is: %d\n", t.scheduler.BundleByteLimit) } func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { diff --git a/pubsub/topic_test.go b/pubsub/topic_test.go index 218556e419f2..446ab424b1f1 100644 --- a/pubsub/topic_test.go +++ b/pubsub/topic_test.go @@ -18,13 +18,16 @@ import ( "bytes" "context" "fmt" + "sync" "testing" "time" "cloud.google.com/go/internal/testutil" + "cloud.google.com/go/pubsub/pstest" "google.golang.org/api/iterator" "google.golang.org/api/option" "google.golang.org/api/support/bundler" + pb "google.golang.org/genproto/googleapis/pubsub/v1" pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -399,80 +402,169 @@ func TestPublishFlowControl_SignalError(t *testing.T) { } topic.PublishSettings.FlowControlSettings = fc + srv.SetAutoPublishResponse(false) + // Sending a message that is too large results in an error in SignalError mode. - r := publishSingleMessage(ctx, topic, "AAAAAAAAAAA") - if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingBytes { + r1 := publishSingleMessage(ctx, topic, "AAAAAAAAAAA") + if _, err := r1.Get(ctx); err != ErrFlowControllerMaxOutstandingBytes { t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingBytes) } // Sending a second message succeeds. - r = publishSingleMessage(ctx, topic, "AAAA") - if _, err := r.Get(ctx); err != nil { - t.Fatalf("publishResult.Get(): got %v, want nil", err) - } + r2 := publishSingleMessage(ctx, topic, "AAAA") // Sending a third message fails because of the outstanding message. - r = publishSingleMessage(ctx, topic, "AA") - if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages { + r3 := publishSingleMessage(ctx, topic, "AA") + if _, err := r3.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages { t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingMessages) } - // Publish 11 messages, nothing should fail. - var res []*PublishResult - for i := 0; i < 11; i++ { - r := topic.Publish(ctx, &Message{ - Data: []byte("test"), - }) - res = append(res, r) + srv.AddPublishResponse(&pb.PublishResponse{ + MessageIds: []string{"1"}, + }, nil) + got, err := r2.Get(ctx) + if err != nil { + t.Fatalf("publishResult.Get(): got %v", err) } - for _, r := range res { - if _, err := r.Get(ctx); err != nil { - t.Fatalf("got err: %v", err) - } + if want := "1"; got != want { + t.Fatalf("publishResult.Get() got: %s, want %s", got, want) } - topic.scheduler = nil - topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlBlock - // Publish 11 messages, nothing should fail. - res = []*PublishResult{} - for i := 0; i < 11; i++ { - r := topic.Publish(ctx, &Message{ - Data: []byte("test"), - }) - res = append(res, r) + // Sending another messages succeeds. + r4 := publishSingleMessage(ctx, topic, "AAAA") + srv.AddPublishResponse(&pb.PublishResponse{ + MessageIds: []string{"2"}, + }, nil) + got, err = r4.Get(ctx) + if err != nil { + t.Fatalf("publishResult.Get(): got %v", err) } - for _, r := range res { - if _, err := r.Get(ctx); err != nil { - t.Fatalf("got err: %v", err) - } + if want := "2"; got != want { + t.Fatalf("publishResult.Get() got: %s, want %s", got, want) } - topic.scheduler = nil - topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError - // Publish 11 messages, should signal error on the 11th message. - res = []*PublishResult{} - for i := 0; i < 11; i++ { - r := topic.Publish(ctx, &Message{ - Data: []byte("test"), - }) - res = append(res, r) - } - for i, r := range res { - if i == 10 { - if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages { - t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingMessages) - } - } else { - if _, err := r.Get(ctx); err != nil { - t.Fatalf("got err: %v", err) - } - } +} + +func TestPublishFlowControl_SignalErrorOrderingKey(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + topic, err := c.CreateTopic(ctx, "some-topic") + if err != nil { + t.Fatal(err) + } + fc := FlowControlSettings{ + MaxOutstandingMessages: 1, + MaxOutstandingBytes: 10, + LimitExceededBehavior: FlowControlSignalError, + } + topic.PublishSettings.FlowControlSettings = fc + topic.PublishSettings.DelayThreshold = 5 * time.Second + topic.PublishSettings.CountThreshold = 1 + topic.EnableMessageOrdering = true + + // Sending a message that is too large reuslts in an error. + r1 := publishSingleMessageWithKey(ctx, topic, "AAAAAAAAAAA", "a") + if _, err := r1.Get(ctx); err != ErrFlowControllerMaxOutstandingBytes { + t.Fatalf("r1.Get() got: %v, want %v", err, ErrFlowControllerMaxOutstandingBytes) + } + + // Sending a second message for the same ordering key fails because the first one failed. + r2 := publishSingleMessageWithKey(ctx, topic, "AAAA", "a") + if _, err := r2.Get(ctx); err == nil { + t.Fatal("r2.Get() got nil instead of error before calling topic.ResumePublish(key)") } } -// publishSingleMessage published a single message to a topic. +func TestPublishFlowControl_Block(t *testing.T) { + ctx := context.Background() + c, srv := newFake(t) + defer c.Close() + defer srv.Close() + + topic, err := c.CreateTopic(ctx, "some-topic") + if err != nil { + t.Fatal(err) + } + fc := FlowControlSettings{ + MaxOutstandingMessages: 2, + MaxOutstandingBytes: 10, + LimitExceededBehavior: FlowControlBlock, + } + topic.PublishSettings.FlowControlSettings = fc + topic.PublishSettings.DelayThreshold = 5 * time.Second + topic.PublishSettings.CountThreshold = 1 + + srv.SetAutoPublishResponse(false) + + var sendResponse1, response1Sent, sendResponse2 sync.WaitGroup + sendResponse1.Add(1) + response1Sent.Add(1) + sendResponse2.Add(1) + + go func() { + sendResponse1.Wait() + addSingleResponse(srv, "1") + response1Sent.Done() + sendResponse2.Wait() + addSingleResponse(srv, "2") + }() + + // Sending two messages succeeds. + publishSingleMessage(ctx, topic, "AA") + publishSingleMessage(ctx, topic, "AA") + + // Sendinga third message blocks because the messages are outstanding + var publish3Completed, response3Sent sync.WaitGroup + publish3Completed.Add(1) + response3Sent.Add(1) + go func() { + publishSingleMessage(ctx, topic, "AAAAAA") + publish3Completed.Done() + }() + + go func() { + sendResponse1.Done() + response1Sent.Wait() + sendResponse2.Done() + }() + + var publish4Completed sync.WaitGroup + publish4Completed.Add(1) + + go func() { + publish3Completed.Wait() + publishSingleMessage(ctx, topic, "A") + publish4Completed.Done() + }() + + publish3Completed.Wait() + addSingleResponse(srv, "3") + response3Sent.Done() + + publish4Completed.Wait() +} + +// publishSingleMessage publishes a single message to a topic. func publishSingleMessage(ctx context.Context, t *Topic, data string) *PublishResult { return t.Publish(ctx, &Message{ Data: []byte(data), }) } + +// publishSingleMessageWithKey publishes a single message to a topic with an ordering key. +func publishSingleMessageWithKey(ctx context.Context, t *Topic, data, key string) *PublishResult { + return t.Publish(ctx, &Message{ + Data: []byte(data), + OrderingKey: key, + }) +} + +// addSingleResponse adds a publish response to the provided fake. +func addSingleResponse(srv *pstest.Server, id string) { + srv.AddPublishResponse(&pb.PublishResponse{ + MessageIds: []string{id}, + }, nil) +} From 3b5fd1f28294ca45f1ca76be14f468328c6a2557 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 21 Jun 2021 22:33:40 -0700 Subject: [PATCH 08/15] improve docs --- pubsub/flow_controller.go | 4 +++- pubsub/topic.go | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 00f60e1d2415..edc844fe7845 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -37,9 +37,11 @@ const ( type FlowControlSettings struct { // MaxOutstandingMessages is the maximum number of bufered messages to be published. + // If less than or equal to zero, this is disabled. MaxOutstandingMessages int // MaxOutstandingBytes is the maximum size of buffered messages to be published. + // If less than or equal to zero, this is disabled. MaxOutstandingBytes int // LimitExceededBehavior configures the behavior when trying to publish @@ -54,7 +56,7 @@ var ( ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded") ) -// flowController implements flow control for Subscription.Receive. +// flowController implements flow control for publishing and subscribing. type flowController struct { maxCount int maxSize int // max total size of messages diff --git a/pubsub/topic.go b/pubsub/topic.go index 7a7d6ebb31a1..ff3e396a81be 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -99,10 +99,10 @@ type PublishSettings struct { // The maximum number of bytes that the Bundler will keep in memory before // returning ErrOverflow. This is now superseded by FlowControlSettings.MaxOutstandingBytes. - // If both are set, this value will be used for both settings. + // If MaxOutstandingBytes is set, that value will override BufferedByteLimit. // // Defaults to DefaultPublishSettings.BufferedByteLimit. - // Deprecated. + // Deprecated: Set `topic.PublishSettings.FlowControlSettings.MaxOutstandingBytes` instead. BufferedByteLimit int // FlowControlSettings defines publisher flow control settings. @@ -540,7 +540,7 @@ func (t *Topic) initBundler() { fcs := DefaultPublishSettings.FlowControlSettings // FlowControlSettings.MaxOutstandingBytes and BufferedByteLimit have generally - // the same behavior, with the latter always returning and error. BufferedByteLimit + // the same behavior, with the latter always returning an error. BufferedByteLimit // is deprecated in favor of MaxOutstandingBytes. // // While we continue to support both, check if either has been set directly, and use From c3c0141b988ba05df3e16551b72771d13cd5edd6 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Mon, 21 Jun 2021 22:36:46 -0700 Subject: [PATCH 09/15] reduce diff for review switch flowController to struct not pointer fix more comments fix more comments --- pubsub/flow_controller.go | 4 +- pubsub/flow_controller_test.go | 14 +-- pubsub/integration_test.go | 168 ++++++++++++++++----------------- pubsub/topic.go | 11 +-- 4 files changed, 96 insertions(+), 101 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index edc844fe7845..c779d83141a1 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -75,8 +75,8 @@ type flowController struct { // maxCount messages or maxSize bytes are outstanding at once. If maxCount or // maxSize is < 1, then an unlimited number of messages or bytes is permitted, // respectively. -func newFlowController(fc FlowControlSettings) *flowController { - f := &flowController{ +func newFlowController(fc FlowControlSettings) flowController { + f := flowController{ maxCount: fc.MaxOutstandingMessages, maxSize: fc.MaxOutstandingBytes, semCount: nil, diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index 0cf9038e0633..320a96a62c21 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -187,19 +187,19 @@ func TestFlowControllerTryAcquire(t *testing.T) { fc := newFlowController(fcSettings(3, 10, FlowControlSignalError)) ctx := context.Background() - // Successfully newAcquire 4 bytes. + // Successfully acquired 4 bytes. if err := fc.acquire(ctx, 4); err != nil { - t.Errorf("fc.newAcquire got err: %v", err) + t.Errorf("fc.acquired got err: %v", err) } - // Fail to newAcquire 7 bytes. + // Fail to acquire 7 bytes. if err := fc.acquire(ctx, 7); err == nil { t.Errorf("got nil, wanted err: %v", ErrFlowControllerMaxOutstandingBytes) } - // Successfully newAcquire 6 byte. + // Successfully acquired 6 byte. if err := fc.acquire(ctx, 6); err != nil { - t.Errorf("fc.newAcquire got err: %v", err) + t.Errorf("fc.acquired got err: %v", err) } } @@ -252,12 +252,12 @@ func TestFlowControllerUnboundedBytes(t *testing.T) { t.Errorf("got %v, wanted no error", err) } - // Successfully newAcquire 4GB bytes. + // Successfully acquired 4GB bytes. if err := fc.acquire(ctx, 4e9); err != nil { t.Errorf("got %v, wanted no error", err) } - // Fail to newAcquire a third message. + // Fail to acquire a third message. if err := fc.acquire(ctx, 3); err == nil { t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingMessages) } diff --git a/pubsub/integration_test.go b/pubsub/integration_test.go index dc2cf60b336c..659e9f050f6b 100644 --- a/pubsub/integration_test.go +++ b/pubsub/integration_test.go @@ -224,96 +224,92 @@ func withGoogleClientInfo(ctx context.Context) context.Context { } func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) { - t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,numMsgs:%d", maxMsgs, synchronous, numMsgs), - func(t *testing.T) { - t.Parallel() - ctx := context.Background() - topic, err := client.CreateTopic(ctx, topicIDs.New()) - if err != nil { - t.Errorf("CreateTopic error: %v", err) - } - defer topic.Stop() - exists, err := topic.Exists(ctx) - if err != nil { - t.Fatalf("TopicExists error: %v", err) - } - if !exists { - t.Errorf("topic %v should exist, but it doesn't", topic) - } + ctx := context.Background() + topic, err := client.CreateTopic(ctx, topicIDs.New()) + if err != nil { + t.Errorf("CreateTopic error: %v", err) + } + defer topic.Stop() + exists, err := topic.Exists(ctx) + if err != nil { + t.Fatalf("TopicExists error: %v", err) + } + if !exists { + t.Errorf("topic %v should exist, but it doesn't", topic) + } - var sub *Subscription - if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { - t.Errorf("CreateSub error: %v", err) - } - exists, err = sub.Exists(ctx) - if err != nil { - t.Fatalf("SubExists error: %v", err) - } - if !exists { - t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) - } - var msgs []*Message - for i := 0; i < numMsgs; i++ { - text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) - attrs := make(map[string]string) - attrs["foo"] = "bar" - msgs = append(msgs, &Message{ - Data: []byte(text), - Attributes: attrs, - }) - } + var sub *Subscription + if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil { + t.Errorf("CreateSub error: %v", err) + } + exists, err = sub.Exists(ctx) + if err != nil { + t.Fatalf("SubExists error: %v", err) + } + if !exists { + t.Errorf("subscription %s should exist, but it doesn't", sub.ID()) + } + var msgs []*Message + for i := 0; i < numMsgs; i++ { + text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes)) + attrs := make(map[string]string) + attrs["foo"] = "bar" + msgs = append(msgs, &Message{ + Data: []byte(text), + Attributes: attrs, + }) + } - // Publish some messages. - type pubResult struct { - m *Message - r *PublishResult - } - var rs []pubResult - for _, m := range msgs { - r := topic.Publish(ctx, m) - rs = append(rs, pubResult{m, r}) - } - want := make(map[string]messageData) - for _, res := range rs { - id, err := res.r.Get(ctx) - if err != nil { - t.Fatal(err) - } - md := extractMessageData(res.m) - md.ID = id - want[md.ID] = md - } + // Publish some messages. + type pubResult struct { + m *Message + r *PublishResult + } + var rs []pubResult + for _, m := range msgs { + r := topic.Publish(ctx, m) + rs = append(rs, pubResult{m, r}) + } + want := make(map[string]messageData) + for _, res := range rs { + id, err := res.r.Get(ctx) + if err != nil { + t.Fatal(err) + } + md := extractMessageData(res.m) + md.ID = id + want[md.ID] = md + } - sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs - sub.ReceiveSettings.Synchronous = synchronous - - // Use a timeout to ensure that Pull does not block indefinitely if there are - // unexpectedly few messages available. - now := time.Now() - timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) - defer cancel() - gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { - m.Ack() - }) - if err != nil { - if c := status.Convert(err); c.Code() == codes.Canceled { - if time.Since(now) >= time.Minute { - t.Fatal("pullN took too long") - } - } else { - t.Fatalf("Pull: %v", err) - } - } - got := make(map[string]messageData) - for _, m := range gotMsgs { - md := extractMessageData(m) - got[md.ID] = md - } - if !testutil.Equal(got, want) { - t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", - maxMsgs, synchronous, got, want) + sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs + sub.ReceiveSettings.Synchronous = synchronous + + // Use a timeout to ensure that Pull does not block indefinitely if there are + // unexpectedly few messages available. + now := time.Now() + timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) { + m.Ack() + }) + if err != nil { + if c := status.Convert(err); c.Code() == codes.Canceled { + if time.Since(now) >= time.Minute { + t.Fatal("pullN took too long") } - }) + } else { + t.Fatalf("Pull: %v", err) + } + } + got := make(map[string]messageData) + for _, m := range gotMsgs { + md := extractMessageData(m) + got[md.ID] = md + } + if !testutil.Equal(got, want) { + t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v", + maxMsgs, synchronous, got, want) + } } // IAM tests. diff --git a/pubsub/topic.go b/pubsub/topic.go index ff3e396a81be..1d1979af1619 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -68,7 +68,7 @@ type Topic struct { stopped bool scheduler *scheduler.PublishScheduler - fc *flowController + flowController // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool @@ -456,7 +456,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { return r } - if err := t.fc.acquire(ctx, msgSize); err != nil { + if err := t.flowController.acquire(ctx, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) return r @@ -564,7 +564,7 @@ func (t *Topic) initBundler() { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages } - t.fc = newFlowController(fcs) + t.flowController = newFlowController(fcs) bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit if t.PublishSettings.BufferedByteLimit > 0 { @@ -586,12 +586,11 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) var orderingKey string for i, bm := range bms { orderingKey = bm.msg.OrderingKey - msg := &pb.PubsubMessage{ + pbMsgs[i] = &pb.PubsubMessage{ Data: bm.msg.Data, Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } - pbMsgs[i] = msg bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse @@ -616,7 +615,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) PublishLatency.M(float64(end.Sub(start)/time.Millisecond)), PublishedMessages.M(int64(len(bms)))) for i, bm := range bms { - t.fc.release(ctx, bm.size) + t.flowController.release(ctx, bm.size) if err != nil { ipubsub.SetPublishResult(bm.res, "", err) } else { From 11d68fa75ccc330269168b22927ab5fe0a4f84b8 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Thu, 24 Jun 2021 15:03:39 -0700 Subject: [PATCH 10/15] fix double counting issue with messages --- pubsub/flow_controller.go | 3 --- pubsub/flow_controller_test.go | 27 +++------------------------ 2 files changed, 3 insertions(+), 27 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index c779d83141a1..8f4aa7ee264c 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -115,7 +115,6 @@ func (f *flowController) acquire(ctx context.Context, size int) error { return err } } - atomic.AddInt64(&f.countRemaining, 1) case FlowControlSignalError: if f.semCount != nil { if !f.semCount.TryAcquire(1) { @@ -131,7 +130,6 @@ func (f *flowController) acquire(ctx context.Context, size int) error { return ErrFlowControllerMaxOutstandingBytes } } - atomic.AddInt64(&f.countRemaining, 1) } outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) recordStat(ctx, OutstandingMessages, outstandingMessages) @@ -145,7 +143,6 @@ func (f *flowController) release(ctx context.Context, size int) { if f.limitBehavior == FlowControlIgnore { return } - atomic.AddInt64(&f.countRemaining, -1) outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) recordStat(ctx, OutstandingMessages, outstandingMessages) outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index 320a96a62c21..d5aae4c5d444 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -182,27 +182,6 @@ func TestFlowControllerSaturation(t *testing.T) { } } -func TestFlowControllerTryAcquire(t *testing.T) { - t.Parallel() - fc := newFlowController(fcSettings(3, 10, FlowControlSignalError)) - ctx := context.Background() - - // Successfully acquired 4 bytes. - if err := fc.acquire(ctx, 4); err != nil { - t.Errorf("fc.acquired got err: %v", err) - } - - // Fail to acquire 7 bytes. - if err := fc.acquire(ctx, 7); err == nil { - t.Errorf("got nil, wanted err: %v", ErrFlowControllerMaxOutstandingBytes) - } - - // Successfully acquired 6 byte. - if err := fc.acquire(ctx, 6); err != nil { - t.Errorf("fc.acquired got err: %v", err) - } -} - func TestFlowControllerUnboundedCount(t *testing.T) { t.Parallel() ctx := context.Background() @@ -213,12 +192,12 @@ func TestFlowControllerUnboundedCount(t *testing.T) { t.Errorf("got %v, wanted no error", err) } - // Successfully tryAcquire 4 bytes. + // Successfully acquire 4 bytes. if err := fc.acquire(ctx, 4); err != nil { t.Errorf("got %v, wanted no error", err) } - // Fail to tryAcquire 3 bytes. + // Fail to acquire 3 bytes. if err := fc.acquire(ctx, 3); err == nil { t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingBytes) } @@ -245,7 +224,7 @@ func TestFlowControllerUnboundedCount2(t *testing.T) { func TestFlowControllerUnboundedBytes(t *testing.T) { t.Parallel() ctx := context.Background() - fc := newFlowController(fcSettings(2, 0, FlowControlBlock)) + fc := newFlowController(fcSettings(2, 0, FlowControlSignalError)) // Successfully acquire 4GB. if err := fc.acquire(ctx, 4e9); err != nil { From e1ba281d6e58e0fdb933cd2c32249c5180ccde44 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Thu, 1 Jul 2021 10:59:20 -0700 Subject: [PATCH 11/15] fix exported variable comments --- pubsub/flow_controller.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 8f4aa7ee264c..585673a2626a 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -35,6 +35,7 @@ const ( FlowControlSignalError ) +// FlowControlSettings controls flow control for messages while publishing or subscribing. type FlowControlSettings struct { // MaxOutstandingMessages is the maximum number of bufered messages to be published. // If less than or equal to zero, this is disabled. @@ -52,8 +53,11 @@ type FlowControlSettings struct { } var ( + // ErrFlowControllerMaxOutstandingMessages indicates that outstanding messages exceeds MaxOutstandingMessages. ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded") - ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded") + + // ErrFlowControllerMaxOutstandingBytes indicates that outstanding bytes of messages exceeds MaxOutstandingBytes. + ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded") ) // flowController implements flow control for publishing and subscribing. From f935174c60506e55bc89f83bd3386c8d547f0e11 Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Fri, 9 Jul 2021 13:35:49 -0700 Subject: [PATCH 12/15] do not report oc metrics if flow controller is disabled --- pubsub/flow_controller.go | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 585673a2626a..82699f911022 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -135,10 +135,14 @@ func (f *flowController) acquire(ctx context.Context, size int) error { } } } - outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) - recordStat(ctx, OutstandingMessages, outstandingMessages) - outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) + if f.semCount != nil { + outstandingMessages := atomic.AddInt64(&f.countRemaining, 1) + recordStat(ctx, OutstandingMessages, outstandingMessages) + } + if f.semSize != nil { + outstandingBytes := atomic.AddInt64(&f.bytesRemaining, f.bound(size)) + recordStat(ctx, OutstandingBytes, outstandingBytes) + } return nil } @@ -147,15 +151,15 @@ func (f *flowController) release(ctx context.Context, size int) { if f.limitBehavior == FlowControlIgnore { return } - outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) - recordStat(ctx, OutstandingMessages, outstandingMessages) - outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) - recordStat(ctx, OutstandingBytes, outstandingBytes) if f.semCount != nil { + outstandingMessages := atomic.AddInt64(&f.countRemaining, -1) + recordStat(ctx, OutstandingMessages, outstandingMessages) f.semCount.Release(1) } if f.semSize != nil { + outstandingBytes := atomic.AddInt64(&f.bytesRemaining, -1*f.bound(size)) + recordStat(ctx, OutstandingBytes, outstandingBytes) f.semSize.Release(f.bound(size)) } } From 99e3d59b1394a1d0b72a487aed150eaf78f7667a Mon Sep 17 00:00:00 2001 From: Alex Hong Date: Tue, 13 Jul 2021 11:55:53 -0700 Subject: [PATCH 13/15] disable MaxOutstandingMessages by default --- pubsub/flow_controller.go | 2 ++ pubsub/flow_controller_test.go | 2 +- pubsub/topic.go | 21 +++------------------ 3 files changed, 6 insertions(+), 19 deletions(-) diff --git a/pubsub/flow_controller.go b/pubsub/flow_controller.go index 82699f911022..4fa86b671467 100644 --- a/pubsub/flow_controller.go +++ b/pubsub/flow_controller.go @@ -171,6 +171,8 @@ func (f *flowController) bound(size int) int64 { return int64(size) } +// count returns the number of outstanding messages. +// if maxCount is 0, this will always return 0. func (f *flowController) count() int { return int(atomic.LoadInt64(&f.countRemaining)) } diff --git a/pubsub/flow_controller_test.go b/pubsub/flow_controller_test.go index d5aae4c5d444..459663adb62a 100644 --- a/pubsub/flow_controller_test.go +++ b/pubsub/flow_controller_test.go @@ -214,7 +214,7 @@ func TestFlowControllerUnboundedCount2(t *testing.T) { fc.release(ctx, 1) fc.release(ctx, 1) fc.release(ctx, 1) - wantCount := int64(-2) + wantCount := int64(0) c := int64(fc.count()) if c != wantCount { t.Fatalf("got count %d, want %d", c, wantCount) diff --git a/pubsub/topic.go b/pubsub/topic.go index 1d1979af1619..acd8732d0558 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -121,7 +121,7 @@ var DefaultPublishSettings = PublishSettings{ BufferedByteLimit: 10 * MaxPublishRequestBytes, FlowControlSettings: FlowControlSettings{ MaxOutstandingMessages: 1000, - MaxOutstandingBytes: 10 * MaxPublishRequestBytes, + MaxOutstandingBytes: -1, LimitExceededBehavior: FlowControlBlock, }, } @@ -539,27 +539,12 @@ func (t *Topic) initBundler() { t.scheduler.BundleByteThreshold = t.PublishSettings.ByteThreshold fcs := DefaultPublishSettings.FlowControlSettings - // FlowControlSettings.MaxOutstandingBytes and BufferedByteLimit have generally - // the same behavior, with the latter always returning an error. BufferedByteLimit - // is deprecated in favor of MaxOutstandingBytes. - // - // While we continue to support both, check if either has been set directly, and use - // that value. If both have been set, we will respect MaxOutstandingBytes first. - var mo, bb bool - if t.PublishSettings.BufferedByteLimit > 0 { - bb = true + if t.PublishSettings.FlowControlSettings.LimitExceededBehavior != FlowControlBlock { + fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior } if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 { - mo = true - } - if bb && !mo { - fcs.MaxOutstandingBytes = t.PublishSettings.BufferedByteLimit - } else if mo { fcs.MaxOutstandingBytes = t.PublishSettings.FlowControlSettings.MaxOutstandingBytes } - if t.PublishSettings.FlowControlSettings.LimitExceededBehavior != FlowControlBlock { - fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior - } if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages } From 324062b4478a1a98fdd5b8d388d324e9aa3e7b7b Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Wed, 1 Sep 2021 13:21:35 -0700 Subject: [PATCH 14/15] override bufferedbytelimit --- pubsub/topic.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index 27e37082eaf8..c9c8a04cafb8 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -592,7 +592,10 @@ func (t *Topic) initBundler() { fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior } if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 { - fcs.MaxOutstandingBytes = t.PublishSettings.FlowControlSettings.MaxOutstandingBytes + b := t.PublishSettings.FlowControlSettings.MaxOutstandingBytes + fcs.MaxOutstandingBytes = b + // If MaxOutstandingBytes is set, override BufferedByteLimit. + t.PublishSettings.BufferedByteLimit = b } if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages From 0d5c9033467ae63772c49bdb88a608397d107005 Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 3 Sep 2021 12:22:14 -0700 Subject: [PATCH 15/15] make default fc behavior ignore --- pubsub/topic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pubsub/topic.go b/pubsub/topic.go index c9c8a04cafb8..47deb490080b 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -124,7 +124,7 @@ var DefaultPublishSettings = PublishSettings{ FlowControlSettings: FlowControlSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: -1, - LimitExceededBehavior: FlowControlBlock, + LimitExceededBehavior: FlowControlIgnore, }, }