Skip to content

Commit

Permalink
feat(backup): use DESCRIBE SCHEMA WITH INTERNALS for schema backup
Browse files Browse the repository at this point in the history
This commit moves from using driver KeyspaceMetadata method to CQL query for describing schema. Moreover, in order not to lose any information returned from Scylla, saved schema file format is changed.

Schema file name depends on whether it's safe to use it for restore purposes.
Files ending with 'schema_with_internals.json.gz' come from Scylla version >= 6.0 and are safe for restore.
Files ending with 'schema.json.gz' come from Scylla version < 6.0 and aren't safe for restore, but can still be useful for the user or debugging.

The new file format represents parsed DescribedSchema in a json format (no more tar archive). Json format allows for preserving additional information like object keyspace, type and name, which would be lost in a cql file.
  • Loading branch information
Michal-Leszczynski authored and karol-kokoszka committed Jun 20, 2024
1 parent 9a7e886 commit e20a213
Show file tree
Hide file tree
Showing 7 changed files with 134 additions and 83 deletions.
31 changes: 29 additions & 2 deletions pkg/service/backup/backupspec/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@ const (
MetadataVersion = ".version"
// Manifest is name of the manifest file.
Manifest = "manifest.json.gz"
// Schema is the name of the schema file.
Schema = "schema.tar.gz"
// Schema is the name of the schema file used for restore
// (so for Scylla versions starting from 6.0).
Schema = "schema_with_internals.json.gz"
// UnsafeSchema is the name of the schema file that shouldn't be used for restore
// (so for Scylla versions older than 6.0).
UnsafeSchema = "schema.json.gz"
// TempFileExt is suffix for the temporary files.
TempFileExt = ".tmp"

Expand Down Expand Up @@ -85,10 +89,33 @@ func RemoteSchemaFile(clusterID, taskID uuid.UUID, snapshotTag string) string {
manifestName := strings.Join([]string{
"task",
taskID.String(),
RemoteSchemaFileSuffix(snapshotTag),
}, "_")

return path.Join(
remoteSchemaDir(clusterID),
manifestName,
)
}

// RemoteSchemaFileSuffix returns suffix (containing snapshot tag) of the schema file.
func RemoteSchemaFileSuffix(snapshotTag string) string {
return strings.Join([]string{
"tag",
snapshotTag,
Schema,
}, "_")
}

// RemoteUnsafeSchemaFile returns path to the unsafe schema file.
func RemoteUnsafeSchemaFile(clusterID, taskID uuid.UUID, snapshotTag string) string {
manifestName := strings.Join([]string{
"task",
taskID.String(),
"tag",
snapshotTag,
UnsafeSchema,
}, "_")

return path.Join(
remoteSchemaDir(clusterID),
Expand Down
7 changes: 6 additions & 1 deletion pkg/service/backup/purger.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,13 @@ func (p purger) PurgeSnapshotTags(ctx context.Context, manifests []*ManifestInfo
deletedManifests := 0
for _, m := range manifests {
if tags.Has(m.SnapshotTag) {
// Note that schema files might not be backed up in the first place
unsafePath := RemoteUnsafeSchemaFile(m.ClusterID, m.TaskID, m.SnapshotTag)
if _, err := p.deleteFile(ctx, m.Location.RemotePath(unsafePath)); err != nil {
p.logger.Info(ctx, "Remove unsafe schema file", "path", unsafePath, "error", err)
}
if _, err := p.deleteFile(ctx, m.Location.RemotePath(m.SchemaPath())); err != nil {
p.logger.Info(ctx, "Failed to remove schema file", "path", m.SchemaPath(), "error", err)
p.logger.Info(ctx, "Remove schema file", "path", m.SchemaPath(), "error", err)
}
if _, err := p.deleteFile(ctx, m.Location.RemotePath(m.Path())); err != nil {
p.logger.Info(ctx, "Failed to remove manifest", "path", m.Path(), "error", err)
Expand Down
60 changes: 0 additions & 60 deletions pkg/service/backup/schema.go

This file was deleted.

17 changes: 11 additions & 6 deletions pkg/service/backup/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
return nil
}
stageFunc := map[Stage]func() error{
StageAwaitSchema: func() error { // nolint: unparam
StageAwaitSchema: func() error {
clusterSession, err := s.clusterSession(ctx, clusterID)
if err != nil {
w.Logger.Info(ctx, "No CQL cluster session, backup of schema as CQL files will be skipped", "error", err)
Expand All @@ -822,13 +822,18 @@ func (s *Service) Backup(ctx context.Context, clusterID, taskID, runID uuid.UUID
defer clusterSession.Close()

if err := w.AwaitSchemaAgreement(ctx, clusterSession); err != nil {
w.Logger.Info(ctx, "Couldn't await schema agreement, backup of schema as CQL files will be skipped", "error", err)
return nil
return errors.Wrap(err, "await schema agreement")
}
if err = w.DumpSchema(ctx, clusterSession); err != nil {
w.Logger.Info(ctx, "Couldn't dump schema, backup of schema as CQL files will be skipped", "error", err)
w.Schema = nil

var hosts []string
for _, h := range hi {
hosts = append(hosts, h.IP)
}

if err := w.DumpSchema(ctx, clusterSession, hosts); err != nil {
return errors.Wrap(err, "dump schema")
}

return nil
},
StageSnapshot: func() error {
Expand Down
11 changes: 6 additions & 5 deletions pkg/service/backup/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,12 @@ type workerTools struct {
type worker struct {
workerTools

PrevStage Stage
Metrics metrics.BackupMetrics
Units []Unit
Schema *bytes.Buffer
OnRunProgress func(ctx context.Context, p *RunProgress)
PrevStage Stage
Metrics metrics.BackupMetrics
Units []Unit
Schema bytes.Buffer
SchemaFilePath string
OnRunProgress func(ctx context.Context, p *RunProgress)
// ResumeUploadProgress populates upload stats of the provided run progress
// with previous run progress.
// If there is no previous run there should be no update.
Expand Down
4 changes: 2 additions & 2 deletions pkg/service/backup/worker_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ func (w *worker) createTemporaryManifest(h hostInfo, tokens []int64) ManifestInf
},
Index: make([]FilesMeta, len(dirs)),
}
if w.Schema != nil {
c.Schema = RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
if w.SchemaFilePath != "" {
c.Schema = w.SchemaFilePath
}

for i, d := range dirs {
Expand Down
87 changes: 80 additions & 7 deletions pkg/service/backup/worker_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,21 @@
package backup

import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"time"

"github.com/pkg/errors"
"github.com/scylladb/go-set/strset"
"github.com/scylladb/gocqlx/v2"
. "github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/scyllaclient"
"github.com/scylladb/scylla-manager/v3/pkg/service/backup/backupspec"
"github.com/scylladb/scylla-manager/v3/pkg/util/parallel"
"github.com/scylladb/scylla-manager/v3/pkg/util/query"
"github.com/scylladb/scylla-manager/v3/pkg/util/retry"
"github.com/scylladb/scylla-manager/v3/pkg/util/version"
)

func (w *workerTools) AwaitSchemaAgreement(ctx context.Context, clusterSession gocqlx.Session) error {
Expand Down Expand Up @@ -61,17 +67,36 @@ func (w *workerTools) AwaitSchemaAgreement(ctx context.Context, clusterSession g
}, backoff, notify)
}

func (w *worker) DumpSchema(ctx context.Context, clusterSession gocqlx.Session) error {
b, err := createSchemaArchive(ctx, w.Units, clusterSession)
func (w *worker) DumpSchema(ctx context.Context, clusterSession gocqlx.Session, hosts []string) error {
schema, err := query.DescribeSchemaWithInternals(clusterSession)
if err != nil {
return errors.Wrap(err, "get schema")
return err
}

b, err := marshalAndCompressSchema(schema)
if err != nil {
return err
}

safe, err := isDescribeSchemaSafe(ctx, w.Client, hosts)
if err != nil {
return errors.Wrap(err, "check describe schema support")
}
if safe {
w.SchemaFilePath = backupspec.RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
} else {
w.SchemaFilePath = backupspec.RemoteUnsafeSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag)
w.Logger.Info(ctx, "Backing-up and restoring schema from output of "+
"DESCRIBE SCHEMA WITH INTERNALS is supported starting from ScyllaDB 6.0 or 2024.2. "+
"Previous versions restore schema via snapshot of system_schema sstables")
}

w.Schema = b
return nil
}

func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError error) {
if w.Schema == nil {
if w.SchemaFilePath == "" {
w.Logger.Info(ctx, "No schema CQL file to upload")
return nil
}
Expand All @@ -87,8 +112,8 @@ func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError
}

f := func(h hostInfo) error {
dst := h.Location.RemotePath(RemoteSchemaFile(w.ClusterID, w.TaskID, w.SnapshotTag))
return w.Client.RclonePut(ctx, h.IP, dst, w.Schema)
dst := h.Location.RemotePath(w.SchemaFilePath)
return w.Client.RclonePut(ctx, h.IP, dst, &w.Schema)
}

notify := func(h hostInfo, err error) {
Expand All @@ -100,3 +125,51 @@ func (w *worker) UploadSchema(ctx context.Context, hosts []hostInfo) (stepError

return hostsInParallel(hostPerLocation, parallel.NoLimit, f, notify)
}

func marshalAndCompressSchema(schema query.DescribedSchema) (bytes.Buffer, error) {
rawSchema, err := json.Marshal(schema)
if err != nil {
return bytes.Buffer{}, errors.Wrap(err, "marshal schema")
}

var b bytes.Buffer
gw := gzip.NewWriter(&b)
if _, err := gw.Write(rawSchema); err != nil {
return bytes.Buffer{}, errors.Wrap(err, "write compressed schema")
}
if err := gw.Close(); err != nil {
return bytes.Buffer{}, errors.Wrap(err, "close gzip writer")
}

return b, nil
}

// isDescribeSchemaSafe checks if restoring schema from DESCRIBE SCHEMA WITH INTERNALS is safe.
func isDescribeSchemaSafe(ctx context.Context, client *scyllaclient.Client, hosts []string) (bool, error) {
const (
ConstraintOSS = ">= 6.0, < 2000"
ConstraintENT = ">= 2024.2, > 1000"
)

for _, h := range hosts {
ni, err := client.NodeInfo(ctx, h)
if err != nil {
return false, errors.Wrapf(err, "get node %s info", h)
}

oss, err := version.CheckConstraint(ni.ScyllaVersion, ConstraintOSS)
if err != nil {
return false, errors.Wrapf(err, "check version constraint for %s", h)
}
ent, err := version.CheckConstraint(ni.ScyllaVersion, ConstraintENT)
if err != nil {
return false, errors.Wrapf(err, "check version constraint for %s", h)
}

if !oss && !ent {
return false, nil
}
}

return true, nil
}

0 comments on commit e20a213

Please sign in to comment.