-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathdatabase.go
130 lines (116 loc) · 4.09 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package sql
import (
"context"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/errors"
)
//
// This file contains routines for low-level access to stored database
// descriptors, as well as accessors for the database cache.
//
// For higher levels in the SQL layer, these interface are likely not
// suitable; consider instead schema_accessors.go and resolver.go.
//
// renameDatabase implements the DatabaseDescEditor interface.
func (p *planner) renameDatabase(
ctx context.Context, desc *dbdesc.Mutable, newName string, stmt string,
) error {
oldNameKey := descpb.NameInfo{
ParentID: desc.GetParentID(),
ParentSchemaID: desc.GetParentSchemaID(),
Name: desc.GetName(),
}
// Check that the new name is available.
if dbID, err := p.Descriptors().Direct().LookupDatabaseID(ctx, p.txn, newName); err == nil && dbID != descpb.InvalidID {
return pgerror.Newf(pgcode.DuplicateDatabase,
"the new database name %q already exists", newName)
} else if err != nil {
return err
}
// Update the descriptor with the new name.
desc.SetName(newName)
// Populate the namespace update batch.
b := p.txn.NewBatch()
p.renameNamespaceEntry(ctx, b, oldNameKey, desc)
// Write the updated database descriptor.
if err := p.writeNonDropDatabaseChange(ctx, desc, stmt); err != nil {
return err
}
// Run the namespace update batch.
return p.txn.Run(ctx, b)
}
// writeNonDropDatabaseChange writes an updated database descriptor, and can
// only be called when database descriptor leasing is enabled. See
// writeDatabaseChangeToBatch. Also queues a job to complete the schema change.
func (p *planner) writeNonDropDatabaseChange(
ctx context.Context, desc *dbdesc.Mutable, jobDesc string,
) error {
if err := p.createNonDropDatabaseChangeJob(ctx, desc.ID, jobDesc); err != nil {
return err
}
b := p.Txn().NewBatch()
if err := p.writeDatabaseChangeToBatch(ctx, desc, b); err != nil {
return err
}
return p.Txn().Run(ctx, b)
}
// writeDatabaseChangeToBatch writes an updated database descriptor, and
// can only be called when database descriptor leasing is enabled. Does not
// queue a job to complete the schema change.
func (p *planner) writeDatabaseChangeToBatch(
ctx context.Context, desc *dbdesc.Mutable, b *kv.Batch,
) error {
return p.Descriptors().WriteDescToBatch(
ctx,
p.extendedEvalCtx.Tracing.KVTracingEnabled(),
desc,
b,
)
}
// forEachMutableTableInDatabase calls the given function on every table
// descriptor inside the given database. Tables that have been
// dropped are skipped.
func (p *planner) forEachMutableTableInDatabase(
ctx context.Context,
dbDesc catalog.DatabaseDescriptor,
fn func(ctx context.Context, scName string, tbDesc *tabledesc.Mutable) error,
) error {
all, err := p.Descriptors().GetAllDescriptors(ctx, p.txn)
if err != nil {
return err
}
lCtx := newInternalLookupCtx(all.OrderedDescriptors(), dbDesc)
for _, tbID := range lCtx.tbIDs {
desc := lCtx.tbDescs[tbID]
if desc.Dropped() {
continue
}
mutable := tabledesc.NewBuilder(desc.TableDesc()).BuildExistingMutableTable()
schemaName, found, err := lCtx.GetSchemaName(ctx, desc.GetParentSchemaID(), desc.GetParentID(), p.ExecCfg().Settings.Version)
if err != nil {
return err
}
if !found {
return errors.AssertionFailedf("schema id %d not found", desc.GetParentSchemaID())
}
if err := fn(ctx, schemaName, mutable); err != nil {
return err
}
}
return nil
}