67814: base: drop SlowRequestThreshold to 15s r=nvanbenschoten a=nvanbenschoten

This commit drops `SlowRequestThreshold` from 60s to 15s. This constant
dictates the delay before we consider the following four operations to
be slow enough to log and/or increment "slow request" metrics:
- raft proposals
- raft proposal quota acquisition
- lease acquisition
- latch acquisition
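
For a sense of the pattern this constant gates, here is a minimal sketch; `runWithSlowWarning` and the `slowreq` package are hypothetical names for illustration, not the actual CockroachDB call sites, which live in the KV layer.

```go
package slowreq

import (
	"context"
	"log"
	"time"
)

// slowRequestThreshold mirrors base.SlowRequestThreshold after this change.
const slowRequestThreshold = 15 * time.Second

// runWithSlowWarning runs op and logs a warning once it has been
// outstanding for longer than the threshold. The warning is advisory:
// the operation is not cancelled, we simply keep waiting.
func runWithSlowWarning(ctx context.Context, name string, op func(context.Context) error) error {
	done := make(chan error, 1)
	go func() { done <- op(ctx) }()
	select {
	case err := <-done:
		return err
	case <-time.After(slowRequestThreshold):
		log.Printf("have been waiting %s for %s", slowRequestThreshold, name)
		return <-done
	}
}
```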

67890: roachtest: skip jobs/mixed-version r=pbardea a=pbardea

This test was unskipped and passed locally, but it turns out that it
wasn't even running any IMPORT jobs there. The test needs further
investigation to determine whether IMPORTing a workload between
versions is supported.

Release note: None

67897: importccl: disallow IMPORT TABLE with expression indexes r=mgartner a=mgartner

This commit adds an explanatory error message when a user tries to
execute an `IMPORT TABLE` statement with an expression index.
Previously, the error confusingly read "unimplemented: only simple
columns are supported as index elements". Now it reads "unimplemented:
to import into a table with expression indexes, use IMPORT INTO".
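
As a hedged illustration of the workaround the new message points at, here is a sketch using `database/sql` with the `lib/pq` driver; the connection string and the `nodelocal` URI are assumptions for a hypothetical local cluster.

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq"
)

func mustExec(db *sql.DB, stmt string) {
	if _, err := db.Exec(stmt); err != nil {
		log.Fatalf("%s: %v", stmt, err)
	}
}

func main() {
	// Hypothetical connection string for a local insecure cluster.
	db, err := sql.Open("postgres",
		"postgresql://root@localhost:26257/defaultdb?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// IMPORT TABLE with an expression index now fails with the message
	// above. The workaround: create the table first, then IMPORT INTO it.
	mustExec(db, `SET experimental_enable_expression_indexes = true`)
	mustExec(db, `CREATE TABLE users (a INT, b INT, INDEX ((a + b)))`)
	mustExec(db, `IMPORT INTO users (a, b) CSV DATA ('nodelocal://0/users.csv')`)
}
```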

Release note: None

67898: opt: better inference of computed columns involving composite types r=RaduBerinde a=RaduBerinde

Composite types are those whose values can be logically equal but not
identical, for example the decimals 1.0 and 1.00.

We currently prevent inferring the value of a computed column if it
depends on composite columns. This commit improves on this: we can
infer the computed column as long as the expression is not "composite
sensitive". For example, arithmetic operations are not composite
sensitive, whereas conversion to string is.

This improvement is necessary for sharded indexes on composite
columns.
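
A sketch of what composite sensitivity means in practice, using the `cockroachdb/apd` decimal library; the optimizer's actual check is `memo.CanBeCompositeSensitive`, visible in the diff below.

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/apd/v2"
)

func main() {
	a := apd.New(10, -1)  // 1.0
	b := apd.New(100, -2) // 1.00

	// Logically equal...
	fmt.Println(a.Cmp(b) == 0) // true

	// ...but not identical: conversion to string is composite sensitive.
	fmt.Println(a.String(), b.String()) // 1.0 1.00

	// Arithmetic is not composite sensitive: a*10 and b*10 remain
	// logically equal, so a computed column like d*10 can be inferred
	// from a filter such as d=1.
	ctx := apd.BaseContext.WithPrecision(10)
	var x, y apd.Decimal
	if _, err := ctx.Mul(&x, a, apd.New(10, 0)); err != nil {
		panic(err)
	}
	if _, err := ctx.Mul(&y, b, apd.New(10, 0)); err != nil {
		panic(err)
	}
	fmt.Println(x.Cmp(&y) == 0) // true (though they print 10.0 and 10.00)
}
```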

Release note: None

67919: clisqlclient: fix the error handling in QueryRow r=arulajmani a=knz

Found while working on #67917. 
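
The diff below tightens two spots: the error from `Close` was silently discarded, and a non-EOF error from the second `Next` call returned the stale first `err`. A minimal sketch of the corrected shape, assuming the `cockroachdb/errors` package and a stand-in `rowsLike` interface:

```go
package clisketch

import (
	"database/sql/driver"
	"io"

	"github.com/cockroachdb/errors"
)

// rowsLike is a stand-in for the clisqlclient Rows interface.
type rowsLike interface {
	Columns() []string
	Next(values []driver.Value) error
	Close() error
}

// queryRow sketches the corrected error handling: the Close error is
// folded into the named return, and a non-EOF error from the second
// Next call is returned directly instead of the stale first err.
func queryRow(query string, rows rowsLike) (vals []driver.Value, resErr error) {
	defer func() { resErr = errors.CombineErrors(resErr, rows.Close()) }()
	vals = make([]driver.Value, len(rows.Columns()))
	if err := rows.Next(vals); err != nil {
		return nil, err
	}
	// Assert that there is just one row.
	next := make([]driver.Value, len(rows.Columns()))
	if nextErr := rows.Next(next); nextErr != io.EOF {
		if nextErr != nil {
			return nil, nextErr
		}
		return nil, errors.AssertionFailedf("%q: expected just 1 row of result, got more", query)
	}
	return vals, nil
}
```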

67922: changefeedccl: Correctly manage protected timestamps. r=miretskiy a=miretskiy

Fix a minor logic bug where, if we previously delayed a job progress
update, we could then fail to manage the protected timestamp for some
time.

In addition, always checkpoint when reaching a schema change boundary,
since the changefeed may need to restart there (e.g. during a primary
key change).
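
Roughly, the corrected decision logic looks as follows; this is a simplified restatement with illustrative names, not the actual changefeed code (see the `changefeed_processors.go` diff below).

```go
package sketch

// checkpointDecision restates the corrected logic. canHighWater stands
// in for js.canCheckpointHighWatermark(frontierChanged) and canFrontier
// for js.canCheckpointFrontier().
func checkpointDecision(boundaryReached, canHighWater, canFrontier bool) (updateHighWater, updateCheckpoint, manageProtected bool) {
	// Always checkpoint the high-water mark at a schema change boundary,
	// since the changefeed may need to restart (e.g. on a primary key change).
	updateHighWater = boundaryReached || canHighWater
	// Write an (empty-span) frontier checkpoint only when the high-water
	// mark itself is not being written.
	updateCheckpoint = !updateHighWater && canFrontier
	// Manage protected timestamps whenever the high-water mark is written;
	// previously this was keyed off whether the frontier changed in this
	// iteration, so a delayed progress update could leave the protected
	// timestamp unmanaged for a while.
	manageProtected = updateHighWater
	return updateHighWater, updateCheckpoint, manageProtected
}
```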

Release note: None

67943: roachprod: delete EBS volumes on termination r=miretskiy a=rail

Fixes #67932

Previously, when a cluster was created with `--local-ssd=false`, the
EBS volume was created with `deleteOnTermination` set to `false`. As a
result, we ended up with many dangling volumes.

The patch explicitly sets the `DeleteOnTermination` property for the
`--local-ssd=false` case.
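
For illustration only, the equivalent setting expressed with `aws-sdk-go` types; the device name and volume parameters are assumptions, and roachprod itself uses its own volume structs, as the diff below shows.

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/ec2"
)

func main() {
	// Block device mapping for a non-local-SSD node: without
	// DeleteOnTermination=true, the EBS volume outlives the instance
	// and dangles after the cluster is destroyed.
	mapping := &ec2.BlockDeviceMapping{
		DeviceName: aws.String("/dev/sdd"), // hypothetical device name
		Ebs: &ec2.EbsBlockDevice{
			VolumeType:          aws.String("gp2"),
			VolumeSize:          aws.Int64(500),
			DeleteOnTermination: aws.Bool(true),
		},
	}
	fmt.Println(mapping)
}
```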

Release note: None

Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Paul Bardea <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Rail Aliiev <[email protected]>
8 people committed Jul 22, 2021
8 parents 6afe7de + 4eb0bb7 + 0d4fc6e + 1bb6661 + 1eebf3d + 70ac5b8 + 14c1bb4 + 30a0eae commit 15d3c04
Showing 11 changed files with 218 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pkg/base/constants.go
@@ -28,7 +28,7 @@ const (

// SlowRequestThreshold is the amount of time to wait before considering a
// request to be "slow".
SlowRequestThreshold = 60 * time.Second
SlowRequestThreshold = 15 * time.Second

// ChunkRaftCommandThresholdBytes is the threshold in bytes at which
// to chunk or otherwise limit commands being sent to Raft.
14 changes: 11 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1306,21 +1306,29 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
}

func (cf *changeFrontier) maybeCheckpointJob(frontierChanged, isBehind bool) (bool, error) {
// See if we can update the high-water mark.
// We always update the high-water mark when we reach a schema change boundary.
// Otherwise, we checkpoint when the frontier changes (and we can checkpoint),
// or if it changed in the past but we skipped that update.
updateHighWater :=
cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged)

// Update checkpoint if the frontier has not changed, but it is time to checkpoint.
// If the frontier has changed, we want to write an empty checkpoint record indicating
// that all spans have reached the frontier.
updateCheckpoint := !frontierChanged && cf.js.canCheckpointFrontier()
updateCheckpoint := !updateHighWater && cf.js.canCheckpointFrontier()

var checkpoint jobspb.ChangefeedProgress_Checkpoint
if updateCheckpoint {
maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.flowCtx.Cfg.Settings.SV)
checkpoint.Spans = cf.frontier.getCheckpointSpans(maxBytes)
}

if updateCheckpoint || cf.js.canCheckpointHighWatermark(frontierChanged) {
if updateCheckpoint || updateHighWater {
manageProtected := updateHighWater
checkpointStart := timeutil.Now()
if err := cf.checkpointJobProgress(
cf.frontier.Frontier(), frontierChanged, checkpoint, isBehind,
cf.frontier.Frontier(), manageProtected, checkpoint, isBehind,
); err != nil {
return false, err
}
4 changes: 4 additions & 0 deletions pkg/ccl/importccl/csv_internal_test.go
@@ -53,6 +53,10 @@ func TestMakeSimpleTableDescriptorErrors(t *testing.T) {
stmt: "create table a (i int, j int as (i + 10) virtual)",
error: `to import into a table with virtual computed columns, use IMPORT INTO`,
},
{
stmt: "create table a (i int, index ((i + 1)))",
error: `to import into a table with expression indexes, use IMPORT INTO`,
},
{
stmt: `create table a (
i int check (i > 0),
36 changes: 36 additions & 0 deletions pkg/ccl/importccl/import_stmt_test.go
@@ -4481,6 +4481,15 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
format: "CSV",
expectedResults: [][]string{{"1", "3", "2"}, {"3", "7", "4"}},
},
{
into: true,
name: "import-into-csv-expression-index",
data: "1,2\n3,4",
create: "a INT, b INT, INDEX ((a + b))",
targetCols: "a, b",
format: "CSV",
expectedResults: [][]string{{"1", "2"}, {"3", "4"}},
},
{
into: true,
name: "import-into-avro",
@@ -4490,6 +4499,15 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
format: "AVRO",
expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}},
},
{
into: true,
name: "import-into-avro-expression-index",
data: avroData,
create: "a INT, b INT, INDEX ((a + b))",
targetCols: "a, b",
format: "AVRO",
expectedResults: [][]string{{"1", "2"}, {"3", "4"}},
},
{
into: false,
name: "import-table-csv",
@@ -4506,6 +4524,14 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
format: "CSV",
expectedError: "to import into a table with virtual computed columns, use IMPORT INTO",
},
{
into: false,
name: "import-table-csv-expression-index",
data: "35,23\n67,10",
create: "a INT, b INT, INDEX ((a + b))",
format: "CSV",
expectedError: "to import into a table with expression indexes, use IMPORT INTO",
},
{
into: false,
name: "import-table-avro",
@@ -4522,6 +4548,14 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
format: "AVRO",
expectedError: "to import into a table with virtual computed columns, use IMPORT INTO",
},
{
into: false,
name: "import-table-avro-expression-index",
data: avroData,
create: "a INT, b INT, INDEX ((a + b))",
format: "AVRO",
expectedError: "to import into a table with expression indexes, use IMPORT INTO",
},
{
into: false,
name: "pgdump",
@@ -4535,6 +4569,8 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4);
defer sqlDB.Exec(t, `DROP TABLE IF EXISTS users`)
data = test.data
var importStmt string
// Enable expression indexes.
sqlDB.Exec(t, `SET experimental_enable_expression_indexes=true`)
if test.into {
sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE users (%s)`, test.create))
importStmt = fmt.Sprintf(`IMPORT INTO users (%s) %s DATA (%q)`,
9 changes: 8 additions & 1 deletion pkg/ccl/importccl/import_table_creation.go
@@ -157,9 +157,16 @@ func MakeSimpleTableDescriptor(
switch def := create.Defs[i].(type) {
case *tree.CheckConstraintTableDef,
*tree.FamilyTableDef,
*tree.IndexTableDef,
*tree.UniqueConstraintTableDef:
// ignore
case *tree.IndexTableDef:
for i := range def.Columns {
if def.Columns[i].Expr != nil {
return nil, unimplemented.NewWithIssueDetail(56002, "import.expression-index",
"to import into a table with expression indexes, use IMPORT INTO")
}
}

case *tree.ColumnTableDef:
if def.IsComputed() && def.IsVirtual() {
return nil, unimplemented.NewWithIssueDetail(56002, "import.computed",
8 changes: 4 additions & 4 deletions pkg/cli/clisqlclient/conn.go
@@ -523,13 +523,13 @@ func (c *sqlConn) Query(query string, args []driver.Value) (Rows, error) {
return &sqlRows{rows: rows.(sqlRowsI), conn: c}, nil
}

func (c *sqlConn) QueryRow(query string, args []driver.Value) ([]driver.Value, error) {
func (c *sqlConn) QueryRow(query string, args []driver.Value) (vals []driver.Value, resErr error) {
rows, _, err := MakeQuery(query, args...)(c)
if err != nil {
return nil, err
}
defer func() { _ = rows.Close() }()
vals := make([]driver.Value, len(rows.Columns()))
defer func() { resErr = errors.CombineErrors(resErr, rows.Close()) }()
vals = make([]driver.Value, len(rows.Columns()))
err = rows.Next(vals)

// Assert that there is just one row.
@@ -538,7 +538,7 @@ func (c *sqlConn) QueryRow(query string, args []driver.Value) ([]driver.Value, error) {
nextErr := rows.Next(nextVals)
if nextErr != io.EOF {
if nextErr != nil {
return nil, err
return nil, nextErr
}
return nil, errors.AssertionFailedf("programming error: %q: expected just 1 row of result, got more", query)
}
2 changes: 2 additions & 0 deletions pkg/cmd/roachprod/vm/aws/aws.go
@@ -862,12 +862,14 @@ func (p *Provider) runInstance(name string, zone string, opts vm.CreateOpts) err
if !opts.SSDOpts.UseLocalSSD {
if len(p.opts.EBSVolumes) == 0 && p.opts.DefaultEBSVolume.Disk.VolumeType == "" {
p.opts.DefaultEBSVolume.Disk.VolumeType = defaultEBSVolumeType
p.opts.DefaultEBSVolume.Disk.DeleteOnTermination = true
}

if p.opts.DefaultEBSVolume.Disk.VolumeType != "" {
// Add default volume to the list of volumes we'll setup.
v := p.opts.EBSVolumes.newVolume()
v.Disk = p.opts.DefaultEBSVolume.Disk
v.Disk.DeleteOnTermination = true
p.opts.EBSVolumes = append(p.opts.EBSVolumes, v)
}
}
1 change: 1 addition & 0 deletions pkg/cmd/roachtest/tests/mixed_version_jobs.go
@@ -326,6 +326,7 @@ func registerJobsMixedVersions(r registry.Registry) {
r.Add(registry.TestSpec{
Name: "jobs/mixed-versions",
Owner: registry.OwnerBulkIO,
Skip: "#67587",
// Jobs infrastructure was unstable prior to 20.1 in terms of the behavior
// of `PAUSE/CANCEL JOB` commands which were best effort and relied on the
// job itself to detect the request. These were fixed by introducing new job
39 changes: 25 additions & 14 deletions pkg/sql/opt/xform/general_funcs.go
Original file line number Diff line number Diff line change
@@ -13,7 +13,6 @@ package xform
import (
"sort"

"github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo"
"github.com/cockroachdb/cockroach/pkg/sql/opt"
"github.com/cockroachdb/cockroach/pkg/sql/opt/cat"
"github.com/cockroachdb/cockroach/pkg/sql/opt/constraint"
@@ -247,7 +246,7 @@ func (c *CustomFuncs) computedColFilters(

// Start with set of constant columns, as derived from the list of filter
// conditions.
constCols := make(map[opt.ColumnID]opt.ScalarExpr)
constCols := make(constColsMap)
c.findConstantFilterCols(constCols, scanPrivate, requiredFilters)
c.findConstantFilterCols(constCols, scanPrivate, optionalFilters)
if len(constCols) == 0 {
@@ -270,6 +269,29 @@ func (c *CustomFuncs) computedColFilters(
return computedColFilters
}

// constColsMap maps columns to constant values that we can infer from query
// filters.
//
// Note that for composite types, the constant value is not interchangeable with
// the column in all contexts. Composite types are types which can have
// logically equal but not identical values, like the decimals 1.0 and 1.00.
//
// For example:
// CREATE TABLE t (
// d DECIMAL,
// c DECIMAL AS (d*10) STORED
// );
// INSERT INTO t VALUES (1.0), (1.00), (1.000);
// SELECT c::STRING FROM t WHERE d=1;
// ----
// 10.0
// 10.00
// 10.000
//
// We can infer that d has a constant value of 1 but we can't replace it with 1
// in any expression.
type constColsMap map[opt.ColumnID]opt.ScalarExpr

// findConstantFilterCols adds to constFilterCols mappings from table column ID
// to the constant value of that column. It does this by iterating over the
// given lists of filters and finding expressions that constrain columns to a
@@ -280,9 +302,7 @@ func (c *CustomFuncs) computedColFilters(
// This would add the mappings x => 5 and y => 'foo'; these constants can
// then be used to prove that dependent computed columns are also constant.
func (c *CustomFuncs) findConstantFilterCols(
constFilterCols map[opt.ColumnID]opt.ScalarExpr,
scanPrivate *memo.ScanPrivate,
filters memo.FiltersExpr,
constFilterCols constColsMap, scanPrivate *memo.ScanPrivate, filters memo.FiltersExpr,
) {
tab := c.e.mem.Metadata().Table(scanPrivate.Table)
for i := range filters {
@@ -307,16 +327,7 @@ func (c *CustomFuncs) findConstantFilterCols(
continue
}

// Skip columns with a data type that uses a composite key encoding.
// Each of these data types can have multiple distinct values that
// compare equal. For example, 0 == -0 for the FLOAT data type. It's
// not safe to treat these as constant inputs to computed columns,
// since the computed expression may differentiate between the
// different forms of the same value.
colTyp := tab.Column(scanPrivate.Table.ColumnOrdinal(colID)).DatumType()
if colinfo.HasCompositeKeyEncoding(colTyp) {
continue
}

span := cons.Spans.Get(0)
if !span.HasSingleKey(c.e.evalCtx) {
8 changes: 7 additions & 1 deletion pkg/sql/opt/xform/select_funcs.go
@@ -390,8 +390,9 @@ func (c *CustomFuncs) GenerateConstrainedScans(
// ID into a constant value, by evaluating it with respect to a set of other
// columns that are constant. If the computed column is constant, enter it into
// the constCols map and return true. Otherwise, return false.
//
func (c *CustomFuncs) tryFoldComputedCol(
tabMeta *opt.TableMeta, computedColID opt.ColumnID, constCols map[opt.ColumnID]opt.ScalarExpr,
tabMeta *opt.TableMeta, computedColID opt.ColumnID, constCols constColsMap,
) bool {
// Check whether computed column has already been folded.
if _, ok := constCols[computedColID]; ok {
@@ -423,6 +424,11 @@ }
}

computedCol := tabMeta.ComputedCols[computedColID]
if memo.CanBeCompositeSensitive(c.e.mem.Metadata(), computedCol) {
// The computed column expression can return different values for logically
// equal outer columns (e.g. d::STRING where d is a DECIMAL).
return false
}
replaced := replace(computedCol).(opt.ScalarExpr)

// If the computed column is constant, enter it into the constCols map.