Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

streamingccl: add WITH RETENTION option to CREATE TENANT #94001

Merged
merged 1 commit into from
Dec 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/generated/sql/bnf/create_tenant_stmt.bnf
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
create_tenant_stmt ::=
'CREATE' 'TENANT' name
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder opt_with_tenant_replication_options
14 changes: 13 additions & 1 deletion docs/generated/sql/bnf/stmt_block.bnf
Original file line number Diff line number Diff line change
Expand Up @@ -588,7 +588,7 @@ create_external_connection_stmt ::=

create_tenant_stmt ::=
'CREATE' 'TENANT' name
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder
| 'CREATE' 'TENANT' name 'FROM' 'REPLICATION' 'OF' name 'ON' string_or_placeholder opt_with_tenant_replication_options

create_schedule_stmt ::=
create_schedule_for_changefeed_stmt
Expand Down Expand Up @@ -1299,6 +1299,7 @@ unreserved_keyword ::=
| 'RESTRICT'
| 'RESTRICTED'
| 'RESUME'
| 'RETENTION'
| 'RETRY'
| 'RETURN'
| 'RETURNS'
Expand Down Expand Up @@ -1736,6 +1737,11 @@ label_spec ::=
string_or_placeholder
| 'IF' 'NOT' 'EXISTS' string_or_placeholder

opt_with_tenant_replication_options ::=
'WITH' tenant_replication_options_list
| 'WITH' 'OPTIONS' '(' tenant_replication_options_list ')'
|

create_schedule_for_changefeed_stmt ::=
'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_targets changefeed_sink opt_with_options cron_expr opt_with_schedule_options
| 'CREATE' 'SCHEDULE' schedule_label_spec 'FOR' 'CHANGEFEED' changefeed_sink opt_with_options 'AS' 'SELECT' target_list 'FROM' changefeed_target_expr opt_where_clause cron_expr opt_with_schedule_options
Expand Down Expand Up @@ -2457,6 +2463,9 @@ target_elem ::=
| a_expr
| '*'

tenant_replication_options_list ::=
( tenant_replication_options ) ( ( ',' tenant_replication_options ) )*

schedule_label_spec ::=
label_spec
|
Expand Down Expand Up @@ -3030,6 +3039,9 @@ bare_col_label ::=
'identifier'
| bare_label_keywords

tenant_replication_options ::=
'RETENTION' '=' string_or_placeholder

common_table_expr ::=
table_alias_name opt_col_def_list_no_types 'AS' '(' preparable_stmt ')'
| table_alias_name opt_col_def_list_no_types 'AS' materialize_clause '(' preparable_stmt ')'
Expand Down
32 changes: 25 additions & 7 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package streamingest

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streamclient"
Expand All @@ -31,6 +32,10 @@ import (
"github.com/cockroachdb/errors"
)

// defaultRetentionTTLSeconds is the default value for how long
// replicated data will be retained.
const defaultRetentionTTLSeconds = 25 * 60 * 60

func streamIngestionJobDescription(
p sql.PlanHookState, streamIngestion *tree.CreateTenantFromReplication,
) (string, error) {
Expand All @@ -51,9 +56,12 @@ func ingestionTypeCheck(
return false, nil, nil
}
if err := exprutil.TypeCheck(ctx, "INGESTION", p.SemaCtx(),
exprutil.Strings{ingestionStmt.ReplicationSourceAddress}); err != nil {
exprutil.Strings{
ingestionStmt.ReplicationSourceAddress,
ingestionStmt.Options.Retention}); err != nil {
return false, nil, err
}

return true, resultColumns, nil
}

Expand Down Expand Up @@ -91,6 +99,21 @@ func ingestionPlanHook(
return nil, nil, nil, false, err
}

