Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
63010: sql: fix race condition in internal executor r=yuzefovich a=ajwerner

The async and sync implementations were too close to justify two structs.
Also, the async behavior of not stopping the writer in case the reader
called close wasn't desireable. This commit unifies the implementation.
It also ensures that we propagate context errors in all cases triggered
by the closure of the done channel. It also makes closing the channel
idempotent.

Additionally, this commit transitions the execution flow into draining
state without setting our custom error on the resultWriter.

Fixes #62948.

Release note: None

63123: sql: fix ADD COLUMN ... UNIQUE for PARTITION ALL BY tables r=ajstorm a=otan

Refs: #63113

Release note (bug fix): Fixed a bug where doing ALTER TABLE ... ADD
COLUMN ... UNIQUE would error if the table had a PARTITION ALL BY /
REGIONAL BY ROW definition.


63178: sem/builtins: vectorized engine no longer catches crdb_internal.force_panic r=yuzefovich a=yuzefovich

Previously, when executing `crdb_internal.force_panic` builtin, if it
was executed via the vectorized engine, we would catch the panic and
convert it into an internal error instead. This is undesirable and is
now fixed.

Release note: None

Co-authored-by: Andrew Werner <[email protected]>
Co-authored-by: Oliver Tan <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
4 people committed Apr 7, 2021
4 parents e7f7244 + d8f85e2 + 4f1698f + ae6233f commit 651184b
Show file tree
Hide file tree
Showing 12 changed files with 174 additions and 138 deletions.
15 changes: 13 additions & 2 deletions pkg/ccl/logictestccl/testdata/logic_test/partitioning_implicit
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ query T noticetrace
CREATE INDEX created_idx ON t(c)
----

statement ok
ALTER TABLE t ADD COLUMN e INT8 NOT NULL UNIQUE

statement ok
ALTER TABLE t ADD CONSTRAINT unique_c_d UNIQUE(c, d)

