Skip to content

Commit

Permalink
reduce diff for review
Browse files Browse the repository at this point in the history
switch flowController to struct not pointer

fix more comments

fix more comments
  • Loading branch information
hongalex committed Jun 22, 2021
1 parent 3b5fd1f commit c3c0141
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 101 deletions.
4 changes: 2 additions & 2 deletions pubsub/flow_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type flowController struct {
// maxCount messages or maxSize bytes are outstanding at once. If maxCount or
// maxSize is < 1, then an unlimited number of messages or bytes is permitted,
// respectively.
func newFlowController(fc FlowControlSettings) *flowController {
f := &flowController{
func newFlowController(fc FlowControlSettings) flowController {
f := flowController{
maxCount: fc.MaxOutstandingMessages,
maxSize: fc.MaxOutstandingBytes,
semCount: nil,
Expand Down
14 changes: 7 additions & 7 deletions pubsub/flow_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,19 +187,19 @@ func TestFlowControllerTryAcquire(t *testing.T) {
fc := newFlowController(fcSettings(3, 10, FlowControlSignalError))
ctx := context.Background()

// Successfully newAcquire 4 bytes.
// Successfully acquired 4 bytes.
if err := fc.acquire(ctx, 4); err != nil {
t.Errorf("fc.newAcquire got err: %v", err)
t.Errorf("fc.acquired got err: %v", err)
}

// Fail to newAcquire 7 bytes.
// Fail to acquire 7 bytes.
if err := fc.acquire(ctx, 7); err == nil {
t.Errorf("got nil, wanted err: %v", ErrFlowControllerMaxOutstandingBytes)
}

// Successfully newAcquire 6 byte.
// Successfully acquired 6 byte.
if err := fc.acquire(ctx, 6); err != nil {
t.Errorf("fc.newAcquire got err: %v", err)
t.Errorf("fc.acquired got err: %v", err)
}
}

Expand Down Expand Up @@ -252,12 +252,12 @@ func TestFlowControllerUnboundedBytes(t *testing.T) {
t.Errorf("got %v, wanted no error", err)
}

// Successfully newAcquire 4GB bytes.
// Successfully acquired 4GB bytes.
if err := fc.acquire(ctx, 4e9); err != nil {
t.Errorf("got %v, wanted no error", err)
}

// Fail to newAcquire a third message.
// Fail to acquire a third message.
if err := fc.acquire(ctx, 3); err == nil {
t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingMessages)
}
Expand Down
168 changes: 82 additions & 86 deletions pubsub/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,96 +224,92 @@ func withGoogleClientInfo(ctx context.Context) context.Context {
}

func testPublishAndReceive(t *testing.T, client *Client, maxMsgs int, synchronous bool, numMsgs, extraBytes int) {
t.Run(fmt.Sprintf("maxMsgs:%d,synchronous:%t,numMsgs:%d", maxMsgs, synchronous, numMsgs),
func(t *testing.T) {
t.Parallel()
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatalf("TopicExists error: %v", err)
}
if !exists {
t.Errorf("topic %v should exist, but it doesn't", topic)
}
ctx := context.Background()
topic, err := client.CreateTopic(ctx, topicIDs.New())
if err != nil {
t.Errorf("CreateTopic error: %v", err)
}
defer topic.Stop()
exists, err := topic.Exists(ctx)
if err != nil {
t.Fatalf("TopicExists error: %v", err)
}
if !exists {
t.Errorf("topic %v should exist, but it doesn't", topic)
}

var sub *Subscription
if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
t.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatalf("SubExists error: %v", err)
}
if !exists {
t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
})
}
var sub *Subscription
if sub, err = client.CreateSubscription(ctx, subIDs.New(), SubscriptionConfig{Topic: topic}); err != nil {
t.Errorf("CreateSub error: %v", err)
}
exists, err = sub.Exists(ctx)
if err != nil {
t.Fatalf("SubExists error: %v", err)
}
if !exists {
t.Errorf("subscription %s should exist, but it doesn't", sub.ID())
}
var msgs []*Message
for i := 0; i < numMsgs; i++ {
text := fmt.Sprintf("a message with an index %d - %s", i, strings.Repeat(".", extraBytes))
attrs := make(map[string]string)
attrs["foo"] = "bar"
msgs = append(msgs, &Message{
Data: []byte(text),
Attributes: attrs,
})
}

// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}
// Publish some messages.
type pubResult struct {
m *Message
r *PublishResult
}
var rs []pubResult
for _, m := range msgs {
r := topic.Publish(ctx, m)
rs = append(rs, pubResult{m, r})
}
want := make(map[string]messageData)
for _, res := range rs {
id, err := res.r.Get(ctx)
if err != nil {
t.Fatal(err)
}
md := extractMessageData(res.m)
md.ID = id
want[md.ID] = md
}

sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= time.Minute {
t.Fatal("pullN took too long")
}
} else {
t.Fatalf("Pull: %v", err)
}
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
sub.ReceiveSettings.MaxOutstandingMessages = maxMsgs
sub.ReceiveSettings.Synchronous = synchronous

// Use a timeout to ensure that Pull does not block indefinitely if there are
// unexpectedly few messages available.
now := time.Now()
timeoutCtx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
gotMsgs, err := pullN(timeoutCtx, sub, len(want), func(ctx context.Context, m *Message) {
m.Ack()
})
if err != nil {
if c := status.Convert(err); c.Code() == codes.Canceled {
if time.Since(now) >= time.Minute {
t.Fatal("pullN took too long")
}
})
} else {
t.Fatalf("Pull: %v", err)
}
}
got := make(map[string]messageData)
for _, m := range gotMsgs {
md := extractMessageData(m)
got[md.ID] = md
}
if !testutil.Equal(got, want) {
t.Fatalf("MaxOutstandingMessages=%d, Synchronous=%t, messages got: %+v, messages want: %+v",
maxMsgs, synchronous, got, want)
}
}

