Skip to content

Commit

Permalink
add integration tests for publish flow control
Browse files Browse the repository at this point in the history
  • Loading branch information
hongalex committed Jun 22, 2021
1 parent 119e3fa commit df39d40
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 55 deletions.
1 change: 1 addition & 0 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ func withGoogleClientInfo(ctx context.Context) context.Context {
func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) {
t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,numMsgs:%d", maxMsgs, synchronous, numMsgs),
func(t *testing.T) {
t.Parallel()
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,9 +572,9 @@ func (t *Topic) initBundler() {
}
t.scheduler.BufferedByteLimit = bufferedByteLimit

// Calculate the max limit of a single bundle. 5 comes from the number of bytes
// needed to be reserved for encoding the PubsubMessage repeated field.
t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5
fmt.Printf("calcFieldSizeString is: %d\n", calcFieldSizeString(t.name))
fmt.Printf("limit is: %d\n", t.scheduler.BundleByteLimit)
}

func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
Expand Down
198 changes: 145 additions & 53 deletions pubsub/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
"bytes"
"context"
"fmt"
"sync"
"testing"
"time"

"cloud.google.com/go/internal/testutil"
"cloud.google.com/go/pubsub/pstest"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
"google.golang.org/api/support/bundler"
pb "google.golang.org/genproto/googleapis/pubsub/v1"
pubsubpb "google.golang.org/genproto/googleapis/pubsub/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -399,80 +402,169 @@ func TestPublishFlowControl_SignalError(t *testing.T) {
}
topic.PublishSettings.FlowControlSettings = fc

srv.SetAutoPublishResponse(false)

// Sending a message that is too large results in an error in SignalError mode.
r := publishSingleMessage(ctx, topic, "AAAAAAAAAAA")
if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingBytes {
r1 := publishSingleMessage(ctx, topic, "AAAAAAAAAAA")
if _, err := r1.Get(ctx); err != ErrFlowControllerMaxOutstandingBytes {
t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingBytes)
}

// Sending a second message succeeds.
r = publishSingleMessage(ctx, topic, "AAAA")
if _, err := r.Get(ctx); err != nil {
t.Fatalf("publishResult.Get(): got %v, want nil", err)
}
r2 := publishSingleMessage(ctx, topic, "AAAA")

// Sending a third message fails because of the outstanding message.
r = publishSingleMessage(ctx, topic, "AA")
if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages {
r3 := publishSingleMessage(ctx, topic, "AA")
if _, err := r3.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages {
t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingMessages)
}