retentionTTLSeconds := defaultRetentionTTLSeconds
if ingestionStmt.Options.Retention != nil {
retentionStr, err := exprEval.String(ctx, ingestionStmt.Options.Retention)
if err != nil {
return nil, nil, nil, false, err
}
if retentionStr != "" {
r, err := time.ParseDuration(retentionStr)
if err != nil {
return nil, nil, nil, false, err
}
retentionTTLSeconds = int(r.Seconds())
}
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
ctx, span := tracing.ChildSpan(ctx, stmt.StatementTag())
defer span.Finish()
Expand Down Expand Up @@ -170,19 +193,14 @@ func ingestionPlanHook(
}

prefix := keys.MakeTenantPrefix(destinationTenantID)
// TODO(adityamaru): Wire this up to the user configurable option.
replicationTTLSeconds := 25 * 60 * 60
if knobs := p.ExecCfg().StreamingTestingKnobs; knobs != nil && knobs.OverrideReplicationTTLSeconds != 0 {
replicationTTLSeconds = knobs.OverrideReplicationTTLSeconds
}
streamIngestionDetails := jobspb.StreamIngestionDetails{
StreamAddress: string(streamAddress),
StreamID: uint64(replicationProducerSpec.StreamID),
Span: roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()},
DestinationTenantID: destinationTenantID,
SourceTenantName: roachpb.TenantName(sourceTenant),
DestinationTenantName: roachpb.TenantName(destinationTenant),
ReplicationTTLSeconds: int32(replicationTTLSeconds),
ReplicationTTLSeconds: int32(retentionTTLSeconds),
ReplicationStartTime: replicationProducerSpec.ReplicationStartTime,
}

Expand Down
23 changes: 14 additions & 9 deletions pkg/ccl/streamingccl/streamingest/stream_replication_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type tenantStreamingClustersArgs struct {
destInitFunc destInitExecFunc
destNumNodes int
destClusterSettings map[string]string
retentionTTLSeconds int
testingKnobs *sql.StreamingTestingKnobs
}