// IAM tests.
Expand Down
11 changes: 5 additions & 6 deletions pubsub/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ type Topic struct {
stopped bool
scheduler *scheduler.PublishScheduler

fc *flowController
flowController

// EnableMessageOrdering enables delivery of ordered keys.
EnableMessageOrdering bool
Expand Down Expand Up @@ -456,7 +456,7 @@ func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
return r
}

if err := t.fc.acquire(ctx, msgSize); err != nil {
if err := t.flowController.acquire(ctx, msgSize); err != nil {
t.scheduler.Pause(msg.OrderingKey)
ipubsub.SetPublishResult(r, "", err)
return r
Expand Down Expand Up @@ -564,7 +564,7 @@ func (t *Topic) initBundler() {
fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
}

t.fc = newFlowController(fcs)
t.flowController = newFlowController(fcs)

bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
if t.PublishSettings.BufferedByteLimit > 0 {
Expand All @@ -586,12 +586,11 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
var orderingKey string
for i, bm := range bms {
orderingKey = bm.msg.OrderingKey
msg := &pb.PubsubMessage{
pbMsgs[i] = &pb.PubsubMessage{
Data: bm.msg.Data,
Attributes: bm.msg.Attributes,
OrderingKey: bm.msg.OrderingKey,
}
pbMsgs[i] = msg
bm.msg = nil // release bm.msg for GC
}
var res *pb.PublishResponse
Expand All @@ -616,7 +615,7 @@ func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage)
PublishLatency.M(float64(end.Sub(start)/time.Millisecond)),
PublishedMessages.M(int64(len(bms))))
for i, bm := range bms {
t.fc.release(ctx, bm.size)
t.flowController.release(ctx, bm.size)
if err != nil {
ipubsub.SetPublishResult(bm.res, "", err)
} else {
Expand Down

0 comments on commit c3c0141

Please sign in to comment.