-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathtenant_upgrade_test.go
363 lines (344 loc) · 13.4 KB
/
tenant_upgrade_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt
package kvtenantccl_test
import (
"context"
gosql "database/sql"
"net/url"
"testing"
"time"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slinstance"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/upgrade"
"github.com/cockroachdb/cockroach/pkg/upgrade/upgrades"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)
// TestTenantUpgrade checks how a secondary tenant's cluster version behaves
// while the host (system tenant) cluster is still at a non-finalized version
// and after the host has been upgraded.
//
// The host cluster is started at TestingBinaryMinSupportedVersion with
// automatic upgrades turned off via testing knobs.
//
// Subtest "upgrade tenant" creates a tenant before anything is upgraded,
// verifies its reported version, upgrades the host and then the tenant to
// TestingBinaryVersion, and finally restarts the tenant to confirm that both
// its data and its version are still as expected.
//
// Subtest "post-upgrade tenant" creates a second tenant once the host has
// already been upgraded, expects it to report TestingBinaryVersion right away,
// and repeats that check after restarting the tenant.
func TestTenantUpgrade(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	// SQL used repeatedly by both subtests.
	const (
		showVersionStmt = "SHOW CLUSTER SETTING version"
		setVersionStmt  = "SET CLUSTER SETTING version = $1"
		selectRowsStmt  = "SELECT * FROM t"
	)

	ctx := context.Background()

	// Host settings are created without an initialized version and then
	// explicitly pinned to the minimum supported version.
	hostSettings := cluster.MakeTestingClusterSettingsWithVersions(
		clusterversion.TestingBinaryVersion,
		clusterversion.TestingBinaryMinSupportedVersion,
		false, // initializeVersion
	)
	hostInitErr := clusterversion.Initialize(
		ctx, clusterversion.TestingBinaryMinSupportedVersion, &hostSettings.SV,
	)
	require.NoError(t, hostInitErr)

	hostKnobs := &server.TestingKnobs{
		DisableAutomaticVersionUpgrade: make(chan struct{}),
		BinaryVersionOverride:          clusterversion.TestingBinaryMinSupportedVersion,
	}
	hostCluster := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			// The test manages its own tenants, so the default test tenant
			// is not wanted.
			DisableDefaultTestTenant: true,
			Settings:                 hostSettings,
			Knobs:                    base.TestingKnobs{Server: hostKnobs},
		},
	})
	defer hostCluster.Stopper().Stop(ctx)

	// openTenantConn opens a SQL connection as root to the tenant listening
	// on sqlAddr. The returned func closes the connection and releases the
	// resources created for the connection URL.
	openTenantConn := func(t *testing.T, sqlAddr string) (*gosql.DB, func()) {
		connURL, releaseURL := sqlutils.PGUrl(t, sqlAddr, "Tenant", url.User(username.RootUser))
		db, err := gosql.Open("postgres", connURL.String())
		require.NoError(t, err)
		closeAll := func() {
			db.Close()
			releaseURL()
		}
		return db, closeAll
	}

	initialTenantVersion, _, _ := v0v1v2()

	// startFreshTenant starts tenant `id` with its own settings object whose
	// version has been initialized to initialTenantVersion, and connects to it.
	startFreshTenant := func(t *testing.T, id uint64) (*gosql.DB, func()) {
		tenantSettings := cluster.MakeTestingClusterSettingsWithVersions(
			clusterversion.TestingBinaryVersion,
			clusterversion.TestingBinaryMinSupportedVersion,
			false, // initializeVersion
		)
		tenantInitErr := clusterversion.Initialize(ctx, initialTenantVersion, &tenantSettings.SV)
		require.NoError(t, tenantInitErr)

		srv, err := hostCluster.Server(0).StartTenant(ctx, base.TestTenantArgs{
			TenantID:     roachpb.MakeTenantID(id),
			TestingKnobs: base.TestingKnobs{},
			Settings:     tenantSettings,
		})
		require.NoError(t, err)
		return openTenantConn(t, srv.SQLAddr())
	}

	// restartTenant starts tenant `id` again, supplying nothing but its ID,
	// and connects to the new server.
	restartTenant := func(t *testing.T, id uint64) (*gosql.DB, func()) {
		srv, err := hostCluster.Server(0).StartTenant(ctx, base.TestTenantArgs{
			TenantID: roachpb.MakeTenantID(id),
		})
		require.NoError(t, err)
		return openTenantConn(t, srv.SQLAddr())
	}

	t.Run("upgrade tenant", func(t *testing.T) {
		const tenantID = 10

		// The tenant is created before any upgrade has happened.
		db, closeConn := startFreshTenant(t, tenantID)
		runner := sqlutils.MakeSQLRunner(db)

		// Sanity-check the tenant: version first, then some data.
		runner.CheckQueryResults(t, showVersionStmt,
			[][]string{{initialTenantVersion.String()}})
		runner.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)")
		runner.Exec(t, "INSERT INTO t VALUES (1), (2)")

		// Step 1: upgrade the host cluster; the tenant must keep working.
		hostRunner := sqlutils.MakeSQLRunner(hostCluster.ServerConn(0))
		hostRunner.Exec(t, setVersionStmt, clusterversion.TestingBinaryVersion.String())
		runner.CheckQueryResults(t, selectRowsStmt, [][]string{{"1"}, {"2"}})

		// Step 2: upgrade the tenant itself and re-verify data and version.
		runner.Exec(t, setVersionStmt, clusterversion.TestingBinaryVersion.String())
		runner.CheckQueryResults(t, selectRowsStmt, [][]string{{"1"}, {"2"}})
		runner.CheckQueryResults(t, showVersionStmt,
			[][]string{{clusterversion.TestingBinaryVersion.String()}})

		// Step 3: drop the connection, restart the tenant, and make sure
		// nothing was lost across the restart.
		closeConn()
		db, closeConn = restartTenant(t, tenantID)
		defer closeConn()
		runner = sqlutils.MakeSQLRunner(db)

		runner.CheckQueryResults(t, selectRowsStmt, [][]string{{"1"}, {"2"}})
		runner.CheckQueryResults(t, showVersionStmt,
			[][]string{{clusterversion.TestingBinaryVersion.String()}})
	})

	t.Run("post-upgrade tenant", func(t *testing.T) {
		const tenantID = 11

		// A tenant created after the host upgrade should already report the
		// final version.
		db, closeConn := startFreshTenant(t, tenantID)
		sqlutils.MakeSQLRunner(db).CheckQueryResults(t, showVersionStmt,
			[][]string{{clusterversion.TestingBinaryVersion.String()}})

		// The same must hold once the tenant has been restarted.
		closeConn()
		db, closeConn = restartTenant(t, tenantID)
		defer closeConn()

		sqlutils.MakeSQLRunner(db).CheckQueryResults(t, showVersionStmt,
			[][]string{{clusterversion.TestingBinaryVersion.String()}})
	})
}
// v0v1v2 returns three versions, in ascending order, for use by the upgrade
// tests in this file:
//
//   - v0 is the version registered under clusterversion.V22_1.
//   - v1 and v2 both start as copies of clusterversion.TestingBinaryVersion and
//     are then spread two internal versions apart: when the binary version's
//     Internal component is greater than 2, v1 is lowered by 2; otherwise v2 is
//     raised by 2.
func v0v1v2() (roachpb.Version, roachpb.Version, roachpb.Version) {
	oldest := clusterversion.ByKey(clusterversion.V22_1)
	middle, newest := clusterversion.TestingBinaryVersion, clusterversion.TestingBinaryVersion

	switch {
	case middle.Internal > 2:
		// Enough room below the binary version: step the middle one down.
		middle.Internal -= 2
	default:
		// Not enough room below: step the newest one up instead.
		newest.Internal += 2
	}
	return oldest, middle, newest
}
// TestTenantUpgradeFailure exercises the case where a tenant dies partway
// through a multi-step version upgrade.
//
// The tenant's upgrade path is overridden to consist of two steps, v1 and v2.
// The v1 step is a no-op; the v2 step signals the test over an unbuffered
// channel. On the first attempt the test reacts to that signal by stopping the
// tenant, then restarts it and checks that the reported version is v1. On the
// second attempt the signal is simply drained, and the test checks that the
// tenant reaches v2.
func TestTenantUpgradeFailure(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
// tenantInfo bundles the arguments needed to start a tenant with the
// stopper that was handed to that tenant, so the test can later stop it.
type tenantInfo struct {
// Stopper passed to the tenant via tenantArgs.Stopper.
v2onMigrationStopper *stop.Stopper
// Arguments passed (by value) to StartTenant.
tenantArgs *base.TestTenantArgs
}
v0, v1, v2 := v0v1v2()
ctx := context.Background()
// Host cluster settings span versions v0 (minimum) through v2 (binary).
settings := cluster.MakeTestingClusterSettingsWithVersions(
v2,
v0,
false, // initializeVersion
)
// NOTE(review): unlike the tenant settings built in mkTenant below, these
// host settings are not passed through clusterversion.Initialize before the
// cluster is started; the server knobs override the binary version to v0.
tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
// Test validates tenant behavior. No need for the default test
// tenant here.
DisableDefaultTestTenant: true,
Settings: settings,
Knobs: base.TestingKnobs{
Server: &server.TestingKnobs{
DisableAutomaticVersionUpgrade: make(chan struct{}),
BinaryVersionOverride: v0,
},
},
},
})
defer tc.Stopper().Stop(ctx)
// Unbuffered channel on which the v2 upgrade function (see mkTenant) sends
// each time it runs. The send blocks until the test receives from it.
tenantStopperChannel := make(chan struct{})
// startAndConnectToTenant starts a tenant from tenantInfo.tenantArgs and
// opens a SQL connection to it as root. The returned cleanup closes the
// connection and releases the resources created for the connection URL.
startAndConnectToTenant := func(t *testing.T, tenantInfo *tenantInfo) (_ *gosql.DB, cleanup func()) {
tenant, err := tc.Server(0).StartTenant(ctx, *tenantInfo.tenantArgs)
require.NoError(t, err)
pgURL, cleanupPGUrl := sqlutils.PGUrl(t, tenant.SQLAddr(), "Tenant", url.User(username.RootUser))
tenantDB, err := gosql.Open("postgres", pgURL.String())
require.NoError(t, err)
return tenantDB, func() {
tenantDB.Close()
cleanupPGUrl()
}
}
// mkTenant builds, but does not start, the arguments for tenant `id`: fresh
// settings initialized to v0, a fresh stopper, and upgrade knobs that
// replace the real upgrade path with the two test steps v1 and v2.
mkTenant := func(t *testing.T, id uint64) *tenantInfo {
settings := cluster.MakeTestingClusterSettingsWithVersions(
v2,
v0,
false, // initializeVersion
)
// Use a short sqlliveness TTL and heartbeat interval for this tenant.
// NOTE(review): presumably so that the session of the stopped tenant
// expires quickly and does not hold up the restarted one -- confirm.
slinstance.DefaultTTL.Override(ctx, &settings.SV, 3*time.Second)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 500*time.Millisecond)
v2onMigrationStopper := stop.NewStopper()
// Initialize the version to the minimum it could be.
require.NoError(t, clusterversion.Initialize(ctx,
v0, &settings.SV))
tenantArgs := base.TestTenantArgs{
Stopper: v2onMigrationStopper,
TenantID: roachpb.MakeTenantID(id),
TestingKnobs: base.TestingKnobs{
JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(),
UpgradeManager: &upgrade.TestingKnobs{
// Regardless of the requested range, the upgrade path is
// always reported as exactly [v1, v2].
ListBetweenOverride: func(from, to clusterversion.ClusterVersion) []clusterversion.ClusterVersion {
return []clusterversion.ClusterVersion{{Version: v1}, {Version: v2}}
},
// Supplies the upgrade to run for each of the two versions above.
RegistryOverride: func(cv clusterversion.ClusterVersion) (upgrade.Upgrade, bool) {
switch cv.Version {
case v1:
// The v1 step does nothing and always succeeds.
return upgrade.NewTenantUpgrade("testing", clusterversion.ClusterVersion{
Version: v1,
},
upgrades.NoPrecondition,
func(
ctx context.Context, version clusterversion.ClusterVersion, deps upgrade.TenantDeps, _ *jobs.Job,
) error {
return nil
}), true
case v2:
// The v2 step notifies the test that it is running. The
// channel is unbuffered, so this blocks until the test
// receives the value.
return upgrade.NewTenantUpgrade("testing next", clusterversion.ClusterVersion{
Version: v2,
},
upgrades.NoPrecondition,
func(
ctx context.Context, version clusterversion.ClusterVersion, deps upgrade.TenantDeps, _ *jobs.Job,
) error {
tenantStopperChannel <- struct{}{}
return nil
}), true
default:
// Only v1 and v2 are ever listed, so any other version
// indicates a bug in the test setup.
panic("Unexpected version number observed.")
}
},
},
},
Settings: settings,
}
return &tenantInfo{tenantArgs: &tenantArgs,
v2onMigrationStopper: v2onMigrationStopper}
}
t.Run("upgrade tenant have it crash then resume", func(t *testing.T) {
// Create a tenant before upgrading anything and verify its version.
const initialTenantID = 10
tenantInfo := mkTenant(t, initialTenantID)
tenant, cleanup := startAndConnectToTenant(t, tenantInfo)
initialTenantRunner := sqlutils.MakeSQLRunner(tenant)
// Ensure that the tenant works.
initialTenantRunner.CheckQueryResults(t, "SHOW CLUSTER SETTING version",
[][]string{{v0.String()}})
initialTenantRunner.Exec(t, "CREATE TABLE t (i INT PRIMARY KEY)")
initialTenantRunner.Exec(t, "INSERT INTO t VALUES (1), (2)")
// Used to wait until the goroutine below has finished tearing the
// tenant down.
waitForTenantClose := make(chan struct{})
// Crash the tenant in the middle of the upgrade: as soon as the v2
// upgrade step signals that it is running (which happens after the v1
// step), close the SQL connection and stop the tenant's stopper.
go func() {
<-tenantStopperChannel
tenant.Close()
tenantInfo.v2onMigrationStopper.Stop(ctx)
waitForTenantClose <- struct{}{}
}()
// Upgrade the host cluster to the latest version.
sqlutils.MakeSQLRunner(tc.ServerConn(0)).Exec(t,
"SET CLUSTER SETTING version = $1",
clusterversion.TestingBinaryVersion.String())
// Ensure that the tenant still works.
initialTenantRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"1"}, {"2"}})
// Try to upgrade the tenant to v2. The tenant is torn down while the v2
// step is running, so the statement is expected to fail with one of the
// connection errors matched below.
initialTenantRunner.ExpectErr(t,
".*(database is closed|failed to connect|closed network connection)+",
"SET CLUSTER SETTING version = $1",
v2.String())
// Wait for the teardown goroutine to finish before restarting.
<-waitForTenantClose
cleanup()
// Build fresh arguments (new stopper, new settings initialized to v0)
// for the same tenant ID and start the tenant again.
tenantInfo = mkTenant(t, initialTenantID)
tenant, cleanup = startAndConnectToTenant(t, tenantInfo)
initialTenantRunner = sqlutils.MakeSQLRunner(tenant)
// Ensure that the tenant still works and the target
// version wasn't reached.
initialTenantRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"1"}, {"2"}})
initialTenantRunner.CheckQueryResults(t, "SHOW CLUSTER SETTING version",
[][]string{{v1.String()}})
// Close the current connection, then start the tenant once more from the
// same tenantInfo and switch the runner to the new connection.
cleanup()
{
// This cleanup is a new variable scoped to this block; it shadows the
// outer one and runs when the subtest function returns.
tca, cleanup := startAndConnectToTenant(t, tenantInfo)
defer cleanup()
initialTenantRunner = sqlutils.MakeSQLRunner(tca)
}
// Drain tenantStopperChannel until it is closed so that the v2 upgrade
// step never blocks on its send. The step may run, and therefore send,
// more than once because of transaction retries. In the crash case above
// the tenant's stopper is stopped instead, so no such risk exists there.
go func() {
for {
_, ok := <-tenantStopperChannel
if !ok {
return
}
}
}()
// Upgrade the tenant cluster.
initialTenantRunner.Exec(t,
"SET CLUSTER SETTING version = $1",
v2.String())
// Closing the channel ends the drain goroutine above.
close(tenantStopperChannel)
// Validate the target version has been reached.
initialTenantRunner.CheckQueryResults(t, "SELECT * FROM t", [][]string{{"1"}, {"2"}})
initialTenantRunner.CheckQueryResults(t, "SHOW CLUSTER SETTING version",
[][]string{{v2.String()}})
// Stop the stopper that was handed to the restarted tenant.
tenantInfo.v2onMigrationStopper.Stop(ctx)
})
}