Expand Down Expand Up @@ -195,11 +196,21 @@ func (c *tenantStreamingClusters) cutover(
// Returns producer job ID and ingestion job ID.
func (c *tenantStreamingClusters) startStreamReplication() (int, int) {
var ingestionJobID, streamProducerJobID int
streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'", c.args.destTenantName, c.args.srcTenantName, c.srcURL.String())
c.destSysSQL.QueryRow(c.t, streamReplStmt).Scan(&ingestionJobID, &streamProducerJobID)
c.destSysSQL.QueryRow(c.t, c.buildCreateTenantQuery()).Scan(&ingestionJobID, &streamProducerJobID)
return streamProducerJobID, ingestionJobID
}

func (c *tenantStreamingClusters) buildCreateTenantQuery() string {
streamReplStmt := fmt.Sprintf("CREATE TENANT %s FROM REPLICATION OF %s ON '%s'",
c.args.destTenantName,
c.args.srcTenantName,
c.srcURL.String())
if c.args.retentionTTLSeconds > 0 {
streamReplStmt = fmt.Sprintf("%s WITH RETENTION = '%ds'", streamReplStmt, c.args.retentionTTLSeconds)
}
return streamReplStmt
}

func waitForTenantPodsActive(
t testing.TB, tenantServer serverutils.TestTenantInterface, numPods int,
) {
Expand Down Expand Up @@ -1102,13 +1113,7 @@ func TestTenantReplicationProtectedTimestampManagement(t *testing.T) {
args := defaultTenantStreamingClustersArgs
// Override the replication job details ReplicationTTLSeconds to a small value
// so that every progress update results in a protected timestamp update.
//
// TODO(adityamaru): Once this is wired up to be user configurable via an
// option to `CREATE TENANT ... FROM REPLICATION` we should replace this
// testing knob with a create tenant option.
args.testingKnobs = &sql.StreamingTestingKnobs{
OverrideReplicationTTLSeconds: 1,
}
args.retentionTTLSeconds = 1

testProtectedTimestampManagement := func(t *testing.T, pauseBeforeTerminal bool, completeReplication bool) {
// waitForProducerProtection asserts that there is a PTS record protecting
Expand Down
4 changes: 0 additions & 4 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1638,10 +1638,6 @@ type StreamingTestingKnobs struct {
// frontier specs generated for the replication job.
AfterReplicationFlowPlan func([]*execinfrapb.StreamIngestionDataSpec,
*execinfrapb.StreamIngestionFrontierSpec)

// OverrideReplicationTTLSeconds will override the default value of the
// `ReplicationTTLSeconds` field on the StreamIngestion job details.
OverrideReplicationTTLSeconds int
}

var _ base.ModuleTestingKnobs = &StreamingTestingKnobs{}
Expand Down
49 changes: 45 additions & 4 deletions pkg/sql/parser/sql.y
Original file line number Diff line number Diff line change
Expand Up @@ -828,6 +828,9 @@ func (u *sqlSymUnion) functionObj() tree.FuncObj {
func (u *sqlSymUnion) functionObjs() tree.FuncObjs {
return u.val.(tree.FuncObjs)
}
func (u *sqlSymUnion) tenantReplicationOptions() *tree.TenantReplicationOptions {
return u.val.(*tree.TenantReplicationOptions)
}
%}

// NB: the %token definitions must come before the %type definitions in this
Expand Down Expand Up @@ -929,7 +932,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs {
%token <str> RANGE RANGES READ REAL REASON REASSIGN RECURSIVE RECURRING REF REFERENCES REFRESH
%token <str> REGCLASS REGION REGIONAL REGIONS REGNAMESPACE REGPROC REGPROCEDURE REGROLE REGTYPE REINDEX
%token <str> RELATIVE RELOCATE REMOVE_PATH RENAME REPEATABLE REPLACE REPLICATION
%token <str> RELEASE RESET RESTART RESTORE RESTRICT RESTRICTED RESUME RETURNING RETURN RETURNS RETRY REVISION_HISTORY
%token <str> RELEASE RESET RESTART RESTORE RESTRICT RESTRICTED RESUME RETENTION RETURNING RETURN RETURNS RETRY REVISION_HISTORY
%token <str> REVOKE RIGHT ROLE ROLES ROLLBACK ROLLUP ROUTINES ROW ROWS RSHIFT RULE RUNNING

%token <str> SAVEPOINT SCANS SCATTER SCHEDULE SCHEDULES SCROLL SCHEMA SCHEMA_ONLY SCHEMAS SCRUB
Expand Down Expand Up @@ -1265,6 +1268,7 @@ func (u *sqlSymUnion) functionObjs() tree.FuncObjs {
%type <[]tree.KVOption> kv_option_list opt_with_options var_set_list opt_with_schedule_options
%type <*tree.BackupOptions> opt_with_backup_options backup_options backup_options_list
%type <*tree.RestoreOptions> opt_with_restore_options restore_options restore_options_list
%type <*tree.TenantReplicationOptions> opt_with_tenant_replication_options tenant_replication_options tenant_replication_options_list
%type <tree.ShowBackupDetails> show_backup_details
%type <*tree.CopyOptions> opt_with_copy_options copy_options copy_options_list
%type <str> import_format
Expand Down Expand Up @@ -3175,15 +3179,15 @@ backup_options:
}
| REVISION_HISTORY '=' a_expr
{
$$.val = &tree.BackupOptions{CaptureRevisionHistory: $3.expr()}
$$.val = &tree.BackupOptions{CaptureRevisionHistory: $3.expr()}
}
| DETACHED
{
$$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)}
}
| DETACHED '=' TRUE
{
$$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)}
$$.val = &tree.BackupOptions{Detached: tree.MakeDBool(true)}
}
| DETACHED '=' FALSE
{
Expand Down Expand Up @@ -4146,16 +4150,52 @@ create_tenant_stmt:
{
$$.val = &tree.CreateTenant{Name: tree.Name($3)}
}
| CREATE TENANT name FROM REPLICATION OF name ON string_or_placeholder
| CREATE TENANT name FROM REPLICATION OF name ON string_or_placeholder opt_with_tenant_replication_options
{
$$.val = &tree.CreateTenantFromReplication{
Name: tree.Name($3),
ReplicationSourceTenantName: tree.Name($7),
ReplicationSourceAddress: $9.expr(),
Options: *$10.tenantReplicationOptions(),
}
}
| CREATE TENANT error // SHOW HELP: CREATE TENANT

// Optional tenant replication options.
opt_with_tenant_replication_options:
WITH tenant_replication_options_list
{
$$.val = $2.tenantReplicationOptions()
}
| WITH OPTIONS '(' tenant_replication_options_list ')'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so we can also do CREATE TENANT .. FROM REPLICATION OF .. ON .. WITH OPTIONS (RETENTION = 36h); right? can you add a test for that syntax? or maybe we can wait for the data driven tests for that?

btw is that standard to support both WITH X = y and WITH OPTIONS (X = y)?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added a parser test.

The WITH OPTIONS (X =y) is also supported by BACKUP and RESTORE so I supported it here for consistency.

{
$$.val = $4.tenantReplicationOptions()
}
| /* EMPTY */
{
$$.val = &tree.TenantReplicationOptions{}
}

tenant_replication_options_list:
// Require at least one option
tenant_replication_options
{
$$.val = $1.tenantReplicationOptions()
}
| tenant_replication_options_list ',' tenant_replication_options
{
if err := $1.tenantReplicationOptions().CombineWith($3.tenantReplicationOptions()); err != nil {
return setErr(sqllex, err)
}
}

// List of valid tenant replication options.
tenant_replication_options:
RETENTION '=' string_or_placeholder
{
$$.val = &tree.TenantReplicationOptions{Retention: $3.expr()}
}

// %Help: CREATE SCHEDULE
// %Category: Group
// %Text:
Expand Down Expand Up @@ -15803,6 +15843,7 @@ unreserved_keyword:
| RESTRICT
| RESTRICTED
| RESUME
| RETENTION
| RETRY
| RETURN
| RETURNS
Expand Down
16 changes: 16 additions & 0 deletions pkg/sql/parser/testdata/create_tenant
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,19 @@ CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') -- fully parenthesized
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' -- literals removed
CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' -- identifiers removed

parse
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h'
----
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h'
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') WITH RETENTION = ('36h') -- fully parenthesized
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RETENTION = '_' -- literals removed
CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' WITH RETENTION = '36h' -- identifiers removed

parse
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH OPTIONS (RETENTION = '36h')
----
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON 'pgurl' WITH RETENTION = '36h' -- normalized!
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON ('pgurl') WITH RETENTION = ('36h') -- fully parenthesized
CREATE TENANT "destination-hyphen" FROM REPLICATION OF "source-hyphen" ON '_' WITH RETENTION = '_' -- literals removed
CREATE TENANT _ FROM REPLICATION OF _ ON 'pgurl' WITH RETENTION = '36h' -- identifiers removed
40 changes: 40 additions & 0 deletions pkg/sql/sem/tree/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -2220,8 +2220,17 @@ type CreateTenantFromReplication struct {
// ReplicationSourceAddress is the address of the source cluster that we are
// replicating data from.
ReplicationSourceAddress Expr

Options TenantReplicationOptions
}

// TenantReplicationOptions options for the CREATE TENANT FROM REPLICATION command.
type TenantReplicationOptions struct {
Retention Expr
}

var _ NodeFormatter = &TenantReplicationOptions{}

// Format implements the NodeFormatter interface.
func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) {
ctx.WriteString("CREATE TENANT ")
Expand All @@ -2235,4 +2244,35 @@ func (node *CreateTenantFromReplication) Format(ctx *FmtCtx) {
ctx.WriteString(" ON ")
ctx.FormatNode(node.ReplicationSourceAddress)
}
if !node.Options.IsDefault() {
ctx.WriteString(" WITH ")
ctx.FormatNode(&node.Options)
}
}

// Format implements the NodeFormatter interface
func (o *TenantReplicationOptions) Format(ctx *FmtCtx) {
if o.Retention != nil {
ctx.WriteString("RETENTION = ")
ctx.FormatNode(o.Retention)
}
}

// CombineWith merges other TenantReplicationOptions into this struct.
// An error is returned if the same option merged multiple times.
func (o *TenantReplicationOptions) CombineWith(other *TenantReplicationOptions) error {
if o.Retention != nil {
if other.Retention != nil {
return errors.New("RETENTION option specified multiple times")
}
} else {
o.Retention = other.Retention
}
return nil
}

// IsDefault returns true if this backup options struct has default value.
func (o TenantReplicationOptions) IsDefault() bool {
options := TenantReplicationOptions{}
return o.Retention == options.Retention
}