Skip to content

Commit

Permalink
Merge pull request cockroachdb#139949 from cockroachdb/blathers/backport-release-25.1-139784
Browse files Browse the repository at this point in the history

release-25.1: crosscluster/physical: remove TestTenantStreamingMultipleNodes
  • Loading branch information
msbutler authored Jan 31, 2025
2 parents 10d99ef + bd094ad commit bdb558b
Showing 1 changed file with 0 additions and 71 deletions.
71 changes: 0 additions & 71 deletions pkg/crosscluster/physical/replication_stream_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,77 +667,6 @@ func TestTenantStreamingDeleteRange(t *testing.T) {
checkDelRangeOnTable("t2", false /* embeddedInSST */)
}

// TestTenantStreamingMultipleNodes verifies that a physical replication
// stream against a multi-node source fans out across nodes: it scatters
// data over three nodes, exercises a pause/resume cycle and a cutover,
// and then asserts the stream client subscribed to more than one
// distinct source address.
func TestTenantStreamingMultipleNodes(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	skip.UnderDeadlock(t, "multi-node may time out under deadlock")
	skip.UnderRace(t, "multi-node test may time out under race")

	ctx := context.Background()

	testutils.RunTrueAndFalse(t, "fromSystem", func(t *testing.T, sys bool) {
		args := replicationtestutils.DefaultTenantStreamingClustersArgs
		args.MultitenantSingleClusterNumNodes = 3
		args.RoutingMode = streamclient.RoutingModeNode

		// Record every distinct source address the stream client
		// subscribes to; the final assertion checks that more than one
		// node served the stream.
		var (
			seenAddrsMu syncutil.Mutex
			seenAddrs   = make(map[string]struct{})
		)
		args.TestingKnobs = &sql.StreamingTestingKnobs{
			BeforeClientSubscribe: func(addr string, token string, _ span.Frontier, _ bool) {
				seenAddrsMu.Lock()
				defer seenAddrsMu.Unlock()
				seenAddrs[addr] = struct{}{}
			},
		}

		if sys {
			args.SrcTenantID = roachpb.SystemTenantID
			args.SrcTenantName = "system"
		}
		telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
		c, cleanupFn := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
		defer cleanupFn()

		// Scatter the table so every node holds data; otherwise one
		// connection could serve the whole stream and the multi-node
		// assertion below would be vacuous.
		replicationtestutils.CreateScatteredTable(t, c, 3)

		producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
		producerJob := jobspb.JobID(producerJobID)
		ingestionJob := jobspb.JobID(ingestionJobID)
		jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, producerJob)
		jobutils.WaitForJobToRun(c.T, c.DestSysSQL, ingestionJob)

		c.SrcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
			tenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
			tenantSQL.Exec(t, "INSERT INTO d.x VALUES (1, 1)")
		})

		// Pause the ingestion job, write on the source while paused, and
		// resume: the new row must be picked up after the restart.
		c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
		jobutils.WaitForJobToPause(t, c.DestSysSQL, ingestionJob)
		c.SrcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
			tenantSQL.Exec(t, "INSERT INTO d.x VALUES (2, 2)")
		})
		c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
		jobutils.WaitForJobToRun(t, c.DestSysSQL, ingestionJob)

		c.SrcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
			tenantSQL.Exec(t, "INSERT INTO d.x VALUES (3, 3)")
		})

		c.WaitUntilStartTimeReached(ingestionJob)

		cutoverTime := c.DestSysServer.Clock().Now()
		c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
		counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
		require.GreaterOrEqual(t, counts["physical_replication.cutover"], int32(1))
		c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())

		// Data lived on multiple nodes, so the client should have
		// connected to more than one source address.
		require.Greater(t, len(seenAddrs), 1)
	})
}

func TestSpecsPersistedOnlyAfterInitialPlan(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit bdb558b

Please sign in to comment.