diff --git a/docs/generated/sql/bnf/create_tenant_stmt.bnf b/docs/generated/sql/bnf/create_tenant_stmt.bnf index 9e5560aa7e4c..e8961e62e89d 100644 --- a/docs/generated/sql/bnf/create_tenant_stmt.bnf +++ b/docs/generated/sql/bnf/create_tenant_stmt.bnf @@ -1,3 +1,3 @@ create_tenant_stmt ::= 'CREATE' 'TENANT' name - | 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder + | 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder opt_with_tenant_replication_options diff --git a/docs/generated/sql/bnf/stmt_block.bnf b/docs/generated/sql/bnf/stmt_block.bnf index b4822ebaeab1..ae53ffedcb52 100644 --- a/docs/generated/sql/bnf/stmt_block.bnf +++ b/docs/generated/sql/bnf/stmt_block.bnf @@ -588,7 +588,7 @@ create_external_connection_stmt ::= create_tenant_stmt ::= 'CREATE' 'TENANT' name - | 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder + | 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder opt_with_tenant_replication_options create_schedule_stmt ::= create_schedule_for_changefeed_stmt @@ -1299,6 +1299,7 @@ unreserved_keyword ::= | 'RESTRICT' | 'RESTRICTED' | 'RESUME' + | 'RETENTION' | 'RETRY' | 'RETURN' | 'RETURNS' @@ -1736,6 +1737,11 @@ label_spec ::= string_or_placeholder | 'IF' 'NOT' 'EXISTS' string_or_placeholder +opt_with_tenant_replication_options ::= + 'WITH' tenant_replication_options_list + | 'WITH' 'OPTIONS' '(' tenant_replication_options_list ')' + | + create_schedule_for_changefeed_stmt ::= 'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_targets changefeed_sink opt_with_options cron_expr opt_with_schedule_options | 'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause cron_expr opt_with_schedule_options @@ -2457,6 +2463,9 @@ target_elem ::= | a_expr | '*' +tenant_replication_options_list ::= + ( tenant_replication_options ) ( ( ',' tenant_replication_options ) )* + schedule_label_spec ::= label_spec | @@ -3030,6 +3039,9 @@ bare_col_label ::= 'identifier' | bare_label_keywords +tenant_replication_options ::= + 'RETENTION' '=' string_or_placeholder + common_table_expr ::= table_alias_name opt_col_def_list_no_types 'AS' '(' preparable_stmt ')' | table_alias_name opt_col_def_list_no_types 'AS' materialize_clause '(' preparable_stmt ')' diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go index c40e7ffa3226..263f3aad983f 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go @@ -10,6 +10,7 @@ package streamingest import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl" "github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient" @@ -31,6 +32,10 @@ import ( "github.com/cockroachdb/errors" ) +// defaultRetentionTTLSeconds is the default value for how long +// replicated data will be retained. +const defaultRetentionTTLSeconds = 25 * 60 * 60 + func streamIngestionJobDescription( p sql.PlanHookState, streamIngestion *tree.CreateTenantFromReplication, ) (string, error) { @@ -51,9 +56,12 @@ func ingestionTypeCheck( return false, nil, nil } if err := exprutil.TypeCheck(ctx, "INGESTION", p.SemaCtx(), - exprutil.Strings{ingestionStmt.ReplicationSourceAddress}); err != nil { + exprutil.Strings{ + ingestionStmt.ReplicationSourceAddress, + ingestionStmt.Options.Retention}); err != nil { return false, nil, err } + return true, resultColumns, nil } @@ -91,6 +99,21 @@ func ingestionPlanHook( return nil, nil, nil, false, err } + retentionTTLSeconds := defaultRetentionTTLSeconds + if ingestionStmt.Options.Retention != nil { + retentionStr, err := exprEval.String(ctx, ingestionStmt.Options.Retention) + if err != nil { + return nil, nil, nil, false, err + } + if retentionStr != "" { + r, err := time.ParseDuration(retentionStr) + if err != nil { + return nil, nil, nil, false, err + } + retentionTTLSeconds = int(r.Seconds()) + } + } + fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error { ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag()) defer span.Finish() @@ -170,11 +193,6 @@ func ingestionPlanHook( } prefix := keys.MakeTenantPrefix(destinationTenantID) - // TODO(adityamaru): Wire this up to the user configurable option. - replicationTTLSeconds := 25 * 60 * 60 - if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 { - replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds - } streamIngestionDetails := jobspb.StreamIngestionDetails{ StreamAddress: string(streamAddress), StreamID: uint64(replicationProducerSpec.StreamID), @@ -182,7 +200,7 @@ func ingestionPlanHook( DestinationTenantID: destinationTenantID, SourceTenantName: roachpb.TenantName(sourceTenant), DestinationTenantName: roachpb.TenantName(destinationTenant), - ReplicationTTLSeconds: int32(replicationTTLSeconds), + ReplicationTTLSeconds: int32(retentionTTLSeconds), ReplicationStartTime: replicationProducerSpec.ReplicationStartTime, } diff --git a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go index 93c68ac1b756..24d5a9c1d952 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go @@ -62,6 +62,7 @@ type tenantStreamingClustersArgs struct { destInitFunc destInitExecFunc destNumNodes int destClusterSettings map[string]string + retentionTTLSeconds int testingKnobs *sql.StreamingTestingKnobs } @@ -195,11 +196,21 @@ func (c *tenantStreamingClusters) cutover( // Returns producer job ID and ingestion job ID. func (c *tenantStreamingClusters) startStreamReplication() (int, int) { var ingestionJobID, streamProducerJobID int - streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'", c.args.destTenantName, c.args.srcTenantName, c.srcURL.String()) - c.destSysSQL.QueryRow(c.t, streamReplStmt).Scan(&ingestionJobID, &streamProducerJobID) + c.destSysSQL.QueryRow(c.t, c.buildCreateTenantQuery()).Scan(&ingestionJobID, &streamProducerJobID) return streamProducerJobID, ingestionJobID } +func (c *tenantStreamingClusters) buildCreateTenantQuery() string { + streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'", + c.args.destTenantName, + c.args.srcTenantName, + c.srcURL.String()) + if c.args.retentionTTLSeconds > 0 { + streamReplStmt = fmt.Sprintf("%s WITH RETENTION = '%ds'", streamReplStmt, c.args.retentionTTLSeconds) + } + return streamReplStmt +} + func waitForTenantPodsActive( t testing.TB, tenantServer serverutils.TestTenantInterface, numPods int, ) { @@ -1102,13 +1113,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) { args := defaultTenantStreamingClustersArgs // Override the replication job details ReplicationTTLSeconds to a small value // so that every progress update results in a protected timestamp update. - // - // TODO(adityamaru): Once this is wired up to be user configurable via an - // option to `CREATE TENANT ... FROM REPLICATION` we should replace this - // testing knob with a create tenant option. - args.testingKnobs = &sql.StreamingTestingKnobs{ - OverrideReplicationTTLSeconds: 1, - } + args.retentionTTLSeconds = 1 testProtectedTimestampManagement := func(t *testing.T, pauseBeforeTerminal bool, completeReplication bool) { // waitForProducerProtection asserts that there is a PTS record protecting diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index d2fb23787a26..0304a57fb20a 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -1638,10 +1638,6 @@ type StreamingTestingKnobs struct { // frontier specs generated for the replication job. AfterReplicationFlowPlan func([]*execinfrapb.StreamIngestionDataSpec, *execinfrapb.StreamIngestionFrontierSpec) - - // OverrideReplicationTTLSeconds will override the default value of the - // `ReplicationTTLSeconds` field on the StreamIngestion job details. - OverrideReplicationTTLSeconds int } var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{} diff --git a/pkg/sql/parser/sql.y b/pkg/sql/parser/sql.y index 9c230e4b4df7..f683b5d3ce75 100644 --- a/pkg/sql/parser/sql.y +++ b/pkg/sql/parser/sql.y @@ -828,6 +828,9 @@ func (u *sqlSymUnion) functionObj() tree.FuncObj { func (u *sqlSymUnion) functionObjs() tree.FuncObjs { return u.val.(tree.FuncObjs) } +func (u *sqlSymUnion) tenantReplicationOptions() *tree.TenantReplicationOptions { + return u.val.(*tree.TenantReplicationOptions) +} %} // NB: the %token definitions must come before the %type definitions in this @@ -929,7 +932,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs { %token RANGE RANGES READ REAL REASON REASSIGN RECURSIVE RECURRING REF REFERENCES REFRESH %token REGCLASS REGION REGIONAL REGIONS REGNAMESPACE REGPROC REGPROCEDURE REGROLE REGTYPE REINDEX %token RELATIVE RELOCATE REMOVE_PATH RENAME REPEATABLE REPLACE REPLICATION -%token RELEASE RESET RESTART RESTORE RESTRICT RESTRICTED RESUME RETURNING RETURN RETURNS RETRY REVISION_HISTORY +%token RELEASE RESET RESTART RESTORE RESTRICT RESTRICTED RESUME RETENTION RETURNING RETURN RETURNS RETRY REVISION_HISTORY %token REVOKE RIGHT ROLE ROLES ROLLBACK ROLLUP ROUTINES ROW ROWS RSHIFT RULE RUNNING %token SAVEPOINT SCANS SCATTER SCHEDULE SCHEDULES SCROLL SCHEMA SCHEMA_ONLY SCHEMAS SCRUB @@ -1265,6 +1268,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs { %type <[]tree.KVOption> kv_option_list opt_with_options var_set_list opt_with_schedule_options %type <*tree.BackupOptions> opt_with_backup_options backup_options backup_options_list %type <*tree.RestoreOptions> opt_with_restore_options restore_options restore_options_list +%type <*tree.TenantReplicationOptions> opt_with_tenant_replication_options tenant_replication_options tenant_replication_options_list %type show_backup_details %type <*tree.CopyOptions> opt_with_copy_options copy_options copy_options_list %type import_format @@ -3175,7 +3179,7 @@ backup_options: } | REVISION_HISTORY '=' a_expr { - $$.val = &tree.BackupOptions{CaptureRevisionHistory: $3.expr()} + $$.val = &tree.BackupOptions{CaptureRevisionHistory: $3.expr()} } | DETACHED { @@ -3183,7 +3187,7 @@ backup_options: } | DETACHED '=' TRUE { - $$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)} + $$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)} } | DETACHED '=' FALSE { @@ -4146,16 +4150,52 @@ create_tenant_stmt: { $$.val = &tree.CreateTenant{Name: tree.Name($3)} } -| CREATE TENANT name FROM REPLICATION OF name ON string_or_placeholder +| CREATE TENANT name FROM REPLICATION OF name ON string_or_placeholder opt_with_tenant_replication_options { $$.val = &tree.CreateTenantFromReplication{ Name: tree.Name($3), ReplicationSourceTenantName: tree.Name($7), ReplicationSourceAddress: $9.expr(), + Options: *$10.tenantReplicationOptions(), } } | CREATE TENANT error // SHOW HELP: CREATE TENANT +// Optional tenant replication options. +opt_with_tenant_replication_options: + WITH tenant_replication_options_list + { + $$.val = $2.tenantReplicationOptions() + } +| WITH OPTIONS '(' tenant_replication_options_list ')' + { + $$.val = $4.tenantReplicationOptions() + } +| /* EMPTY */ + { + $$.val = &tree.TenantReplicationOptions{} + } + +tenant_replication_options_list: + // Require at least one option + tenant_replication_options + { + $$.val = $1.tenantReplicationOptions() + } +| tenant_replication_options_list ',' tenant_replication_options + { + if err := $1.tenantReplicationOptions().CombineWith($3.tenantReplicationOptions()); err != nil { + return setErr(sqllex, err) + } + } + +// List of valid tenant replication options. +tenant_replication_options: + RETENTION '=' string_or_placeholder + { + $$.val = &tree.TenantReplicationOptions{Retention: $3.expr()} + } + // %Help: CREATE SCHEDULE // %Category: Group // %Text: @@ -15803,6 +15843,7 @@ unreserved_keyword: | RESTRICT | RESTRICTED | RESUME +| RETENTION | RETRY | RETURN | RETURNS diff --git a/pkg/sql/parser/testdata/create_tenant b/pkg/sql/parser/testdata/create_tenant index 0d3c5690e91e..8a1254836beb 100644 --- a/pkg/sql/parser/testdata/create_tenant +++ b/pkg/sql/parser/testdata/create_tenant @@ -29,3 +29,19 @@ CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') -- fully parenthesized CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' -- literals removed CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' -- identifiers removed + +parse +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h' +---- +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h' +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') WITH RETENTION = ('36h') -- fully parenthesized +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RETENTION = '_' -- literals removed +CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' WITH RETENTION = '36h' -- identifiers removed + +parse +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH OPTIONS (RETENTION = '36h') +---- +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h' -- normalized! +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') WITH RETENTION = ('36h') -- fully parenthesized +CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RETENTION = '_' -- literals removed +CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' WITH RETENTION = '36h' -- identifiers removed diff --git a/pkg/sql/sem/tree/create.go b/pkg/sql/sem/tree/create.go index 5a26d832795b..7710828a6324 100644 --- a/pkg/sql/sem/tree/create.go +++ b/pkg/sql/sem/tree/create.go @@ -2220,8 +2220,17 @@ type CreateTenantFromReplication struct { // ReplicationSourceAddress is the address of the source cluster that we are // replicating data from. ReplicationSourceAddress Expr + + Options TenantReplicationOptions +} + +// TenantReplicationOptions options for the CREATE TENANT FROM REPLICATION command. +type TenantReplicationOptions struct { + Retention Expr } +var _ NodeFormatter = &TenantReplicationOptions{} + // Format implements the NodeFormatter interface. func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) { ctx.WriteString("CREATE TENANT ") @@ -2235,4 +2244,35 @@ func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) { ctx.WriteString(" ON ") ctx.FormatNode(node.ReplicationSourceAddress) } + if !node.Options.IsDefault() { + ctx.WriteString(" WITH ") + ctx.FormatNode(&node.Options) + } +} + +// Format implements the NodeFormatter interface +func (o *TenantReplicationOptions) Format(ctx *FmtCtx) { + if o.Retention != nil { + ctx.WriteString("RETENTION = ") + ctx.FormatNode(o.Retention) + } +} + +// CombineWith merges other TenantReplicationOptions into this struct. +// An error is returned if the same option merged multiple times. +func (o *TenantReplicationOptions) CombineWith(other *TenantReplicationOptions) error { + if o.Retention != nil { + if other.Retention != nil { + return errors.New("RETENTION option specified multiple times") + } + } else { + o.Retention = other.Retention + } + return nil +} + +// IsDefault returns true if this backup options struct has default value. +func (o TenantReplicationOptions) IsDefault() bool { + options := TenantReplicationOptions{} + return o.Retention == options.Retention }