feat(restore): adds --dc-mapping flag to restore command (#4213)
This adds support for the `--dc-mapping` flag to the restore command. It specifies the mapping between DCs from the backup and DCs in the restored (target) cluster. Only one use case is supported: 1-to-1 DC mapping. This means that squeezing DCs (restoring dc1 and dc2 into dc3) or extending DCs (restoring dc1 into dc1 and dc2) is not supported when `--dc-mapping` is provided.
The syntax is:

source_dc1=target_dc1,source_dc2=target_dc2
where
     the equals sign (=) separates a source DC name from its target DC name,
     the comma (,) separates multiple mappings.

If `--dc-mapping` is not provided, the current behavior is preserved: each node with access to a DC can download its data. It is also allowed to map only a subset of DCs, leaving out source or target DCs (or both).
This only works with table restoration (`--restore-tables=true`).

Fixes: #3829
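As a quick illustration of how this syntax is parsed, here is a minimal, editor-added sketch (not part of the commit); it relies only on spf13/pflag's StringToStringVar, the same helper the cmd.go change below registers for the flag:

package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	// StringToStringVar parses "key1=value1,key2=value2" values into a map,
	// which matches the --dc-mapping syntax described above.
	var dcMapping map[string]string
	fs := pflag.NewFlagSet("restore", pflag.ContinueOnError)
	fs.StringToStringVar(&dcMapping, "dc-mapping", nil, "source DC to target DC mapping")

	if err := fs.Parse([]string{"--dc-mapping", "dc1=dc3,dc2=dc4"}); err != nil {
		panic(err)
	}
	fmt.Println(dcMapping) // map[dc1:dc3 dc2:dc4]
}

A hypothetical invocation (cluster name, location, and snapshot tag are made up for the example) could look like:

sctool restore --cluster prod --location s3:backup-bucket --snapshot-tag sm_20250101000000UTC --restore-tables --dc-mapping 'dc1=dc3,dc2=dc4'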
VAveryanov8 authored Feb 17, 2025
1 parent 723864a commit af23793
Showing 17 changed files with 839 additions and 172 deletions.
4 changes: 4 additions & 0 deletions docs/source/sctool/partials/sctool_restore.yaml
@@ -26,6 +26,9 @@ options:
usage: |
Task schedule as a cron `expression`.
It supports the extended syntax including @monthly, @weekly, @daily, @midnight, @hourly, @every X[h|m|s].
- name: dc-mapping
default_value: '[]'
usage: "Specifies mapping between DCs from the backup and DCs in the restored(target) cluster.\n\nThe Syntax is \"source_dc1=target_dc1,source_dc2=target_dc2\" where multiple mappings are separated by comma (,)\nand source and target DCs are separated by equal (=).\n\nExample: \"dc1=dc3,dc2=dc4\" - data from dc1 should be restored to dc3 and data from dc2 should be restored to dc4.\n\nOnly works with tables restoration (--restore-tables=true). \nNote: Only DCs that are provided in mappings will be restored.\n"
- name: dry-run
default_value: "false"
usage: |
@@ -90,6 +93,7 @@ options:
The `<dc>` parameter is optional. It allows you to specify the datacenter whose nodes will be used to restore the data
from this location in a multi-dc setting; it must match the datacenter of the Scylla nodes.
By default, all live nodes are used to restore data from specified locations.
If `--dc-mapping` is used, the `<dc>` parameter is ignored.
Note that specifying datacenters closest to backup locations might reduce download time of restored data.
The supported storage '<provider>'s are 'azure', 'gcs', 's3'.
4 changes: 4 additions & 0 deletions docs/source/sctool/partials/sctool_restore_update.yaml
@@ -24,6 +24,9 @@ options:
usage: |
Task schedule as a cron `expression`.
It supports the extended syntax including @monthly, @weekly, @daily, @midnight, @hourly, @every X[h|m|s].
- name: dc-mapping
default_value: '[]'
usage: "Specifies mapping between DCs from the backup and DCs in the restored(target) cluster.\n\nThe Syntax is \"source_dc1=target_dc1,source_dc2=target_dc2\" where multiple mappings are separated by comma (,)\nand source and target DCs are separated by equal (=).\n\nExample: \"dc1=dc3,dc2=dc4\" - data from dc1 should be restored to dc3 and data from dc2 should be restored to dc4.\n\nOnly works with tables restoration (--restore-tables=true). \nNote: Only DCs that are provided in mappings will be restored.\n"
- name: dry-run
default_value: "false"
usage: |
@@ -88,6 +91,7 @@ options:
The `<dc>` parameter is optional. It allows you to specify the datacenter whose nodes will be used to restore the data
from this location in a multi-dc setting; it must match the datacenter of the Scylla nodes.
By default, all live nodes are used to restore data from specified locations.
If `--dc-mapping` is used, the `<dc>` parameter is ignored.
Note that specifying datacenters closest to backup locations might reduce download time of restored data.
The supported storage '<provider>'s are 'azure', 'gcs', 's3'.
9 changes: 9 additions & 0 deletions pkg/command/restore/cmd.go
@@ -37,6 +37,7 @@ type command struct {
restoreTables bool
dryRun bool
showTables bool
dcMapping map[string]string
}

func NewCommand(client *managerclient.Client) *cobra.Command {
@@ -90,6 +91,7 @@ func (cmd *command) init() {
w.Unwrap().BoolVar(&cmd.restoreTables, "restore-tables", false, "")
w.Unwrap().BoolVar(&cmd.dryRun, "dry-run", false, "")
w.Unwrap().BoolVar(&cmd.showTables, "show-tables", false, "")
w.Unwrap().StringToStringVar(&cmd.dcMapping, "dc-mapping", nil, "")
}

func (cmd *command) run(args []string) error {
@@ -182,6 +184,13 @@ func (cmd *command) run(args []string) error {
props["restore_tables"] = cmd.restoreTables
ok = true
}
if cmd.Flag("dc-mapping").Changed {
if cmd.Update() {
return wrapper("dc-mapping")
}
props["dc_mapping"] = cmd.dcMapping
ok = true
}

if cmd.dryRun {
res, err := cmd.client.GetRestoreTarget(cmd.Context(), cmd.cluster, task)
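For reference, a minimal editor-added sketch of the data this produces (assuming, as the props map above suggests, that the restore task properties are serialized as JSON; only the "dc_mapping" and "restore_tables" keys are taken from the diff, the values are made up):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Shape of the task properties once --restore-tables and --dc-mapping are set.
	props := map[string]any{
		"restore_tables": true,
		"dc_mapping":     map[string]string{"dc1": "dc3", "dc2": "dc4"},
	}
	b, err := json.Marshal(props)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// Output: {"dc_mapping":{"dc1":"dc3","dc2":"dc4"},"restore_tables":true}
}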
12 changes: 12 additions & 0 deletions pkg/command/restore/res.yaml
@@ -15,6 +15,7 @@ location: |
The `<dc>` parameter is optional. It allows you to specify the datacenter whose nodes will be used to restore the data
from this location in a multi-dc setting; it must match the datacenter of the Scylla nodes.
By default, all live nodes are used to restore data from specified locations.
If `--dc-mapping` is used, the `<dc>` parameter is ignored.
Note that specifying datacenters closest to backup locations might reduce download time of restored data.
The supported storage '<provider>'s are 'azure', 'gcs', 's3'.
@@ -72,3 +73,14 @@ dry-run: |
show-tables: |
Prints table names together with keyspace, used in combination with --dry-run.
dc-mapping: |
Specifies the mapping between DCs from the backup and DCs in the restored (target) cluster.
The syntax is "source_dc1=target_dc1,source_dc2=target_dc2", where multiple mappings are separated by a comma (,)
and the source and target DC in each pair are separated by an equals sign (=).
Example: "dc1=dc3,dc2=dc4" - data from dc1 is restored to dc3 and data from dc2 is restored to dc4.
Only works with table restoration (--restore-tables=true).
Note: only DCs that are provided in the mapping will be restored.
34 changes: 19 additions & 15 deletions pkg/service/restore/batch.go
@@ -57,7 +57,7 @@ type batchDispatcher struct {
hostShardCnt map[string]uint
}

func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationHosts map[backupspec.Location][]string) *batchDispatcher {
func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[string]uint, locationInfo []LocationInfo) *batchDispatcher {
sortWorkload(workload)
var shards uint
for _, sh := range hostShardCnt {
@@ -70,7 +70,7 @@ func newBatchDispatcher(workload Workload, batchSize int, hostShardCnt map[strin
mu: sync.Mutex{},
wait: make(chan struct{}),
workload: workload,
workloadProgress: newWorkloadProgress(workload, locationHosts),
workloadProgress: newWorkloadProgress(workload, locationInfo),
batchSize: batchSize,
expectedShardWorkload: workload.TotalSize / int64(shards),
hostShardCnt: hostShardCnt,
@@ -106,32 +106,36 @@ type remoteSSTableDirProgress struct {
RemainingSSTables []RemoteSSTable
}

func newWorkloadProgress(workload Workload, locationHosts map[backupspec.Location][]string) workloadProgress {
func newWorkloadProgress(workload Workload, locationInfo []LocationInfo) workloadProgress {
dcBytes := make(map[string]int64)
locationDC := make(map[string][]string)
p := make([]remoteSSTableDirProgress, len(workload.RemoteDir))
for i, rdw := range workload.RemoteDir {
dcBytes[rdw.DC] += rdw.Size
locationDC[rdw.Location.StringWithoutDC()] = append(locationDC[rdw.Location.StringWithoutDC()], rdw.DC)
p[i] = remoteSSTableDirProgress{
RemainingSize: rdw.Size,
RemainingSSTables: rdw.SSTables,
}
}
hostDCAccess := make(map[string][]string)
for loc, hosts := range locationHosts {
for _, h := range hosts {
hostDCAccess[h] = append(hostDCAccess[h], locationDC[loc.StringWithoutDC()]...)
}
}
return workloadProgress{
dcBytesToBeRestored: dcBytes,
hostFailedDC: make(map[string][]string),
hostDCAccess: hostDCAccess,
hostDCAccess: getHostDCAccess(locationInfo),
remoteDir: p,
}
}

func getHostDCAccess(locationInfo []LocationInfo) map[string][]string {
hostDCAccess := map[string][]string{}
for _, l := range locationInfo {
for dc, hosts := range l.DCHosts {
for _, h := range hosts {
hostDCAccess[h] = append(hostDCAccess[h], dc)
}
}
}
return hostDCAccess
}

// Checks if given host finished restoring all that it could.
func (wp workloadProgress) isDone(host string) bool {
failed := wp.hostFailedDC[host]
@@ -201,8 +205,8 @@ func (bd *batchDispatcher) ValidateAllDispatched() error {
for i, rdp := range bd.workloadProgress.remoteDir {
if rdp.RemainingSize != 0 || len(rdp.RemainingSSTables) != 0 {
rdw := bd.workload.RemoteDir[i]
return errors.Errorf("failed to restore sstables from location %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.Keyspace, rdw.Table, rdw.Size)
return errors.Errorf("failed to restore sstables from location %s dc %s table %s.%s (%d bytes). See logs for more info",
rdw.Location, rdw.DC, rdw.Keyspace, rdw.Table, rdw.Size)
}
}
for dc, bytes := range bd.workloadProgress.dcBytesToBeRestored {
@@ -257,7 +261,7 @@ func (bd *batchDispatcher) dispatchBatch(host string) (batch, bool) {
if slices.Contains(bd.workloadProgress.hostFailedDC[host], rdw.DC) {
continue
}
// Sip dir from location without access
// Skip dir from location without access
if !slices.Contains(bd.workloadProgress.hostDCAccess[host], rdw.DC) {
continue
}
127 changes: 122 additions & 5 deletions pkg/service/restore/batch_test.go
@@ -5,6 +5,7 @@ package restore
import (
"testing"

"github.com/google/go-cmp/cmp"
"github.com/scylladb/scylla-manager/backupspec"
)

@@ -104,17 +105,29 @@ func TestBatchDispatcher(t *testing.T) {

workload := aggregateWorkload(rawWorkload)

locationHosts := map[backupspec.Location][]string{
l1: {"h1", "h2"},
l2: {"h3"},
}
hostToShard := map[string]uint{
"h1": 1,
"h2": 2,
"h3": 3,
}

bd := newBatchDispatcher(workload, 1, hostToShard, locationHosts)
locationInfo := []LocationInfo{
{
Location: l1,
DCHosts: map[string][]string{
"dc1": {"h1", "h2"},
"dc2": {"h1", "h2"},
},
},
{
Location: l2,
DCHosts: map[string][]string{
"dc3": {"h3"},
},
},
}

bd := newBatchDispatcher(workload, 1, hostToShard, locationInfo)

scenario := []struct {
host string
@@ -166,3 +179,107 @@
t.Fatalf("Expected sstables to be batched: %s", err)
}
}

func TestGetHostDCAccess(t *testing.T) {
testCases := []struct {
name string

locationInfo []LocationInfo

expected map[string][]string
}{
{
name: "one location with one DC",
locationInfo: []LocationInfo{
{
DCHosts: map[string][]string{
"dc1": {"host1", "host2"},
},
},
},
expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
},
},
{
name: "one location with two DC's",
locationInfo: []LocationInfo{
{
DCHosts: map[string][]string{
"dc1": {"host1"},
"dc2": {"host2"},
},
},
},
expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},
},
{
name: "one location with two DC's, more nodes",
locationInfo: []LocationInfo{
{
DCHosts: map[string][]string{
"dc1": {"host1", "host2"},
"dc2": {"host3", "host4"},
},
},
},
expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc1"},
"host3": {"dc2"},
"host4": {"dc2"},
},
},
{
name: "two locations with one DC each",
locationInfo: []LocationInfo{
{
DCHosts: map[string][]string{
"dc1": {"host1"},
},
},
{
DCHosts: map[string][]string{
"dc2": {"host2"},
},
},
},
expected: map[string][]string{
"host1": {"dc1"},
"host2": {"dc2"},
},
},
{
name: "two locations with one DC each, but hosts maps to all dcs",
locationInfo: []LocationInfo{
{
DCHosts: map[string][]string{
"dc1": {"host1", "host2"},
},
},
{
DCHosts: map[string][]string{
"dc2": {"host1", "host2"},
},
},
},
expected: map[string][]string{
"host1": {"dc1", "dc2"},
"host2": {"dc1", "dc2"},
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
actual := getHostDCAccess(tc.locationInfo)
if diff := cmp.Diff(actual, tc.expected); diff != "" {
t.Fatalf("Actual != Expected: %s", diff)
}
})
}
}
10 changes: 5 additions & 5 deletions pkg/service/restore/index.go
@@ -46,12 +46,12 @@ type SSTable struct {
}

// IndexWorkload returns sstables to be restored aggregated by location, table and remote sstable dir.
func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []backupspec.Location) (Workload, error) {
func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []LocationInfo) (Workload, error) {
var rawWorkload []RemoteDirWorkload
for _, l := range locations {
lw, err := w.indexLocationWorkload(ctx, l)
if err != nil {
return Workload{}, errors.Wrapf(err, "index workload in %s", l)
return Workload{}, errors.Wrapf(err, "index workload in %s", l.Location)
}
rawWorkload = append(rawWorkload, lw...)
}
@@ -60,7 +60,7 @@ func (w *tablesWorker) IndexWorkload(ctx context.Context, locations []backupspec
return workload, nil
}

func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location backupspec.Location) ([]RemoteDirWorkload, error) {
func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location LocationInfo) ([]RemoteDirWorkload, error) {
rawWorkload, err := w.createRemoteDirWorkloads(ctx, location)
if err != nil {
return nil, errors.Wrap(err, "create remote dir workloads")
@@ -74,7 +74,7 @@ func (w *tablesWorker) indexLocationWorkload(ctx context.Context, location backu
return rawWorkload, nil
}

func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location backupspec.Location) ([]RemoteDirWorkload, error) {
func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location LocationInfo) ([]RemoteDirWorkload, error) {
var rawWorkload []RemoteDirWorkload
err := w.forEachManifest(ctx, location, func(m backupspec.ManifestInfoWithContent) error {
return m.ForEachIndexIterWithError(nil, func(fm backupspec.FilesMeta) error {
@@ -87,7 +87,7 @@ func (w *tablesWorker) createRemoteDirWorkloads(ctx context.Context, location ba
return errors.Wrapf(err, "convert files meta to sstables")
}
sstDir := m.LocationSSTableVersionDir(fm.Keyspace, fm.Table, fm.Version)
remoteSSTables, err := w.adjustSSTablesWithRemote(ctx, w.randomHostFromLocation(location), sstDir, sstables)
remoteSSTables, err := w.adjustSSTablesWithRemote(ctx, location.AnyHost(), sstDir, sstables)
if err != nil {
return errors.Wrap(err, "fetch sstables sizes")
}
(The remaining changed files in this commit are not shown above.)
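The LocationInfo type used throughout the hunks above is defined in one of those remaining files, so its definition is not visible here. As a reading aid, here is a rough, editor-added sketch of the shape implied by its usage; only Location, DCHosts, and AnyHost() appear in the diff, everything else is an assumption and the actual definition in the PR may differ:

// Sketch inferred from usage in batch.go, batch_test.go, and index.go; not copied from this PR.
package restore

import "github.com/scylladb/scylla-manager/backupspec"

// LocationInfo couples a backup location with the DCs stored in it and the
// target-cluster hosts allowed to restore each DC (after --dc-mapping is applied).
type LocationInfo struct {
	Location backupspec.Location
	// DCHosts maps a source (backup) DC to the hosts that can download its data.
	DCHosts map[string][]string
}

// AnyHost returns an arbitrary host with access to this location
// (an assumed helper matching the location.AnyHost() call in index.go).
func (l LocationInfo) AnyHost() string {
	for _, hosts := range l.DCHosts {
		if len(hosts) > 0 {
			return hosts[0]
		}
	}
	return ""
}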
