Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: return pointers from TxnSender.GetLeafTxn{Input/Final}State #74346

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -1169,19 +1169,19 @@ func (tc *TxnCoordSender) Active() bool {
// GetLeafTxnInputState is part of the client.TxnSender interface.
func (tc *TxnCoordSender) GetLeafTxnInputState(
ctx context.Context, opt kv.TxnStatusOpt,
) (roachpb.LeafTxnInputState, error) {
) (*roachpb.LeafTxnInputState, error) {
tis := new(roachpb.LeafTxnInputState)
tc.mu.Lock()
defer tc.mu.Unlock()

if err := tc.checkTxnStatusLocked(ctx, opt); err != nil {
return roachpb.LeafTxnInputState{}, err
return nil, err
}

// Copy mutable state so access is safe for the caller.
var tis roachpb.LeafTxnInputState
tis.Txn = tc.mu.txn
for _, reqInt := range tc.interceptorStack {
reqInt.populateLeafInputState(&tis)
reqInt.populateLeafInputState(tis)
}

// Also mark the TxnCoordSender as "active". This prevents changing
Expand All @@ -1196,16 +1196,15 @@ func (tc *TxnCoordSender) GetLeafTxnInputState(
// GetLeafTxnFinalState is part of the client.TxnSender interface.
func (tc *TxnCoordSender) GetLeafTxnFinalState(
ctx context.Context, opt kv.TxnStatusOpt,
) (roachpb.LeafTxnFinalState, error) {
) (*roachpb.LeafTxnFinalState, error) {
tfs := new(roachpb.LeafTxnFinalState)
tc.mu.Lock()
defer tc.mu.Unlock()

if err := tc.checkTxnStatusLocked(ctx, opt); err != nil {
return roachpb.LeafTxnFinalState{}, err
return nil, err
}

var tfs roachpb.LeafTxnFinalState

// For compatibility with pre-20.1 nodes: populate the command
// count.
// TODO(knz,andrei): Remove this and the command count
Expand All @@ -1217,7 +1216,7 @@ func (tc *TxnCoordSender) GetLeafTxnFinalState(
// Copy mutable state so access is safe for the caller.
tfs.Txn = tc.mu.txn
for _, reqInt := range tc.interceptorStack {
reqInt.populateLeafFinalState(&tfs)
reqInt.populateLeafFinalState(tfs)
}

return tfs, nil
Expand Down
10 changes: 5 additions & 5 deletions pkg/kv/kvclient/kvcoord/txn_coord_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -852,7 +852,7 @@ func TestTxnMultipleCoord(t *testing.T) {

// New create a second, leaf coordinator.
leafInputState := txn.GetLeafTxnInputState(ctx)
txn2 := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, &leafInputState)
txn2 := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)

// Start the second transaction.
key2 := roachpb.Key("b")
Expand All @@ -865,7 +865,7 @@ func TestTxnMultipleCoord(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if err := txn.UpdateRootWithLeafFinalState(ctx, &tfs); err != nil {
if err := txn.UpdateRootWithLeafFinalState(ctx, tfs); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -2426,7 +2426,7 @@ func TestLeafTxnClientRejectError(t *testing.T) {
leafInputState := rootTxn.GetLeafTxnInputState(ctx)

// New create a second, leaf coordinator.
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, &leafInputState)
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0 /* gatewayNodeID */, leafInputState)

if _, err := leafTxn.Get(ctx, errKey); !testutils.IsError(err, "TransactionAbortedError") {
t.Fatalf("expected injected err, got: %v", err)
Expand Down Expand Up @@ -2455,14 +2455,14 @@ func TestUpdateRoootWithLeafFinalStateInAbortedTxn(t *testing.T) {

txn := kv.NewTxn(ctx, s.DB, 0 /* gatewayNodeID */)
leafInputState := txn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0, &leafInputState)
leafTxn := kv.NewLeafTxn(ctx, s.DB, 0, leafInputState)

finalState, err := leafTxn.GetLeafTxnFinalState(ctx)
if err != nil {
t.Fatal(err)
}
finalState.Txn.Status = roachpb.ABORTED
if err := txn.UpdateRootWithLeafFinalState(ctx, &finalState); err != nil {
if err := txn.UpdateRootWithLeafFinalState(ctx, finalState); err != nil {
t.Fatal(err)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/mock_transactional_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ func (m *MockTransactionalSender) Send(
// GetLeafTxnInputState is part of the TxnSender interface.
func (m *MockTransactionalSender) GetLeafTxnInputState(
context.Context, TxnStatusOpt,
) (roachpb.LeafTxnInputState, error) {
) (*roachpb.LeafTxnInputState, error) {
panic("unimplemented")
}

// GetLeafTxnFinalState is part of the TxnSender interface.
func (m *MockTransactionalSender) GetLeafTxnFinalState(
context.Context, TxnStatusOpt,
) (roachpb.LeafTxnFinalState, error) {
) (*roachpb.LeafTxnFinalState, error) {
panic("unimplemented")
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/kv/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,12 @@ type TxnSender interface {
//
// If AnyTxnStatus is passed, then this function never returns
// errors.
GetLeafTxnInputState(context.Context, TxnStatusOpt) (roachpb.LeafTxnInputState, error)
GetLeafTxnInputState(context.Context, TxnStatusOpt) (*roachpb.LeafTxnInputState, error)

// GetLeafTxnFinalState retrieves the final state of a LeafTxn
// necessary and sufficient to update a RootTxn with progress made
// on its behalf by the LeafTxn.
GetLeafTxnFinalState(context.Context, TxnStatusOpt) (roachpb.LeafTxnFinalState, error)
GetLeafTxnFinalState(context.Context, TxnStatusOpt) (*roachpb.LeafTxnFinalState, error)

// UpdateRootWithLeafFinalState updates a RootTxn using the final
// state of a LeafTxn.
Expand Down
24 changes: 11 additions & 13 deletions pkg/kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ func (txn *Txn) applyDeadlineToBoundedStaleness(
// transaction for use with InitializeLeafTxn(), when distributing
// the state of the current transaction to multiple distributed
// transaction coordinators.
func (txn *Txn) GetLeafTxnInputState(ctx context.Context) roachpb.LeafTxnInputState {
func (txn *Txn) GetLeafTxnInputState(ctx context.Context) *roachpb.LeafTxnInputState {
if txn.typ != RootTxn {
panic(errors.WithContextTags(errors.AssertionFailedf("GetLeafTxnInputState() called on leaf txn"), ctx))
}
Expand All @@ -1290,18 +1290,18 @@ func (txn *Txn) GetLeafTxnInputState(ctx context.Context) roachpb.LeafTxnInputSt
// retryable errors, it acts like Send()).
func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
ctx context.Context,
) (roachpb.LeafTxnInputState, error) {
) (*roachpb.LeafTxnInputState, error) {
if txn.typ != RootTxn {
return roachpb.LeafTxnInputState{},
errors.WithContextTags(errors.AssertionFailedf("GetLeafTxnInputStateOrRejectClient() called on leaf txn"), ctx)
return nil, errors.WithContextTags(
errors.AssertionFailedf("GetLeafTxnInputStateOrRejectClient() called on leaf txn"), ctx)
}

txn.mu.Lock()
defer txn.mu.Unlock()
tfs, err := txn.mu.sender.GetLeafTxnInputState(ctx, OnlyPending)
if err != nil {
txn.handleErrIfRetryableLocked(ctx, err)
return roachpb.LeafTxnInputState{}, err
return nil, err
}
return tfs, nil
}
Expand All @@ -1310,21 +1310,19 @@ func (txn *Txn) GetLeafTxnInputStateOrRejectClient(
// transaction for use with UpdateRootWithLeafFinalState(), when combining the
// impact of multiple distributed transaction coordinators that are
// all operating on the same transaction.
func (txn *Txn) GetLeafTxnFinalState(ctx context.Context) (roachpb.LeafTxnFinalState, error) {
func (txn *Txn) GetLeafTxnFinalState(ctx context.Context) (*roachpb.LeafTxnFinalState, error) {
if txn.typ != LeafTxn {
return roachpb.LeafTxnFinalState{},
errors.WithContextTags(
errors.AssertionFailedf("GetLeafTxnFinalState() called on root txn"), ctx)
return nil, errors.WithContextTags(
errors.AssertionFailedf("GetLeafTxnFinalState() called on root txn"), ctx)
}

txn.mu.Lock()
defer txn.mu.Unlock()
tfs, err := txn.mu.sender.GetLeafTxnFinalState(ctx, AnyTxnStatus)
if err != nil {
return roachpb.LeafTxnFinalState{},
errors.WithContextTags(
errors.NewAssertionErrorWithWrappedErrf(err,
"unexpected error from GetLeafTxnFinalState(AnyTxnStatus)"), ctx)
return nil, errors.WithContextTags(
errors.NewAssertionErrorWithWrappedErrf(err,
"unexpected error from GetLeafTxnFinalState(AnyTxnStatus)"), ctx)
}
return tfs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/colflow/colbatch_scan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestColBatchScanMeta(t *testing.T) {

rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
leafInputState := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), &leafInputState)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,7 @@ func (dsp *DistSQLPlanner) Run(
recv.SetError(err)
return func() {}
}
leafInputState = &tis
leafInputState = tis
}

if logPlanDiagram {
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/execinfra/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func GetLeafTxnFinalState(ctx context.Context, txn *kv.Txn) *roachpb.LeafTxnFina
if txnMeta.Txn.ID == uuid.Nil {
return nil
}
return &txnMeta
return txnMeta
}

// DrainAndClose is a version of DrainAndForwardMetadata that drains multiple
Expand Down
10 changes: 5 additions & 5 deletions pkg/sql/flowinfra/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestClusterFlow(t *testing.T) {

req1 := &execinfrapb.SetupFlowRequest{
Version: execinfra.Version,
LeafTxnInputState: &leafInputState,
LeafTxnInputState: leafInputState,
Flow: execinfrapb.FlowSpec{
FlowID: fid,
Processors: []execinfrapb.ProcessorSpec{{
Expand All @@ -151,7 +151,7 @@ func TestClusterFlow(t *testing.T) {

req2 := &execinfrapb.SetupFlowRequest{
Version: execinfra.Version,
LeafTxnInputState: &leafInputState,
LeafTxnInputState: leafInputState,
Flow: execinfrapb.FlowSpec{
FlowID: fid,
Processors: []execinfrapb.ProcessorSpec{{
Expand All @@ -174,7 +174,7 @@ func TestClusterFlow(t *testing.T) {

req3 := &execinfrapb.SetupFlowRequest{
Version: execinfra.Version,
LeafTxnInputState: &leafInputState,
LeafTxnInputState: leafInputState,
Flow: execinfrapb.FlowSpec{
FlowID: fid,
Processors: []execinfrapb.ProcessorSpec{
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestLimitedBufferingDeadlock(t *testing.T) {

req := execinfrapb.SetupFlowRequest{
Version: execinfra.Version,
LeafTxnInputState: &leafInputState,
LeafTxnInputState: leafInputState,
Flow: execinfrapb.FlowSpec{
FlowID: execinfrapb.FlowID{UUID: uuid.MakeV4()},
// The left-hand Values processor in the diagram above.
Expand Down Expand Up @@ -720,7 +720,7 @@ func BenchmarkInfrastructure(b *testing.B) {
for i := range reqs {
reqs[i] = execinfrapb.SetupFlowRequest{
Version: execinfra.Version,
LeafTxnInputState: &leafInputState,
LeafTxnInputState: leafInputState,
Flow: execinfrapb.FlowSpec{
Processors: []execinfrapb.ProcessorSpec{{
Core: execinfrapb.ProcessorCoreUnion{Values: &valSpecs[i]},
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/flowinfra/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestServer(t *testing.T) {

req := &execinfrapb.SetupFlowRequest{
Version: execinfra.Version,
LeafTxnInputState: &leafInputState,
LeafTxnInputState: leafInputState,
}
req.Flow = execinfrapb.FlowSpec{
Processors: []execinfrapb.ProcessorSpec{{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,7 +569,7 @@ func TestInternalExecutorInLeafTxnDoesNotPanic(t *testing.T) {
rootTxn := kvDB.NewTxn(ctx, "root-txn")

ltis := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, kvDB, roachpb.NodeID(1), &ltis)
leafTxn := kv.NewLeafTxn(ctx, kvDB, roachpb.NodeID(1), ltis)

ie := s.InternalExecutor().(*sql.InternalExecutor)
_, err := ie.ExecEx(
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/physicalplan/aggregator_funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func runTestFlow(
leafInputState := txn.GetLeafTxnInputState(context.Background())
req := execinfrapb.SetupFlowRequest{
Version: execinfra.Version,
LeafTxnInputState: &leafInputState,
LeafTxnInputState: leafInputState,
Flow: execinfrapb.FlowSpec{
FlowID: execinfrapb.FlowID{UUID: uuid.FastMakeV4()},
Processors: procs,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/inverted_joiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -753,7 +753,7 @@ func TestInvertedJoinerDrain(t *testing.T) {
defer diskMonitor.Stop(ctx)
rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
leafInputState := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), &leafInputState)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1123,7 +1123,7 @@ func TestJoinReaderDrain(t *testing.T) {

rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
leafInputState := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), &leafInputState)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState)

flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/tablereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func TestTableReaderDrain(t *testing.T) {

rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
leafInputState := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), &leafInputState)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/rowexec/zigzagjoiner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func TestZigzagJoinerDrain(t *testing.T) {

rootTxn := kv.NewTxn(ctx, s.DB(), s.NodeID())
leafInputState := rootTxn.GetLeafTxnInputState(ctx)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), &leafInputState)
leafTxn := kv.NewLeafTxn(ctx, s.DB(), s.NodeID(), leafInputState)
flowCtx := execinfra.FlowCtx{
EvalCtx: &evalCtx,
Cfg: &execinfra.ServerConfig{Settings: s.ClusterSettings()},
Expand Down