Skip to content

Commit

Permalink
feat(repair_test): add test for batching ranges
Browse files Browse the repository at this point in the history
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Jun 20, 2024
1 parent ddc3d8e commit f612390
Showing 1 changed file with 64 additions and 0 deletions.
64 changes: 64 additions & 0 deletions pkg/service/repair/service_repair_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,19 @@ func createKeyspace(t *testing.T, session gocqlx.Session, keyspace string, rf1,
ExecStmt(t, session, fmt.Sprintf(createKeyspaceStmt, keyspace, rf1, rf2))
}

func tryCreateTabletKeyspace(t *testing.T, session gocqlx.Session, keyspace string, rf1, rf2, tablets int) {
createKeyspaceStmt := "CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', 'dc1': %d, 'dc2': %d}"
tabletStmt := " AND tablets = {'enabled': true, 'initial': %d}"
if tablets > 0 {
err := session.ExecStmt(fmt.Sprintf(createKeyspaceStmt+tabletStmt, keyspace, rf1, rf2, tablets))
if err == nil {
return
}
// Fallback as we don't know if tablets are enabled
}
ExecStmt(t, session, fmt.Sprintf(createKeyspaceStmt, keyspace, rf1, rf2))
}

func dropKeyspace(t *testing.T, session gocqlx.Session, keyspace string) {
ExecStmt(t, session, fmt.Sprintf("DROP KEYSPACE IF EXISTS %q", keyspace))
}
Expand Down Expand Up @@ -2058,6 +2071,57 @@ func TestServiceRepairIntegration(t *testing.T) {
t.Fatalf("Expected table to be fully repaired, got %d/%d", p.Tables[0].TokenRanges, p.Tables[0].Success)
}
})

t.Run("ranges batching", func(t *testing.T) {
h := newRepairTestHelper(t, session, defaultConfig())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

const (
ks = "test_repair_ranges_batching"
desiredIntensity = 1
)

Print("When: prepare keyspace with 9 replica sets")
tryCreateTabletKeyspace(t, clusterSession, ks, 2, 2, 256)
WriteData(t, clusterSession, ks, 1, "test_table_0")
defer dropKeyspace(t, clusterSession, ks)

cnt := atomic.Int64{}
h.Hrt.SetInterceptor(httpx.RoundTripperFunc(func(req *http.Request) (*http.Response, error) {
if repairEndpointRegexp.MatchString(req.URL.Path) && req.Method == http.MethodPost {
cnt.Add(1)
intensity, err := strconv.Atoi(req.URL.Query()["ranges_parallelism"][0])
if err != nil {
t.Error(err)
}
if intensity != desiredIntensity {
t.Errorf("Expected ranges_parallelism=%d, got %d", desiredIntensity, intensity)
}
rangesCnt := len(strings.Split(req.URL.Query()["ranges"][0], ","))
if intensity == rangesCnt {
t.Error("Ranges should be batched")
}
}
return nil, nil
}))

Print("When: run repair")
h.runRepair(ctx, map[string]any{
"keyspace": []string{ks},
"fail_fast": true,
"small_table_threshold": repairAllSmallTableThreshold,
})

Print("Then: repair is done")
h.assertDone(longWait)

Print("Then: assuming that batching=100% and table with 9 replica sets, validate that 9 jobs were sent")
if v := cnt.Load(); v != 9 {
t.Errorf("Expected 9 jobs to be sent (one per replica set), got %d", v)
}
})

}

func TestServiceRepairErrorNodetoolRepairRunningIntegration(t *testing.T) {
Expand Down

0 comments on commit f612390

Please sign in to comment.