Skip to content

Commit

Permalink
streamingccl: add WITH RETENTION option to CREATE TENANT
Browse files Browse the repository at this point in the history
This adds an WITH RETENTION = 'duration' option to CREATE TENANT to
allow the user to set the retention window at replication stream
creation time.

A future commit will support updating this option via SET as well.

Informs: #93441

Release note: None
  • Loading branch information
stevendanna committed Dec 20, 2022
1 parent 6eddf3e commit 16ed89c
Show file tree
Hide file tree
Showing 8 changed files with 154 additions and 26 deletions.
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/create_tenant_stmt.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
create_tenant_stmt ::=
'CREATE' 'TENANT' name
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder opt_with_tenant_replication_options
14 changes: 13 additions & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ create_external_connection_stmt ::=

create_tenant_stmt ::=
'CREATE' 'TENANT' name
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder opt_with_tenant_replication_options

create_schedule_stmt ::=
create_schedule_for_changefeed_stmt
Expand Down Expand Up @@ -1299,6 +1299,7 @@ unreserved_keyword ::=
| 'RESTRICT'
| 'RESTRICTED'
| 'RESUME'
| 'RETENTION'
| 'RETRY'
| 'RETURN'
| 'RETURNS'
Expand Down Expand Up @@ -1736,6 +1737,11 @@ label_spec ::=
string_or_placeholder
| 'IF' 'NOT' 'EXISTS' string_or_placeholder

opt_with_tenant_replication_options ::=
'WITH' tenant_replication_options_list
| 'WITH' 'OPTIONS' '(' tenant_replication_options_list ')'
|

create_schedule_for_changefeed_stmt ::=
'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_targets changefeed_sink opt_with_options cron_expr opt_with_schedule_options
| 'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause cron_expr opt_with_schedule_options
Expand Down Expand Up @@ -2457,6 +2463,9 @@ target_elem ::=
| a_expr
| '*'

tenant_replication_options_list ::=
( tenant_replication_options ) ( ( ',' tenant_replication_options ) )*

schedule_label_spec ::=
label_spec
|
Expand Down Expand Up @@ -3030,6 +3039,9 @@ bare_col_label ::=
'identifier'
| bare_label_keywords

tenant_replication_options ::=
'RETENTION' '=' string_or_placeholder

common_table_expr ::=
table_alias_name opt_col_def_list_no_types 'AS' '(' preparable_stmt ')'
| table_alias_name opt_col_def_list_no_types 'AS' materialize_clause '(' preparable_stmt ')'
Expand Down
32 changes: 25 additions & 7 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
Expand All @@ -31,6 +32,10 @@ import (
"github.com/cockroachdb/errors"
)

// defaultRetentionTTLSeconds is the default value for how long
// replicated data will be retained.
const defaultRetentionTTLSeconds = 25 * 60 * 60

func streamIngestionJobDescription(
p sql.PlanHookState, streamIngestion *tree.CreateTenantFromReplication,
) (string, error) {
Expand All @@ -51,9 +56,12 @@ func ingestionTypeCheck(
return false, nil, nil
}
if err := exprutil.TypeCheck(ctx, "INGESTION", p.SemaCtx(),
exprutil.Strings{ingestionStmt.ReplicationSourceAddress}); err != nil {
exprutil.Strings{
ingestionStmt.ReplicationSourceAddress,
ingestionStmt.Options.Retention}); err != nil {
return false, nil, err
}

return true, resultColumns, nil
}

Expand Down Expand Up @@ -91,6 +99,21 @@ func ingestionPlanHook(
return nil, nil, nil, false, err
}

retentionTTLSeconds := defaultRetentionTTLSeconds
if ingestionStmt.Options.Retention != nil {
retentionStr, err := exprEval.String(ctx, ingestionStmt.Options.Retention)
if err != nil {
return nil, nil, nil, false, err
}
if retentionStr != "" {
r, err := time.ParseDuration(retentionStr)
if err != nil {
return nil, nil, nil, false, err
}
retentionTTLSeconds = int(r.Seconds())
}
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()
Expand Down Expand Up @@ -170,19 +193,14 @@ func ingestionPlanHook(
}

prefix := keys.MakeTenantPrefix(destinationTenantID)
// TODO(adityamaru): Wire this up to the user configurable option.
replicationTTLSeconds := 25 * 60 * 60
if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 {
replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds
}
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: string(streamAddress),
StreamID: uint64(replicationProducerSpec.StreamID),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
DestinationTenantID: destinationTenantID,
SourceTenantName: roachpb.TenantName(sourceTenant),
DestinationTenantName: roachpb.TenantName(destinationTenant),
ReplicationTTLSeconds: int32(replicationTTLSeconds),
ReplicationTTLSeconds: int32(retentionTTLSeconds),
ReplicationStartTime: replicationProducerSpec.ReplicationStartTime,
}

