From 54009eab1c3b11a28531ad9e621917d01c9e5339 Mon Sep 17 00:00:00 2001 From: rahul2393 Date: Wed, 7 Aug 2024 15:58:05 +0530 Subject: [PATCH] feat(spanner): add support of multiplexed session support in writeAtleastOnce mutations (#10646) * feat(spanner): add support of multiplexed session support in mutations * fix tests * remove BatchWrite support * revert unrelated changes --- spanner/client_test.go | 48 +++++++++++++++++++++++++++---------- spanner/transaction.go | 3 ++- spanner/transaction_test.go | 23 +++++++++++++++--- 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/spanner/client_test.go b/spanner/client_test.go index 818fcef89456..6d2870cdfcfc 100644 --- a/spanner/client_test.go +++ b/spanner/client_test.go @@ -3555,15 +3555,21 @@ func TestClient_ApplyAtLeastOnceReuseSession(t *testing.T) { if err != nil { t.Fatal(err) } + expectedIdleSesions := sp.incStep + if isMultiplexEnabled { + expectedIdleSesions = 0 + } sp.mu.Lock() - if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w { + if g, w := uint64(sp.idleList.Len())+sp.createReqs, expectedIdleSesions; g != w { + sp.mu.Unlock() t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w) } - expectedSessions := sp.incStep + expectedSessions := expectedIdleSesions if isMultiplexEnabled { expectedSessions++ } if g, w := uint64(len(server.TestSpanner.DumpSessions())), expectedSessions; g != w { + sp.mu.Unlock() t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w) } sp.mu.Unlock() @@ -3602,14 +3608,20 @@ func TestClient_ApplyAtLeastOnceInvalidArgument(t *testing.T) { t.Fatal(err) } sp.mu.Lock() - if g, w := uint64(sp.idleList.Len())+sp.createReqs, sp.incStep; g != w { + expectedIdleSesions := sp.incStep + if isMultiplexEnabled { + expectedIdleSesions = 0 + } + if g, w := uint64(sp.idleList.Len())+sp.createReqs, expectedIdleSesions; g != w { + sp.mu.Unlock() t.Fatalf("idle session count mismatch:\nGot: %v\nWant: %v", g, w) } var countMuxSess uint64 if isMultiplexEnabled { countMuxSess = 1 } - if g, w := uint64(len(server.TestSpanner.DumpSessions())), sp.incStep+countMuxSess; g != w { + if g, w := uint64(len(server.TestSpanner.DumpSessions())), expectedIdleSesions+countMuxSess; g != w { + sp.mu.Unlock() t.Fatalf("server session count mismatch:\nGot: %v\nWant: %v", g, w) } sp.mu.Unlock() @@ -6277,18 +6289,30 @@ func TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams(t *testing.T) { if err != nil { t.Fatal(err) } - requests := drainRequestsFromServer(server.TestSpanner) - if err := compareRequests([]interface{}{ + + expectedReqs := []interface{}{ &sppb.BatchCreateSessionsRequest{}, - &sppb.CommitRequest{}}, requests); err != nil { - t.Fatal(err) + &sppb.CommitRequest{}, } - muxCreateBuffer := 0 if isMultiplexEnabled { - muxCreateBuffer = 1 + expectedReqs = []interface{}{ + &sppb.CreateSessionRequest{}, + &sppb.CommitRequest{}, + } } - if !requests[1+muxCreateBuffer].(*sppb.CommitRequest).Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams { - t.Fatal("Transaction is not set to be excluded from change streams") + requests := drainRequestsFromServer(server.TestSpanner) + if err := compareRequests(expectedReqs, requests); err != nil { + t.Fatal(err) + } + for _, req := range requests { + if request, ok := req.(*sppb.CommitRequest); ok { + if !request.Transaction.(*sppb.CommitRequest_SingleUseTransaction).SingleUseTransaction.ExcludeTxnFromChangeStreams { + t.Fatal("Transaction is not set to be excluded from change streams") + } + if !testEqual(isMultiplexEnabled, strings.Contains(request.GetSession(), "multiplexed")) { + t.Errorf("TestClient_ApplyAtLeastOnceExcludeTxnFromChangeStreams expected multiplexed session to be used, got: %v", request.GetSession()) + } + } } } diff --git a/spanner/transaction.go b/spanner/transaction.go index ab10ba433177..20a3e4037bd9 100644 --- a/spanner/transaction.go +++ b/spanner/transaction.go @@ -1890,7 +1890,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta for { if sh == nil || sh.getID() == "" || sh.getClient() == nil { // No usable session for doing the commit, take one from pool. - sh, err = t.sp.take(ctx) + sh, err = t.sp.takeMultiplexed(ctx) if err != nil { // sessionPool.Take already retries for session // creations/retrivals. @@ -1912,6 +1912,7 @@ func (t *writeOnlyTransaction) applyAtLeastOnce(ctx context.Context, ms ...*Muta RequestOptions: createRequestOptions(t.commitPriority, "", t.transactionTag), }) if err != nil && !isAbortedErr(err) { + // should not be the case with multiplexed sessions if isSessionNotFoundError(err) { // Discard the bad session. sh.destroy() diff --git a/spanner/transaction_test.go b/spanner/transaction_test.go index 90a3d7867b7d..94849d608148 100644 --- a/spanner/transaction_test.go +++ b/spanner/transaction_test.go @@ -163,13 +163,30 @@ func TestApply_Single(t *testing.T) { if _, e := client.Apply(ctx, ms, ApplyAtLeastOnce()); e != nil { t.Fatalf("applyAtLeastOnce retry on abort, got %v, want nil.", e) } - - if _, err := shouldHaveReceived(server.TestSpanner, []interface{}{ + requests := drainRequestsFromServer(server.TestSpanner) + expectedReqs := []interface{}{ &sppb.BatchCreateSessionsRequest{}, &sppb.CommitRequest{}, - }); err != nil { + } + if isMultiplexEnabled { + expectedReqs = []interface{}{ + &sppb.CreateSessionRequest{}, + &sppb.CommitRequest{}, + } + } + if err := compareRequests(expectedReqs, requests); err != nil { t.Fatal(err) } + for _, s := range requests { + switch s.(type) { + case *sppb.CommitRequest: + req, _ := s.(*sppb.CommitRequest) + // Validate the session is multiplexed + if !testEqual(isMultiplexEnabled, strings.Contains(req.Session, "multiplexed")) { + t.Errorf("TestApply_Single expected multiplexed session to be used, got: %v", req.Session) + } + } + } } // Transaction retries on abort.