Skip to content

Commit

Permalink
feat(restore): use described schema for restore
Browse files Browse the repository at this point in the history
Restoring from schema file (new format) is always better and safe.
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Jun 20, 2024
1 parent f43c310 commit 2b76b85
Show file tree
Hide file tree
Showing 3 changed files with 171 additions and 12 deletions.
147 changes: 147 additions & 0 deletions pkg/service/restore/schema_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,16 @@
package restore

import (
"compress/gzip"
"context"
"encoding/json"
stdErr "errors"
"io"
"path"
"strings"

"github.com/pkg/errors"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"go.uber.org/atomic"

Expand Down Expand Up @@ -52,6 +58,10 @@ func (w *schemaWorker) stageRestoreData(ctx context.Context) error {
w.logger.Info(ctx, "Started restoring schema")
defer w.logger.Info(ctx, "Restoring schema finished")

if w.describedSchema != nil {
return w.restoreFromSchemaFile(ctx)
}

status, err := w.client.Status(ctx)
if err != nil {
return errors.Wrap(err, "get status")
Expand Down Expand Up @@ -123,6 +133,47 @@ func (w *schemaWorker) stageRestoreData(ctx context.Context) error {
return nil
}

func (w *schemaWorker) restoreFromSchemaFile(ctx context.Context) error {
w.logger.Info(ctx, "Apply schema CQL statements")
start := timeutc.Now()
for _, row := range *w.describedSchema {
if row.Keyspace == "system_replicated_keys" {
// See https://github.com/scylladb/scylla-enterprise/issues/4168
continue
}
// Sometimes a single object might require multiple CQL statements (e.g. table with dropped and added column)
for _, stmt := range parseCQLStatement(row.CQLStmt) {
if err := w.clusterSession.ExecStmt(stmt); err != nil {
return errors.Wrapf(err, "create %s (%s) with %s", row.Name, row.Keyspace, stmt)
}
}
}
end := timeutc.Now()

// Insert dummy run for some progress display
for _, u := range w.run.Units {
for _, t := range u.Tables {
pr := RunProgress{
ClusterID: w.run.ClusterID,
TaskID: w.run.TaskID,
RunID: w.run.ID,
ManifestPath: "DESCRIBE SCHEMA WITH INTERNALS",
Keyspace: u.Keyspace,
Table: t.Table,
DownloadStartedAt: &start,
DownloadCompletedAt: &start,
RestoreStartedAt: &start,
RestoreCompletedAt: &end,
Downloaded: t.Size,
}
w.insertRunProgress(ctx, &pr)
}
}

w.logger.Info(ctx, "Restored schema from schema file")
return nil
}

func (w *schemaWorker) locationDownloadHandler(ctx context.Context, location Location) error {
w.logger.Info(ctx, "Downloading schema from location", "location", location)
defer w.logger.Info(ctx, "Downloading schema from location finished", "location", location)
Expand Down Expand Up @@ -273,3 +324,99 @@ func (w *schemaWorker) getFileNamesMapping(sstables []string, sstableUUIDFormat
}
return sstable.RenameToIDs(sstables, &w.generationCnt)
}

func getDescribedSchema(ctx context.Context, client *scyllaclient.Client, snapshotTag string, locHost map[Location][]string) (schema *query.DescribedSchema, err error) {
baseDir := path.Join("backup", string(SchemaDirKind))
// It's enough to get a single schema file, but it's important to validate
// that each location contains exactly one or none of them.
var (
host string
schemaPath *string
foundCnt int
)
for l, hosts := range locHost {
host = hosts[0]
schemaPath, err = getRemoteSchemaFilePath(ctx, client, snapshotTag, host, l.RemotePath(baseDir))
if err != nil {
return nil, errors.Wrapf(err, "get schema file from %s", l.RemotePath(baseDir))
}
if schemaPath != nil {
foundCnt++
}
}

if foundCnt == 0 {
return nil, nil // nolint: nilnil
} else if foundCnt < len(locHost) {
return nil, errors.New("only a subset of provided locations has schema files")
}

r, err := client.RcloneOpen(ctx, host, *schemaPath)
if err != nil {
return nil, errors.Wrap(err, "open schema file")
}
defer func() {
err = stdErr.Join(err, errors.Wrap(r.Close(), "close schema file reader"))
}()

gzr, err := gzip.NewReader(r)
if err != nil {
return nil, errors.Wrap(err, "create gzip reader")
}
defer func() {
err = stdErr.Join(err, errors.Wrap(gzr.Close(), "close gzip reader"))
}()

rawSchema, err := io.ReadAll(gzr)
if err != nil {
return nil, errors.Wrap(err, "decompress schema")
}

schema = new(query.DescribedSchema)
if err := json.Unmarshal(rawSchema, schema); err != nil {
return nil, errors.Wrap(err, "unmarshal schema")
}

return schema, nil
}

// getRemoteSchemaFilePath returns path to the schema file with given snapshotTag.
// Both search and returned path are relative to the remotePath.
// In case schema file wasn't found, nil is returned.
func getRemoteSchemaFilePath(ctx context.Context, client *scyllaclient.Client, snapshotTag, host, remotePath string) (*string, error) {
opts := scyllaclient.RcloneListDirOpts{
FilesOnly: true,
Recurse: true,
}

var schemaPaths []string
err := client.RcloneListDirIter(ctx, host, remotePath, &opts, func(f *scyllaclient.RcloneListDirItem) {
if strings.HasSuffix(f.Name, RemoteSchemaFileSuffix(snapshotTag)) {
schemaPaths = append(schemaPaths, f.Path)
}
})
if err != nil {
return nil, errors.Wrapf(err, "iterate over schema dir")
}

if len(schemaPaths) == 0 {
return nil, nil // nolint: nilnil
}
if len(schemaPaths) > 1 {
return nil, errors.Errorf("many schema files with %s snapshot tag: %v", snapshotTag, schemaPaths)
}
schemaPath := path.Join(remotePath, schemaPaths[0])
return &schemaPath, nil
}

// parseCQLStatement splits composite CQL statement into a slice of single CQL statements.
func parseCQLStatement(cql string) []string {
var out []string
for _, stmt := range strings.Split(cql, ";") {
stmt = strings.TrimSpace(stmt)
if stmt != "" {
out = append(out, stmt)
}
}
return out
}
2 changes: 1 addition & 1 deletion pkg/service/restore/service_restore_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1705,7 +1705,7 @@ func getBucketKeyspaceUser(t *testing.T) (string, string, string) {
func (h *restoreTestHelper) shouldSkipTest(targets ...Target) {
for _, target := range targets {
if target.RestoreSchema {
if err := IsRestoreSchemaSupported(context.Background(), h.Client); err != nil {
if err := IsRestoreSchemaFromSSTablesSupported(context.Background(), h.Client); err != nil {
if errors.Is(err, ErrRestoreSchemaUnsupportedScyllaVersion) {
h.T.Skip(err)
}
Expand Down
34 changes: 23 additions & 11 deletions pkg/service/restore/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import (

// restoreWorkerTools consists of utils common for both schemaWorker and tablesWorker.
type worker struct {
run *Run
target Target
run *Run
target Target
describedSchema *query.DescribedSchema

config Config
logger log.Logger
Expand Down Expand Up @@ -116,12 +117,6 @@ func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) err
return errors.Wrap(err, "verify all nodes availability")
}

if t.RestoreSchema {
if err := IsRestoreSchemaSupported(ctx, w.client); err != nil {
return err
}
}

allLocations := strset.New()
locationHosts := make(map[Location][]string)
for _, l := range t.Location {
Expand Down Expand Up @@ -166,19 +161,36 @@ func (w *worker) initTarget(ctx context.Context, properties json.RawMessage) err

w.target = t
w.run.SnapshotTag = t.SnapshotTag
w.logger.Info(ctx, "Initialized target", "target", t)

if t.RestoreSchema {
w.logger.Info(ctx, "Look for schema file")
w.describedSchema, err = getDescribedSchema(ctx, w.client, t.SnapshotTag, locationHosts)
if err != nil {
return errors.Wrap(err, "look for schema file")
}

if w.describedSchema == nil {
w.logger.Info(ctx, "Couldn't find schema file. Proceeding with schema restoration using sstables")
if err := IsRestoreSchemaFromSSTablesSupported(ctx, w.client); err != nil {
return errors.Wrap(err, "check safety of restoring schema from sstables")
}
} else {
w.logger.Info(ctx, "Found schema file")
}
}

w.logger.Info(ctx, "Initialized target", "target", t)
return nil
}

// ErrRestoreSchemaUnsupportedScyllaVersion means that restore schema procedure is not safe for used Scylla configuration.
var ErrRestoreSchemaUnsupportedScyllaVersion = errors.Errorf("restore into cluster with given ScyllaDB version and consistent_cluster_management is not supported. " +
"See https://manager.docs.scylladb.com/stable/restore/restore-schema.html for a workaround.")

// IsRestoreSchemaSupported checks if restore schema procedure is supported for used Scylla configuration.
// IsRestoreSchemaFromSSTablesSupported checks if restore schema procedure is supported for used Scylla configuration.
// Because of #3662, there is no way fo SM to safely restore schema into cluster with consistent_cluster_management
// and version higher or equal to OSS 5.4 or ENT 2024. There is a documented workaround in SM docs.
func IsRestoreSchemaSupported(ctx context.Context, client *scyllaclient.Client) error {
func IsRestoreSchemaFromSSTablesSupported(ctx context.Context, client *scyllaclient.Client) error {
const (
DangerousConstraintOSS = ">= 6.0, < 2000"
DangerousConstraintENT = ">= 2024.2, > 1000"
Expand Down

0 comments on commit 2b76b85

Please sign in to comment.