Expand Down
23 changes: 14 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type tenantStreamingClustersArgs struct {
destInitFunc destInitExecFunc
destNumNodes int
destClusterSettings map[string]string
retentionTTLSeconds int
testingKnobs *sql.StreamingTestingKnobs
}

Expand Down Expand Up @@ -195,11 +196,21 @@ func (c *tenantStreamingClusters) cutover(
// Returns producer job ID and ingestion job ID.
func (c *tenantStreamingClusters) startStreamReplication() (int, int) {
var ingestionJobID, streamProducerJobID int
streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'", c.args.destTenantName, c.args.srcTenantName, c.srcURL.String())
c.destSysSQL.QueryRow(c.t, streamReplStmt).Scan(&ingestionJobID, &streamProducerJobID)
c.destSysSQL.QueryRow(c.t, c.buildCreateTenantQuery()).Scan(&ingestionJobID, &streamProducerJobID)
return streamProducerJobID, ingestionJobID
}

func (c *tenantStreamingClusters) buildCreateTenantQuery() string {
streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'",
c.args.destTenantName,
c.args.srcTenantName,
c.srcURL.String())
if c.args.retentionTTLSeconds > 0 {
streamReplStmt = fmt.Sprintf("%s WITH RETENTION = '%ds'", streamReplStmt, c.args.retentionTTLSeconds)
}
return streamReplStmt
}

