From bd094adf8ad1d9fdbc448c65abdd46b41e3e84d9 Mon Sep 17 00:00:00 2001
From: Michael Butler
Date: Fri, 24 Jan 2025 21:45:37 +0000
Subject: [PATCH] crosscluster/physical: remove
 TestTenantStreamingMultipleNodes

The test takes 30 seconds to run on a good day, and the roachtest suite
already covers everything this tests.

Fixes #136786

Release note: none
---
 .../physical/replication_stream_e2e_test.go  | 71 -------------------
 1 file changed, 71 deletions(-)

diff --git a/pkg/crosscluster/physical/replication_stream_e2e_test.go b/pkg/crosscluster/physical/replication_stream_e2e_test.go
index 512e0c9a4aea..495279216d3d 100644
--- a/pkg/crosscluster/physical/replication_stream_e2e_test.go
+++ b/pkg/crosscluster/physical/replication_stream_e2e_test.go
@@ -667,77 +667,6 @@ func TestTenantStreamingDeleteRange(t *testing.T) {
 	checkDelRangeOnTable("t2", false /* embeddedInSST */)
 }
 
-func TestTenantStreamingMultipleNodes(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	defer log.Scope(t).Close(t)
-
-	skip.UnderDeadlock(t, "multi-node may time out under deadlock")
-	skip.UnderRace(t, "multi-node test may time out under race")
-
-	ctx := context.Background()
-
-	testutils.RunTrueAndFalse(t, "fromSystem", func(t *testing.T, sys bool) {
-		args := replicationtestutils.DefaultTenantStreamingClustersArgs
-		args.MultitenantSingleClusterNumNodes = 3
-		args.RoutingMode = streamclient.RoutingModeNode
-
-		// Track the number of unique addresses that were connected to
-		clientAddresses := make(map[string]struct{})
-		var addressesMu syncutil.Mutex
-		args.TestingKnobs = &sql.StreamingTestingKnobs{
-			BeforeClientSubscribe: func(addr string, token string, _ span.Frontier, _ bool) {
-				addressesMu.Lock()
-				defer addressesMu.Unlock()
-				clientAddresses[addr] = struct{}{}
-			},
-		}
-
-		if sys {
-			args.SrcTenantID = roachpb.SystemTenantID
-			args.SrcTenantName = "system"
-		}
-		telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
-		c, cleanup := replicationtestutils.CreateMultiTenantStreamingCluster(ctx, t, args)
-		defer cleanup()
-
-		// Make sure we have data on all nodes, so that we will have multiple
-		// connections and client addresses (and actually test multi-node).
-		replicationtestutils.CreateScatteredTable(t, c, 3)
-
-		producerJobID, ingestionJobID := c.StartStreamReplication(ctx)
-		jobutils.WaitForJobToRun(c.T, c.SrcSysSQL, jobspb.JobID(producerJobID))
-		jobutils.WaitForJobToRun(c.T, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-
-		c.SrcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
-			tenantSQL.Exec(t, "CREATE TABLE d.x (id INT PRIMARY KEY, n INT)")
-			tenantSQL.Exec(t, "INSERT INTO d.x VALUES (1, 1)")
-		})
-
-		c.DestSysSQL.Exec(t, `PAUSE JOB $1`, ingestionJobID)
-		jobutils.WaitForJobToPause(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-		c.SrcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
-			tenantSQL.Exec(t, "INSERT INTO d.x VALUES (2, 2)")
-		})
-		c.DestSysSQL.Exec(t, `RESUME JOB $1`, ingestionJobID)
-		jobutils.WaitForJobToRun(t, c.DestSysSQL, jobspb.JobID(ingestionJobID))
-
-		c.SrcExec(func(t *testing.T, sysSQL *sqlutils.SQLRunner, tenantSQL *sqlutils.SQLRunner) {
-			tenantSQL.Exec(t, "INSERT INTO d.x VALUES (3, 3)")
-		})
-
-		c.WaitUntilStartTimeReached(jobspb.JobID(ingestionJobID))
-
-		cutoverTime := c.DestSysServer.Clock().Now()
-		c.Cutover(ctx, producerJobID, ingestionJobID, cutoverTime.GoTime(), false)
-		counts := telemetry.GetFeatureCounts(telemetry.Raw, telemetry.ResetCounts)
-		require.GreaterOrEqual(t, counts["physical_replication.cutover"], int32(1))
-		c.RequireFingerprintMatchAtTimestamp(cutoverTime.AsOfSystemTime())
-
-		// Since the data was distributed across multiple nodes, multiple nodes should've been connected to
-		require.Greater(t, len(clientAddresses), 1)
-	})
-}
-
 func TestSpecsPersistedOnlyAfterInitialPlan(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)