Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(1-1-restore): sets tombstone_gc mode to repair #4260

Draft
wants to merge 3 commits into
base: va/1-1-restore-views
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 5 additions & 16 deletions pkg/service/one2onerestore/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,16 @@ func (s *Service) One2OneRestore(ctx context.Context, clusterID, taskID, runID u
"run_id", runID,
)

target, err := s.parseTarget(properties)
w, err := s.newWorker(ctx, clusterID)
if err != nil {
return errors.Wrap(err, "parse target")
return errors.Wrap(err, "new worker")
}
s.logger.Info(ctx, "Service input params", "target", target)

w, err := s.newWorker(ctx, clusterID)
target, err := w.parseTarget(ctx, properties)
if err != nil {
return errors.Wrap(err, "new worker")
return errors.Wrap(err, "parse target")
}
s.logger.Info(ctx, "Service input params", "target", target)

manifests, hosts, err := w.getAllSnapshotManifestsAndTargetHosts(ctx, target)
if err != nil {
Expand All @@ -100,17 +100,6 @@ func (s *Service) One2OneRestore(ctx context.Context, clusterID, taskID, runID u
return nil
}

func (s *Service) parseTarget(properties json.RawMessage) (Target, error) {
target := defaultTarget()
if err := json.Unmarshal(properties, &target); err != nil {
return Target{}, errors.Wrap(err, "unmarshal json")
}
if err := target.validateProperties(); err != nil {
return Target{}, errors.Wrap(err, "invalid target")
}
return target, nil
}

func (s *Service) newWorker(ctx context.Context, clusterID uuid.UUID) (worker, error) {
client, err := s.scyllaClient(ctx, clusterID)
if err != nil {
Expand Down
11 changes: 11 additions & 0 deletions pkg/service/one2onerestore/service_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {

clusterSession := CreateSessionAndDropAllKeyspaces(t, h.client)

// Setup schema and data
ksName := "testrestore"
WriteData(t, clusterSession, ksName, 10)
mvName := "testmv"
Expand Down Expand Up @@ -73,6 +74,16 @@ func TestOne2OneRestoreServiceIntegration(t *testing.T) {
if srcCntMV != dstCntMV {
t.Fatalf("Expected row count in materialized view %d, but got %d", srcCntMV, dstCntMV)
}

// Ensure table's tombstone_gc mode is set to 'repair'
w, _ := newTestWorker(t, ManagedClusterHosts())
mode, err := w.getTableTombstoneGCMode(ksName, BigTableName)
if err != nil {
t.Fatalf("Get table tombstone_gc mode: %v", err)
}
if mode != modeRepair {
t.Fatalf("Expected repair mode, but got %s", string(mode))
}
}

func truncateAllTablesInKeyspace(tb testing.TB, session gocqlx.Session, ks string) {
Expand Down
133 changes: 133 additions & 0 deletions pkg/service/one2onerestore/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ package one2onerestore

import (
"context"
"encoding/json"
"regexp"
"slices"
"strings"
"time"

Expand All @@ -13,7 +16,9 @@ import (
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
"github.com/scylladb/scylla-manager/v3/pkg/util/version"
"go.uber.org/multierr"
)

Expand All @@ -28,6 +33,10 @@ type worker struct {

// restore is an actual 1-1-restore stages.
func (w *worker) restore(ctx context.Context, workload []hostWorkload, target Target) (err error) {
if err := w.setTombstoneGCModeRepair(ctx, workload, target.Keyspace); err != nil {
return errors.Wrap(err, "tombstone_gc mode")
}

views, err := w.dropViews(ctx, workload, target.Keyspace)
if err != nil {
return errors.Wrap(err, "drop views")
Expand All @@ -44,6 +53,23 @@ func (w *worker) restore(ctx context.Context, workload []hostWorkload, target Ta
return w.restoreTables(ctx, workload, target.Keyspace)
}

func (w *worker) parseTarget(ctx context.Context, properties json.RawMessage) (Target, error) {
target := defaultTarget()
if err := json.Unmarshal(properties, &target); err != nil {
return Target{}, errors.Wrap(err, "unmarshal json")
}
if err := target.validateProperties(); err != nil {
return Target{}, errors.Wrap(err, "invalid target")
}
skip, err := skipRestorePatterns(ctx, w.client, w.clusterSession)
if err != nil {
return Target{}, errors.Wrap(err, "skip restore patterns")
}
w.logger.Info(ctx, "Extended excluded tables pattern", "pattern", skip)
target.Keyspace = append(target.Keyspace, skip...)
return target, nil
}

// getAllSnapshotManifestsAndTargetHosts gets backup(source) cluster node represented by manifests and target cluster nodes.
func (w *worker) getAllSnapshotManifestsAndTargetHosts(ctx context.Context, target Target) ([]*backupspec.ManifestInfo, []Host, error) {
nodeStatus, err := w.client.Status(ctx)
Expand Down Expand Up @@ -154,3 +180,110 @@ func alterSchemaRetryWrapper(ctx context.Context, op func() error, notify func(e

return retry.WithNotify(ctx, wrappedOp, backoff, notify)
}

func skipRestorePatterns(ctx context.Context, client *scyllaclient.Client, session gocqlx.Session) ([]string, error) {
keyspaces, err := client.KeyspacesByType(ctx)
if err != nil {
return nil, errors.Wrap(err, "get keyspaces by type")
}
tables, err := client.AllTables(ctx)
if err != nil {
return nil, errors.Wrap(err, "get all tables")
}

var skip []string
// Skip local data.
// Note that this also covers the raft based tables (e.g. system and system_schema).
for _, ks := range keyspaces[scyllaclient.KeyspaceTypeAll] {
if !slices.Contains(keyspaces[scyllaclient.KeyspaceTypeNonLocal], ks) {
skip = append(skip, ks)
}
}

// Skip outdated tables.
// Note that even though system_auth is not used in Scylla 6.0,
// it might still be present there (leftover after upgrade).
// That's why SM should always skip known outdated tables so that backups
// from older Scylla versions don't cause unexpected problems.
if err := isRestoreAuthAndServiceLevelsFromSStablesSupported(ctx, client); err != nil {
if errors.Is(err, errRestoreAuthAndServiceLevelsUnsupportedScyllaVersion) {
skip = append(skip, "system_auth", "system_distributed.service_levels")
} else {
return nil, errors.Wrap(err, "check auth and service levels restore support")
}
}

// Skip system cdc tables
systemCDCTableRegex := regexp.MustCompile(`(^|_)cdc(_|$)`)
for ks, tabs := range tables {
// Local keyspaces were already excluded
if !slices.Contains(keyspaces[scyllaclient.KeyspaceTypeNonLocal], ks) {
continue
}
// Here we only skip system cdc tables
if slices.Contains(keyspaces[scyllaclient.KeyspaceTypeUser], ks) {
continue
}
for _, t := range tabs {
if systemCDCTableRegex.MatchString(t) {
skip = append(skip, ks+"."+t)
}
}
}

// Skip user cdc tables
skip = append(skip, "*.*_scylla_cdc_log")

// Skip views
views, err := query.GetAllViews(session)
if err != nil {
return nil, errors.Wrap(err, "get cluster views")
}
skip = append(skip, views.List()...)

// Exclude collected patterns
out := make([]string, 0, len(skip))
for _, p := range skip {
out = append(out, "!"+p)
}
return out, nil
}

// errRestoreAuthAndServiceLevelsUnsupportedScyllaVersion means that restore auth and service levels procedure is not safe for used Scylla configuration.
var errRestoreAuthAndServiceLevelsUnsupportedScyllaVersion = errors.Errorf("restoring authentication and service levels is not supported for given ScyllaDB version")

// isRestoreAuthAndServiceLevelsFromSStablesSupported checks if restore auth and service levels procedure is supported for used Scylla configuration.
// Because of #3869 and #3875, there is no way fo SM to safely restore auth and service levels into cluster with
// version higher or equal to OSS 6.0 or ENT 2024.2.
func isRestoreAuthAndServiceLevelsFromSStablesSupported(ctx context.Context, client *scyllaclient.Client) error {
const (
ossConstraint = ">= 6.0, < 2000"
entConstraint = ">= 2024.2, > 1000"
)

status, err := client.Status(ctx)
if err != nil {
return errors.Wrap(err, "get status")
}
for _, n := range status {
ni, err := client.NodeInfo(ctx, n.Addr)
if err != nil {
return errors.Wrapf(err, "get node %s info", n.Addr)
}

ossNotSupported, err := version.CheckConstraint(ni.ScyllaVersion, ossConstraint)
if err != nil {
return errors.Wrapf(err, "check version constraint for %s", n.Addr)
}
entNotSupported, err := version.CheckConstraint(ni.ScyllaVersion, entConstraint)
if err != nil {
return errors.Wrapf(err, "check version constraint for %s", n.Addr)
}

if ossNotSupported || entNotSupported {
return errRestoreAuthAndServiceLevelsUnsupportedScyllaVersion
}
}

return nil
}
Loading