func waitForTenantPodsActive(
t testing.TB, tenantServer serverutils.TestTenantInterface, numPods int,
) {
Expand Down Expand Up @@ -1102,13 +1113,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
args := defaultTenantStreamingClustersArgs
// Override the replication job details ReplicationTTLSeconds to a small value
// so that every progress update results in a protected timestamp update.
//
// TODO(adityamaru): Once this is wired up to be user configurable via an
// option to `CREATE TENANT ... FROM REPLICATION` we should replace this
// testing knob with a create tenant option.
args.testingKnobs = &sql.StreamingTestingKnobs{
OverrideReplicationTTLSeconds: 1,
}
args.retentionTTLSeconds = 1

testProtectedTimestampManagement := func(t *testing.T, pauseBeforeTerminal bool, completeReplication bool) {
// waitForProducerProtection asserts that there is a PTS record protecting
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,10 +1638,6 @@ type StreamingTestingKnobs struct {
// frontier specs generated for the replication job.
AfterReplicationFlowPlan func([]*execinfrapb.StreamIngestionDataSpec,
*execinfrapb.StreamIngestionFrontierSpec)

// OverrideReplicationTTLSeconds will override the default value of the
// `ReplicationTTLSeconds` field on the StreamIngestion job details.
OverrideReplicationTTLSeconds int
}

var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}
Expand Down
49 changes: 45 additions & 4 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,9 @@ func (u *sqlSymUnion) functionObj() tree.FuncObj {
func (u *sqlSymUnion) functionObjs() tree.FuncObjs {
return u.val.(tree.FuncObjs)
}
func (u *sqlSymUnion) tenantReplicationOptions() *tree.TenantReplicationOptions {
return u.val.(*tree.TenantReplicationOptions)
}
%}

// NB: the %token definitions must come before the %type definitions in this
Expand Down Expand Up @@ -929,7 +932,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs {
%token <str> RANGE RANGES READ REAL REASON REASSIGN RECURSIVE RECURRING REF REFERENCES REFRESH
%token <str> REGCLASS REGION REGIONAL REGIONS REGNAMESPACE REGPROC REGPROCEDURE REGROLE REGTYPE REINDEX
%token <str> RELATIVE RELOCATE REMOVE_PATH RENAME REPEATABLE REPLACE REPLICATION
%token <str> RELEASE RESET RESTART RESTORE RESTRICT RESTRICTED RESUME RETURNING RETURN RETURNS RETRY REVISION_HISTORY
%token <str> RELEASE RESET RESTART RESTORE RESTRICT RESTRICTED RESUME RETENTION RETURNING RETURN RETURNS RETRY REVISION_HISTORY
%token <str> REVOKE RIGHT ROLE ROLES ROLLBACK ROLLUP ROUTINES ROW ROWS RSHIFT RULE RUNNING

%token <str> SAVEPOINT SCANS SCATTER SCHEDULE SCHEDULES SCROLL SCHEMA SCHEMA_ONLY SCHEMAS SCRUB
Expand Down Expand Up @@ -1265,6 +1268,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs {
%type <[]tree.KVOption> kv_option_list opt_with_options var_set_list opt_with_schedule_options
%type <*tree.BackupOptions> opt_with_backup_options backup_options backup_options_list
%type <*tree.RestoreOptions> opt_with_restore_options restore_options restore_options_list
%type <*tree.TenantReplicationOptions> opt_with_tenant_replication_options tenant_replication_options tenant_replication_options_list
%type <tree.ShowBackupDetails> show_backup_details
%type <*tree.CopyOptions> opt_with_copy_options copy_options copy_options_list
%type <str> import_format
Expand Down Expand Up @@ -3175,15 +3179,15 @@ backup_options:
}
| REVISION_HISTORY '=' a_expr
{
$$.val = &tree.BackupOptions{CaptureRevisionHistory: $3.expr()}
$$.val = &tree.BackupOptions{CaptureRevisionHistory: $3.expr()}
}
| DETACHED
{
$$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)}
}
| DETACHED '=' TRUE
{
$$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)}
$$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)}
}
| DETACHED '=' FALSE
{
Expand Down Expand Up @@ -4146,16 +4150,52 @@ create_tenant_stmt:
{
$$.val = &tree.CreateTenant{Name: tree.Name($3)}
}
| CREATE TENANT name FROM REPLICATION OF name ON string_or_placeholder
| CREATE TENANT name FROM REPLICATION OF name ON string_or_placeholder opt_with_tenant_replication_options
{
$$.val = &tree.CreateTenantFromReplication{
Name: tree.Name($3),
ReplicationSourceTenantName: tree.Name($7),
ReplicationSourceAddress: $9.expr(),
Options: *$10.tenantReplicationOptions(),
}
}
| CREATE TENANT error // SHOW HELP: CREATE TENANT

// Optional tenant replication options.
opt_with_tenant_replication_options:
WITH tenant_replication_options_list
{
$$.val = $2.tenantReplicationOptions()
}
| WITH OPTIONS '(' tenant_replication_options_list ')'
{
$$.val = $4.tenantReplicationOptions()
}
| /* EMPTY */
{
$$.val = &tree.TenantReplicationOptions{}
}

tenant_replication_options_list:
// Require at least one option
tenant_replication_options
{
$$.val = $1.tenantReplicationOptions()
}
| tenant_replication_options_list ',' tenant_replication_options
{
if err := $1.tenantReplicationOptions().CombineWith($3.tenantReplicationOptions()); err != nil {
return setErr(sqllex, err)
}
}

// List of valid tenant replication options.
tenant_replication_options:
RETENTION '=' string_or_placeholder
{
$$.val = &tree.TenantReplicationOptions{Retention: $3.expr()}
}

// %Help: CREATE SCHEDULE
// %Category: Group
// %Text:
Expand Down Expand Up @@ -15803,6 +15843,7 @@ unreserved_keyword:
| RESTRICT
| RESTRICTED
| RESUME
| RETENTION
| RETRY
| RETURN
| RETURNS
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/parser/testdata/create_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,19 @@ CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') -- fully parenthesized
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' -- literals removed
CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' -- identifiers removed

parse
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h'
----
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h'
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') WITH RETENTION = ('36h') -- fully parenthesized
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RETENTION = '_' -- literals removed
CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' WITH RETENTION = '36h' -- identifiers removed

parse
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH OPTIONS (RETENTION = '36h')
----
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h' -- normalized!
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') WITH RETENTION = ('36h') -- fully parenthesized
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RETENTION = '_' -- literals removed
CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' WITH RETENTION = '36h' -- identifiers removed
40 changes: 40 additions & 0 deletions pkg/sql/sem/tree/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -2220,8 +2220,17 @@ type CreateTenantFromReplication struct {
// ReplicationSourceAddress is the address of the source cluster that we are
// replicating data from.
ReplicationSourceAddress Expr

Options TenantReplicationOptions
}

// TenantReplicationOptions options for the CREATE TENANT FROM REPLICATION command.
type TenantReplicationOptions struct {
Retention Expr
}

var _ NodeFormatter = &TenantReplicationOptions{}

// Format implements the NodeFormatter interface.
func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) {
ctx.WriteString("CREATE TENANT ")
Expand All @@ -2235,4 +2244,35 @@ func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) {
ctx.WriteString(" ON ")
ctx.FormatNode(node.ReplicationSourceAddress)
}
if !node.Options.IsDefault() {
ctx.WriteString(" WITH ")
ctx.FormatNode(&node.Options)
}
}

// Format implements the NodeFormatter interface
func (o *TenantReplicationOptions) Format(ctx *FmtCtx) {
if o.Retention != nil {
ctx.WriteString("RETENTION = ")
ctx.FormatNode(o.Retention)
}
}

// CombineWith merges other TenantReplicationOptions into this struct.
// An error is returned if the same option merged multiple times.
func (o *TenantReplicationOptions) CombineWith(other *TenantReplicationOptions) error {
if o.Retention != nil {
if other.Retention != nil {
return errors.New("RETENTION option specified multiple times")
}
} else {
o.Retention = other.Retention
}
return nil
}

// IsDefault returns true if this backup options struct has default value.
func (o TenantReplicationOptions) IsDefault() bool {
options := TenantReplicationOptions{}
return o.Retention == options.Retention
}

0 comments on commit 16ed89c

Please sign in to comment.