Expand All @@ -470,15 +473,17 @@ CREATE TABLE public.t (
d INT8 NOT NULL,
j JSONB NULL,
u STRING NULL,
e INT8 NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (pk ASC),
UNIQUE INDEX t_u_key (u ASC),
INDEX t_a_idx (a ASC),
UNIQUE INDEX t_b_key (b ASC),
INDEX t_partition_by_c_idx (partition_by ASC, c ASC),
INVERTED INDEX t_j_idx (j),
INDEX created_idx (c ASC),
UNIQUE INDEX t_e_key (e ASC),
UNIQUE INDEX unique_c_d (c ASC, d ASC),
FAMILY fam_0_pk_pk2_partition_by_a_b_c_d_j_u (pk, pk2, partition_by, a, b, c, d, j, u)
FAMILY fam_0_pk_pk2_partition_by_a_b_c_d_j_u (pk, pk2, partition_by, a, b, c, d, j, u, e)
) PARTITION ALL BY LIST (partition_by) (
PARTITION one VALUES IN ((1)),
PARTITION two VALUES IN ((2))
Expand All @@ -499,6 +504,8 @@ t_a_idx a false
t_a_idx partition_by true
t_b_key b false
t_b_key partition_by true
t_e_key e false
t_e_key partition_by true
t_j_idx j false
t_j_idx partition_by true
t_partition_by_c_idx c false
Expand All @@ -525,6 +532,7 @@ CREATE TABLE public.t (
d INT8 NOT NULL,
j JSONB NULL,
u STRING NULL,
e INT8 NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (pk2 ASC),
UNIQUE INDEX t_pk_key (pk ASC),
UNIQUE INDEX t_u_key (u ASC),
Expand All @@ -533,8 +541,9 @@ CREATE TABLE public.t (
INDEX t_partition_by_c_idx (partition_by ASC, c ASC),
INVERTED INDEX t_j_idx (j),
INDEX created_idx (c ASC),
UNIQUE INDEX t_e_key (e ASC),
UNIQUE INDEX unique_c_d (c ASC, d ASC),
FAMILY fam_0_pk_pk2_partition_by_a_b_c_d_j_u (pk, pk2, partition_by, a, b, c, d, j, u)
FAMILY fam_0_pk_pk2_partition_by_a_b_c_d_j_u (pk, pk2, partition_by, a, b, c, d, j, u, e)
) PARTITION ALL BY LIST (partition_by) (
PARTITION one VALUES IN ((1)),
PARTITION two VALUES IN ((2))
Expand All @@ -555,6 +564,8 @@ t_a_idx a false
t_a_idx partition_by true
t_b_key b false
t_b_key partition_by true
t_e_key e false
t_e_key partition_by true
t_j_idx j false
t_j_idx partition_by true
t_partition_by_c_idx c false
Expand Down
27 changes: 27 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/regional_by_row
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,33 @@ ALTER TABLE regional_by_row_table ALTER PRIMARY KEY USING COLUMNS(pk2) USING HAS
statement error interleaved tables are not compatible with REGIONAL BY ROW tables
CREATE INDEX bad_idx ON regional_by_row_table(pk) INTERLEAVE IN PARENT parent_table(pk)

# Try add a new unique column.
statement ok
ALTER TABLE regional_by_row_table ADD COLUMN unique_col INT8 NOT NULL UNIQUE

query T
SELECT create_statement FROM [SHOW CREATE TABLE regional_by_row_table]
----
CREATE TABLE public.regional_by_row_table (
pk INT8 NOT NULL,
pk2 INT8 NOT NULL,
a INT8 NOT NULL,
b INT8 NOT NULL,
j JSONB NULL,
crdb_region public.crdb_internal_region NOT VISIBLE NOT NULL DEFAULT default_to_database_primary_region(gateway_region())::public.crdb_internal_region,
unique_col INT8 NOT NULL,
CONSTRAINT "primary" PRIMARY KEY (pk ASC),
INDEX regional_by_row_table_a_idx (a ASC),
UNIQUE INDEX regional_by_row_table_b_key (b ASC),
INVERTED INDEX regional_by_row_table_j_idx (j),
UNIQUE INDEX regional_by_row_table_unique_col_key (unique_col ASC),
FAMILY fam_0_pk_pk2_a_b_j_crdb_region (pk, pk2, a, b, j, crdb_region, unique_col)
) LOCALITY REGIONAL BY ROW;
ALTER PARTITION "us-east-1" OF INDEX multi_region_test_db.public.regional_by_row_table@regional_by_row_table_a_idx CONFIGURE ZONE USING "gc.ttlseconds" = 10

statement ok
ALTER TABLE regional_by_row_table DROP COLUMN unique_col

# Insert some rows into the regional_by_row_table.
query TI
INSERT INTO regional_by_row_table (pk, pk2, a, b, j) VALUES
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/add_column.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,19 @@ func (p *planner) addColumnImpl(
}
incTelemetryForNewColumn(d, col)

// Ensure all new indexes are partitioned appropriately.
if idx != nil {
*idx, err = p.configureIndexDescForNewIndexPartitioning(
params.ctx,
desc,
*idx,
nil, /* PartitionByIndex */
)
if err != nil {
return err
}
}

// If the new column has a DEFAULT expression that uses a sequence, add references between
// its descriptor and this column descriptor.
if d.HasDefaultExpr() {
Expand Down
17 changes: 9 additions & 8 deletions pkg/sql/colexecerror/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ func shouldCatchPanic(panicEmittedFrom string) bool {
// unchanged by the higher-level catchers.
return false
}
const nonVectorizedTestPrefix = "github.com/cockroachdb/cockroach/pkg/sql/colexecerror.NonVectorizedTestPanic"
if strings.HasPrefix(panicEmittedFrom, nonVectorizedTestPrefix) {
// This panic came from NonVectorizedTestPanic() method and should not
// be caught for testing purposes.
const nonCatchablePanicPrefix = "github.com/cockroachdb/cockroach/pkg/sql/colexecerror.NonCatchablePanic"
if strings.HasPrefix(panicEmittedFrom, nonCatchablePanicPrefix) {
// This panic came from NonCatchablePanic() method and should not be
// caught.
return false
}
return strings.HasPrefix(panicEmittedFrom, colPackagesPrefix) ||
Expand Down Expand Up @@ -200,9 +200,10 @@ func ExpectedError(err error) {
panic(newNotInternalError(err))
}

// NonVectorizedTestPanic is the equivalent of Golang's 'panic' word that should
// be used by the testing code within the vectorized engine to simulate a panic
// that occurs outside of the engine (and, thus, should not be caught).
func NonVectorizedTestPanic(object interface{}) {
// NonCatchablePanic is the equivalent of Golang's 'panic' word that can be used
// in order to crash the goroutine. It could be used by the testing code within
// the vectorized engine to simulate a panic that occurs outside of the engine
// (and, thus, should not be caught).
func NonCatchablePanic(object interface{}) {
panic(object)
}
10 changes: 5 additions & 5 deletions pkg/sql/colexecerror/error_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestCatchVectorizedRuntimeError(t *testing.T) {
require.Panics(t, func() {
require.NoError(t, colexecerror.CatchVectorizedRuntimeError(func() {
require.NoError(t, colexecerror.CatchVectorizedRuntimeError(func() {
colexecerror.NonVectorizedTestPanic(errors.New("should not be caught"))
colexecerror.NonCatchablePanic(errors.New("should not be caught"))
}))
}))
})
Expand Down Expand Up @@ -62,15 +62,15 @@ func TestCatchVectorizedRuntimeError(t *testing.T) {
require.False(t, strings.Contains(notAnnotatedErr.Error(), annotationText))
}

// TestNonVectorizedTestPanicIsNotCaught verifies that panics emitted via
// NonVectorizedTestPanic() method are not caught by the catcher.
func TestNonVectorizedTestPanicIsNotCaught(t *testing.T) {
// TestNonCatchablePanicIsNotCaught verifies that panics emitted via
// NonCatchablePanic() method are not caught by the catcher.
func TestNonCatchablePanicIsNotCaught(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

require.Panics(t, func() {
require.NoError(t, colexecerror.CatchVectorizedRuntimeError(func() {
colexecerror.NonVectorizedTestPanic("should panic")
colexecerror.NonCatchablePanic("should panic")
}))
})
}
2 changes: 1 addition & 1 deletion pkg/sql/colflow/vectorized_panic_propagation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (e *testNonVectorizedPanicEmitter) Init() {
func (e *testNonVectorizedPanicEmitter) Next(ctx context.Context) coldata.Batch {
if !e.emitBatch {
e.emitBatch = true
colexecerror.NonVectorizedTestPanic("")
colexecerror.NonCatchablePanic("")
}

e.emitBatch = false
Expand Down
8 changes: 4 additions & 4 deletions pkg/sql/distsql_running.go
Original file line number Diff line number Diff line change
Expand Up @@ -738,10 +738,10 @@ func (r *DistSQLReceiver) Push(
}
r.tracing.TraceExecRowsResult(r.ctx, r.row)
if commErr := r.resultWriter.AddRow(r.ctx, r.row); commErr != nil {
if errors.Is(commErr, ErrLimitedResultClosed) {
// ErrLimitedResultClosed is not a real error, it is a signal to
// stop distsql and return success to the client (that's why we
// don't set the error on the resultWriter).
if errors.Is(commErr, ErrLimitedResultClosed) || errors.Is(commErr, errIEResultChannelClosed) {
// ErrLimitedResultClosed and errIEResultChannelClosed are not real
// errors, it is a signal to stop distsql and return success to the
// client (that's why we don't set the error on the resultWriter).
r.status = execinfra.DrainRequested
} else {
// Set the error on the resultWriter to notify the consumer about
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,7 @@ var rowsAffectedResultColumns = colinfo.ResultColumns{
func (ie *InternalExecutor) execInternal(
ctx context.Context,
opName string,
rw ieResultChannel,
rw *ieResultChannel,
txn *kv.Txn,
sessionDataOverride sessiondata.InternalExecutorOverride,
stmt string,
Expand Down
Loading

0 comments on commit 651184b

Please sign in to comment.