// Publish 11 messages, nothing should fail.
var res []*PublishResult
for i := 0; i < 11; i++ {
r := topic.Publish(ctx, &Message{
Data: []byte("test"),
})
res = append(res, r)
srv.AddPublishResponse(&pb.PublishResponse{
MessageIds: []string{"1"},
}, nil)
got, err := r2.Get(ctx)
if err != nil {
t.Fatalf("publishResult.Get(): got %v", err)
}
for _, r := range res {
if _, err := r.Get(ctx); err != nil {
t.Fatalf("got err: %v", err)
}
if want := "1"; got != want {
t.Fatalf("publishResult.Get() got: %s, want %s", got, want)
}

topic.scheduler = nil
topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlBlock
// Publish 11 messages, nothing should fail.
res = []*PublishResult{}
for i := 0; i < 11; i++ {
r := topic.Publish(ctx, &Message{
Data: []byte("test"),
})
res = append(res, r)
// Sending another messages succeeds.
r4 := publishSingleMessage(ctx, topic, "AAAA")
srv.AddPublishResponse(&pb.PublishResponse{
MessageIds: []string{"2"},
}, nil)
got, err = r4.Get(ctx)
if err != nil {
t.Fatalf("publishResult.Get(): got %v", err)
}
for _, r := range res {
if _, err := r.Get(ctx); err != nil {
t.Fatalf("got err: %v", err)
}
if want := "2"; got != want {
t.Fatalf("publishResult.Get() got: %s, want %s", got, want)
}

topic.scheduler = nil
topic.PublishSettings.FlowControlSettings.LimitExceededBehavior = FlowControlSignalError
// Publish 11 messages, should signal error on the 11th message.
res = []*PublishResult{}
for i := 0; i < 11; i++ {
r := topic.Publish(ctx, &Message{
Data: []byte("test"),
})
res = append(res, r)
}
for i, r := range res {
if i == 10 {
if _, err := r.Get(ctx); err != ErrFlowControllerMaxOutstandingMessages {
t.Fatalf("publishResult.Get(): got %v, want %v", err, ErrFlowControllerMaxOutstandingMessages)
}
} else {
if _, err := r.Get(ctx); err != nil {
t.Fatalf("got err: %v", err)
}
}
}

func TestPublishFlowControl_SignalErrorOrderingKey(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

topic, err := c.CreateTopic(ctx, "some-topic")
if err != nil {
t.Fatal(err)
}
fc := FlowControlSettings{
MaxOutstandingMessages: 1,
MaxOutstandingBytes: 10,
LimitExceededBehavior: FlowControlSignalError,
}
topic.PublishSettings.FlowControlSettings = fc
topic.PublishSettings.DelayThreshold = 5 * time.Second
topic.PublishSettings.CountThreshold = 1
topic.EnableMessageOrdering = true

// Sending a message that is too large reuslts in an error.
r1 := publishSingleMessageWithKey(ctx, topic, "AAAAAAAAAAA", "a")
if _, err := r1.Get(ctx); err != ErrFlowControllerMaxOutstandingBytes {
t.Fatalf("r1.Get() got: %v, want %v", err, ErrFlowControllerMaxOutstandingBytes)
}

// Sending a second message for the same ordering key fails because the first one failed.
r2 := publishSingleMessageWithKey(ctx, topic, "AAAA", "a")
if _, err := r2.Get(ctx); err == nil {
t.Fatal("r2.Get() got nil instead of error before calling topic.ResumePublish(key)")
}
}

// publishSingleMessage published a single message to a topic.
func TestPublishFlowControl_Block(t *testing.T) {
ctx := context.Background()
c, srv := newFake(t)
defer c.Close()
defer srv.Close()

topic, err := c.CreateTopic(ctx, "some-topic")
if err != nil {
t.Fatal(err)
}
fc := FlowControlSettings{
MaxOutstandingMessages: 2,
MaxOutstandingBytes: 10,
LimitExceededBehavior: FlowControlBlock,
}
topic.PublishSettings.FlowControlSettings = fc
topic.PublishSettings.DelayThreshold = 5 * time.Second
topic.PublishSettings.CountThreshold = 1

srv.SetAutoPublishResponse(false)

var sendResponse1, response1Sent, sendResponse2 sync.WaitGroup
sendResponse1.Add(1)
response1Sent.Add(1)
sendResponse2.Add(1)

go func() {
sendResponse1.Wait()
addSingleResponse(srv, "1")
response1Sent.Done()
sendResponse2.Wait()
addSingleResponse(srv, "2")
}()

// Sending two messages succeeds.
publishSingleMessage(ctx, topic, "AA")
publishSingleMessage(ctx, topic, "AA")

// Sendinga third message blocks because the messages are outstanding
var publish3Completed, response3Sent sync.WaitGroup
publish3Completed.Add(1)
response3Sent.Add(1)
go func() {
publishSingleMessage(ctx, topic, "AAAAAA")
publish3Completed.Done()
}()

go func() {
sendResponse1.Done()
response1Sent.Wait()
sendResponse2.Done()
}()

var publish4Completed sync.WaitGroup
publish4Completed.Add(1)

go func() {
publish3Completed.Wait()
publishSingleMessage(ctx, topic, "A")
publish4Completed.Done()
}()

publish3Completed.Wait()
addSingleResponse(srv, "3")
response3Sent.Done()

publish4Completed.Wait()
}

// publishSingleMessage publishes a single message to a topic.
func publishSingleMessage(ctx context.Context, t *Topic, data string) *PublishResult {
return t.Publish(ctx, &Message{
Data: []byte(data),
})
}

// publishSingleMessageWithKey publishes a single message to a topic with an ordering key.
func publishSingleMessageWithKey(ctx context.Context, t *Topic, data, key string) *PublishResult {
return t.Publish(ctx, &Message{
Data: []byte(data),
OrderingKey: key,
})
}

// addSingleResponse adds a publish response to the provided fake.
func addSingleResponse(srv *pstest.Server, id string) {
srv.AddPublishResponse(&pb.PublishResponse{
MessageIds: []string{id},
}, nil)
}

0 comments on commit df39d40

Please sign in to comment.