Skip to content

Commit

Permalink
Merge #137850
Browse files Browse the repository at this point in the history
137850: workload: handle new RESTORE results when importing fixtures r=srosenberg a=DarrylWong

In 03e2409, two columns were removed from the RESTORE results. This change supports handling this. It also removes logging around bytes restored as it is no longer returned.

Fixes: #137671
Release note: none
Epic: none

Co-authored-by: DarrylWong <[email protected]>
  • Loading branch information
craig[bot] and DarrylWong committed Dec 23, 2024
2 parents 5e0c76a + 05f8645 commit 8d61a42
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 32 deletions.
1 change: 0 additions & 1 deletion pkg/ccl/workloadccl/cliccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/ccl/workloadccl",
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/timeutil",
"//pkg/workload",
Expand Down
12 changes: 7 additions & 5 deletions pkg/ccl/workloadccl/cliccl/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"strings"

"github.com/cockroachdb/cockroach/pkg/ccl/workloadccl"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload"
Expand Down Expand Up @@ -268,14 +267,17 @@ func (l restoreDataLoader) InitialDataLoad(
) (int64, error) {
log.Infof(ctx, "starting restore of %d tables", len(gen.Tables()))
start := timeutil.Now()
bytes, err := workloadccl.RestoreFixture(ctx, db, l.fixture, l.database, true /* injectStats */)
err := workloadccl.RestoreFixture(ctx, db, l.fixture, l.database, true /* injectStats */)
if err != nil {
return 0, errors.Wrap(err, `restoring fixture`)
}
elapsed := timeutil.Since(start)
log.Infof(ctx, "restored %s bytes in %d tables (took %s, %s)",
humanizeutil.IBytes(bytes), len(gen.Tables()), elapsed, humanizeutil.DataRate(bytes, elapsed))
return bytes, nil
log.Infof(ctx, "restored %d tables (took %s)",
len(gen.Tables()), elapsed)
// As of #134516, RESTORE no longer returns the number of bytes restored.
// We still return 0 here to implement the interface, although as of right
// now the value is never used.
return 0, nil
}

func fixturesLoad(gen workload.Generator, urls []string, dbName string) error {
Expand Down
36 changes: 11 additions & 25 deletions pkg/ccl/workloadccl/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,7 @@ func makeQualifiedTableName(dbName string, table *workload.Table) string {
// license is required to have been set in the cluster.
func RestoreFixture(
ctx context.Context, sqlDB *gosql.DB, fixture Fixture, database string, injectStats bool,
) (int64, error) {
var bytesAtomic int64
) error {
g := ctxgroup.WithContext(ctx)
genName := fixture.Generator.Meta().Name
tables := fixture.Generator.Tables()
Expand All @@ -628,7 +627,7 @@ func RestoreFixture(
start := timeutil.Now()
restoreStmt := fmt.Sprintf(`RESTORE %s.%s FROM LATEST IN $1 WITH into_db=$2, unsafe_restore_incompatible_version`, genName, table.TableName)
log.Infof(ctx, "Restoring from %s", table.BackupURI)
var rows, index, tableBytes int64
var rows int64
var discard interface{}
res, err := sqlDB.Query(restoreStmt, table.BackupURI, database)
if err != nil {
Expand All @@ -641,46 +640,33 @@ func RestoreFixture(
}
return gosql.ErrNoRows
}
resCols, err := res.Columns()
if err != nil {
if err := res.Scan(
&discard, &discard, &discard, &rows,
); err != nil {
return err
}
if len(resCols) == 7 {
if err := res.Scan(
&discard, &discard, &discard, &rows, &index, &discard, &tableBytes,
); err != nil {
return err
}
} else {
if err := res.Scan(
&discard, &discard, &discard, &rows, &index, &tableBytes,
); err != nil {
return err
}
}
atomic.AddInt64(&bytesAtomic, tableBytes)

elapsed := timeutil.Since(start)
log.Infof(ctx, `loaded %s table %s in %s (%d rows, %d index entries, %s)`,
humanizeutil.IBytes(tableBytes), table.TableName, elapsed, rows, index,
humanizeutil.IBytes(int64(float64(tableBytes)/elapsed.Seconds())))
log.Infof(ctx, `loaded table %s in %s (%d rows)`,
table.TableName, elapsed, rows)
return nil
})
}
if err := g.Wait(); err != nil {
return 0, err
return err
}
if injectStats {
for i := range tables {
t := &tables[i]
if len(t.Stats) > 0 {
qualifiedTableName := makeQualifiedTableName(genName, t)
if err := injectStatistics(qualifiedTableName, t, sqlDB); err != nil {
return 0, err
return err
}
}
}
}
return atomic.LoadInt64(&bytesAtomic), nil
return nil
}

func listDir(
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/workloadccl/fixture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestFixture(t *testing.T) {
}

sqlDB.Exec(t, `CREATE DATABASE test`)
if _, err := workloadccl.RestoreFixture(ctx, db, fixture, `test`, false); err != nil {
if err := workloadccl.RestoreFixture(ctx, db, fixture, `test`, false); err != nil {
t.Fatalf(`%+v`, err)
}
sqlDB.CheckQueryResults(t,
Expand Down

0 comments on commit 8d61a42

Please sign in to comment.