From 4eb0bb737561fb34fbf6fd7d16ecd93dc7f51806 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Tue, 20 Jul 2021 13:06:07 -0400 Subject: [PATCH 1/7] base: drop SlowRequestThreshold to 15s This commit drops `SlowRequestThreshold` from 60s to 15s. This constant dictates the delay before we consider the following four operations to be slow enough to log and/or increment "slow request" metrics: - raft proposals - raft proposal quota acquisition - lease acquisition - latch acquisition --- pkg/base/constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/base/constants.go b/pkg/base/constants.go index f449b14a66c5..97e1e3159e28 100644 --- a/pkg/base/constants.go +++ b/pkg/base/constants.go @@ -28,7 +28,7 @@ const ( // SlowRequestThreshold is the amount of time to wait before considering a // request to be "slow". - SlowRequestThreshold = 60 * time.Second + SlowRequestThreshold = 15 * time.Second // ChunkRaftCommandThresholdBytes is the threshold in bytes at which // to chunk or otherwise limit commands being sent to Raft. From 0d4fc6e8d13d7770560bbbc4dcb1b61885480a99 Mon Sep 17 00:00:00 2001 From: Paul Bardea Date: Wed, 21 Jul 2021 20:37:46 -0400 Subject: [PATCH 2/7] roachtest: skip jobs/mixed-version This test was unskipped and was passing locally, but it turns out that the test wasn't even running any IMPORT jobs locally. This test will need further investigation to see if IMPORTing a workload between versions is supported. Release note: None --- pkg/cmd/roachtest/tests/mixed_version_jobs.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cmd/roachtest/tests/mixed_version_jobs.go b/pkg/cmd/roachtest/tests/mixed_version_jobs.go index e10cf61f7630..944c3603cc0a 100644 --- a/pkg/cmd/roachtest/tests/mixed_version_jobs.go +++ b/pkg/cmd/roachtest/tests/mixed_version_jobs.go @@ -326,6 +326,7 @@ func registerJobsMixedVersions(r registry.Registry) { r.Add(registry.TestSpec{ Name: "jobs/mixed-versions", Owner: registry.OwnerBulkIO, + Skip: "#67587", // Jobs infrastructure was unstable prior to 20.1 in terms of the behavior // of `PAUSE/CANCEL JOB` commands which were best effort and relied on the // job itself to detect the request. These were fixed by introducing new job From 1bb666120f9a08a25855002e11233e1c3e323512 Mon Sep 17 00:00:00 2001 From: Marcus Gartner Date: Wed, 21 Jul 2021 18:53:22 -0700 Subject: [PATCH 3/7] importccl: disallow IMPORT TABLE with expression indexes This commit adds an explanatory error message when a user tries to execute an `IMPORT TABLE` statement with an expression index. Previously, the error confusingly read "unimplemented: only simple columns are supported as index elements". Now it reads "unimplemented: to import into a table with expression indexes, use IMPORT INTO". Release note: None --- pkg/ccl/importccl/csv_internal_test.go | 4 +++ pkg/ccl/importccl/import_stmt_test.go | 36 ++++++++++++++++++++++ pkg/ccl/importccl/import_table_creation.go | 9 +++++- 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/pkg/ccl/importccl/csv_internal_test.go b/pkg/ccl/importccl/csv_internal_test.go index 41ecfec2d42b..5040d9880e28 100644 --- a/pkg/ccl/importccl/csv_internal_test.go +++ b/pkg/ccl/importccl/csv_internal_test.go @@ -53,6 +53,10 @@ func TestMakeSimpleTableDescriptorErrors(t *testing.T) { stmt: "create table a (i int, j int as (i + 10) virtual)", error: `to import into a table with virtual computed columns, use IMPORT INTO`, }, + { + stmt: "create table a (i int, index ((i + 1)))", + error: `to import into a table with expression indexes, use IMPORT INTO`, + }, { stmt: `create table a ( i int check (i > 0), diff --git a/pkg/ccl/importccl/import_stmt_test.go b/pkg/ccl/importccl/import_stmt_test.go index d8a64240fa84..89bcb176f967 100644 --- a/pkg/ccl/importccl/import_stmt_test.go +++ b/pkg/ccl/importccl/import_stmt_test.go @@ -4481,6 +4481,15 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4); format: "CSV", expectedResults: [][]string{{"1", "3", "2"}, {"3", "7", "4"}}, }, + { + into: true, + name: "import-into-csv-expression-index", + data: "1,2\n3,4", + create: "a INT, b INT, INDEX ((a + b))", + targetCols: "a, b", + format: "CSV", + expectedResults: [][]string{{"1", "2"}, {"3", "4"}}, + }, { into: true, name: "import-into-avro", @@ -4490,6 +4499,15 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4); format: "AVRO", expectedResults: [][]string{{"1", "2", "3"}, {"3", "4", "7"}}, }, + { + into: true, + name: "import-into-avro-expression-index", + data: avroData, + create: "a INT, b INT, INDEX ((a + b))", + targetCols: "a, b", + format: "AVRO", + expectedResults: [][]string{{"1", "2"}, {"3", "4"}}, + }, { into: false, name: "import-table-csv", @@ -4506,6 +4524,14 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4); format: "CSV", expectedError: "to import into a table with virtual computed columns, use IMPORT INTO", }, + { + into: false, + name: "import-table-csv-expression-index", + data: "35,23\n67,10", + create: "a INT, b INT, INDEX ((a + b))", + format: "CSV", + expectedError: "to import into a table with expression indexes, use IMPORT INTO", + }, { into: false, name: "import-table-avro", @@ -4522,6 +4548,14 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4); format: "AVRO", expectedError: "to import into a table with virtual computed columns, use IMPORT INTO", }, + { + into: false, + name: "import-table-avro-expression-index", + data: avroData, + create: "a INT, b INT, INDEX ((a + b))", + format: "AVRO", + expectedError: "to import into a table with expression indexes, use IMPORT INTO", + }, { into: false, name: "pgdump", @@ -4535,6 +4569,8 @@ INSERT INTO users (a, b) VALUES (1, 2), (3, 4); defer sqlDB.Exec(t, `DROP TABLE IF EXISTS users`) data = test.data var importStmt string + // Enabled expression indexes. + sqlDB.Exec(t, `SET experimental_enable_expression_indexes=true`) if test.into { sqlDB.Exec(t, fmt.Sprintf(`CREATE TABLE users (%s)`, test.create)) importStmt = fmt.Sprintf(`IMPORT INTO users (%s) %s DATA (%q)`, diff --git a/pkg/ccl/importccl/import_table_creation.go b/pkg/ccl/importccl/import_table_creation.go index 5aea83c4665e..5e5639aa6661 100644 --- a/pkg/ccl/importccl/import_table_creation.go +++ b/pkg/ccl/importccl/import_table_creation.go @@ -157,9 +157,16 @@ func MakeSimpleTableDescriptor( switch def := create.Defs[i].(type) { case *tree.CheckConstraintTableDef, *tree.FamilyTableDef, - *tree.IndexTableDef, *tree.UniqueConstraintTableDef: // ignore + case *tree.IndexTableDef: + for i := range def.Columns { + if def.Columns[i].Expr != nil { + return nil, unimplemented.NewWithIssueDetail(56002, "import.expression-index", + "to import into a table with expression indexes, use IMPORT INTO") + } + } + case *tree.ColumnTableDef: if def.IsComputed() && def.IsVirtual() { return nil, unimplemented.NewWithIssueDetail(56002, "import.computed", From 1eebf3df494330d9aba06209f8134baf2110a4d4 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Wed, 21 Jul 2021 18:33:21 -0700 Subject: [PATCH 4/7] opt: better inference of computed columns involving composite types Composite types are those that can have values which are logically equal but not identical, for example decimals 1.0 and 1.00. We currently prevent inferring the value of a computed column if it depends on composite columns. This commit improves on this: we can infer the computed column as long as the expression is not "composite sensitive". For example, arithmetic operations are not composite sensitive, whereas conversion to string is. This improvement is necessary for sharded indexes on composite columns. Release note: None --- pkg/sql/opt/xform/general_funcs.go | 39 +++--- pkg/sql/opt/xform/select_funcs.go | 8 +- pkg/sql/opt/xform/testdata/rules/computed | 142 ++++++++++++++++++---- 3 files changed, 151 insertions(+), 38 deletions(-) diff --git a/pkg/sql/opt/xform/general_funcs.go b/pkg/sql/opt/xform/general_funcs.go index 29216796e488..b6ed5d9053da 100644 --- a/pkg/sql/opt/xform/general_funcs.go +++ b/pkg/sql/opt/xform/general_funcs.go @@ -13,7 +13,6 @@ package xform import ( "sort" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/opt" "github.com/cockroachdb/cockroach/pkg/sql/opt/cat" "github.com/cockroachdb/cockroach/pkg/sql/opt/constraint" @@ -247,7 +246,7 @@ func (c *CustomFuncs) computedColFilters( // Start with set of constant columns, as derived from the list of filter // conditions. - constCols := make(map[opt.ColumnID]opt.ScalarExpr) + constCols := make(constColsMap) c.findConstantFilterCols(constCols, scanPrivate, requiredFilters) c.findConstantFilterCols(constCols, scanPrivate, optionalFilters) if len(constCols) == 0 { @@ -270,6 +269,29 @@ func (c *CustomFuncs) computedColFilters( return computedColFilters } +// constColsMap maps columns to constant values that we can infer from query +// filters. +// +// Note that for composite types, the constant value is not interchangeable with +// the column in all contexts. Composite types are types which can have +// logically equal but not identical values, like the decimals 1.0 and 1.00. +// +// For example: +// CREATE TABLE t ( +// d DECIMAL, +// c DECIMAL AS (d*10) STORED +// ); +// INSERT INTO t VALUES (1.0), (1.00), (1.000); +// SELECT c::STRING FROM t WHERE d=1; +// ---- +// 10.0 +// 10.00 +// 10.000 +// +// We can infer that c has a constant value of 1 but we can't replace it with 1 +// in any expression. +type constColsMap map[opt.ColumnID]opt.ScalarExpr + // findConstantFilterCols adds to constFilterCols mappings from table column ID // to the constant value of that column. It does this by iterating over the // given lists of filters and finding expressions that constrain columns to a @@ -280,9 +302,7 @@ func (c *CustomFuncs) computedColFilters( // This would add a mapping from x => 5 and y => 'foo', which constants can // then be used to prove that dependent computed columns are also constant. func (c *CustomFuncs) findConstantFilterCols( - constFilterCols map[opt.ColumnID]opt.ScalarExpr, - scanPrivate *memo.ScanPrivate, - filters memo.FiltersExpr, + constFilterCols constColsMap, scanPrivate *memo.ScanPrivate, filters memo.FiltersExpr, ) { tab := c.e.mem.Metadata().Table(scanPrivate.Table) for i := range filters { @@ -307,16 +327,7 @@ func (c *CustomFuncs) findConstantFilterCols( continue } - // Skip columns with a data type that uses a composite key encoding. - // Each of these data types can have multiple distinct values that - // compare equal. For example, 0 == -0 for the FLOAT data type. It's - // not safe to treat these as constant inputs to computed columns, - // since the computed expression may differentiate between the - // different forms of the same value. colTyp := tab.Column(scanPrivate.Table.ColumnOrdinal(colID)).DatumType() - if colinfo.HasCompositeKeyEncoding(colTyp) { - continue - } span := cons.Spans.Get(0) if !span.HasSingleKey(c.e.evalCtx) { diff --git a/pkg/sql/opt/xform/select_funcs.go b/pkg/sql/opt/xform/select_funcs.go index 11e3cdc1640c..18943be35f49 100644 --- a/pkg/sql/opt/xform/select_funcs.go +++ b/pkg/sql/opt/xform/select_funcs.go @@ -390,8 +390,9 @@ func (c *CustomFuncs) GenerateConstrainedScans( // ID into a constant value, by evaluating it with respect to a set of other // columns that are constant. If the computed column is constant, enter it into // the constCols map and return false. Otherwise, return false. +// func (c *CustomFuncs) tryFoldComputedCol( - tabMeta *opt.TableMeta, computedColID opt.ColumnID, constCols map[opt.ColumnID]opt.ScalarExpr, + tabMeta *opt.TableMeta, computedColID opt.ColumnID, constCols constColsMap, ) bool { // Check whether computed column has already been folded. if _, ok := constCols[computedColID]; ok { @@ -423,6 +424,11 @@ func (c *CustomFuncs) tryFoldComputedCol( } computedCol := tabMeta.ComputedCols[computedColID] + if memo.CanBeCompositeSensitive(c.e.mem.Metadata(), computedCol) { + // The computed column expression can return different values for logically + // equal outer columns (e.g. d::STRING where d is a DECIMAL). + return false + } replaced := replace(computedCol).(opt.ScalarExpr) // If the computed column is constant, enter it into the constCols map. diff --git a/pkg/sql/opt/xform/testdata/rules/computed b/pkg/sql/opt/xform/testdata/rules/computed index 334eeb0ca210..567c4f48401f 100644 --- a/pkg/sql/opt/xform/testdata/rules/computed +++ b/pkg/sql/opt/xform/testdata/rules/computed @@ -11,14 +11,6 @@ CREATE TABLE t_int ( ) ---- -exec-ddl -CREATE TABLE t_float ( - k_float FLOAT, - c_float FLOAT AS (k_float + 1) STORED, - INDEX c_float_index (c_float, k_float) -) ----- - exec-ddl CREATE TABLE t_rand ( k_int INT, @@ -46,6 +38,26 @@ CREATE TABLE hashed ( ) ---- +exec-ddl +CREATE TABLE composite_types ( + pk INT PRIMARY KEY, + + i INT, + f FLOAT, + d DECIMAL, + + cf FLOAT AS (f+1) STORED, + cif FLOAT AS (i::FLOAT) VIRTUAL, + cd DECIMAL AS (d+1) VIRTUAL, + cs STRING AS (d::STRING) STORED, + + INDEX cf_idx (cf), + INDEX cif_idx (cif), + INDEX cd_idx (cd), + INDEX cs_idx (cs) +) +---- + # Constrain the index using computed column. Ensure that another computed column # depending on the same base column isn't included as a filter (c_int_2). opt @@ -127,21 +139,6 @@ select └── filters └── k_int:1 IS NULL [outer=(1), constraints=(/1: [/NULL - /NULL]; tight), fd=()-->(1)] -# Don't constrain the index for a FLOAT column, since the FLOAT data type uses -# a composite key encoding. -opt -SELECT k_float FROM t_float WHERE k_float = 5.0 ----- -select - ├── columns: k_float:1!null - ├── fd: ()-->(1) - ├── scan t_float - │ ├── columns: k_float:1 - │ └── computed column expressions - │ └── c_float:2 - │ └── k_float:1 + 1.0 - └── filters - └── k_float:1 = 5.0 [outer=(1), constraints=(/1: [/5.0 - /5.0]; tight), fd=()-->(1)] # Don't constrain the index when the computed column has a volatile function. opt @@ -171,3 +168,102 @@ scan null_col@ab ├── columns: a:1!null b:2 ├── constraint: /1/2/3: [/1/NULL - /1/NULL] └── fd: ()-->(1,2) + +# We should be able to infer the value of cf, because the expression is not +# composite-sensitive. +opt +SELECT pk FROM composite_types@cf_idx WHERE f=1 +---- +project + ├── columns: pk:1!null + ├── key: (1) + └── select + ├── columns: pk:1!null f:3!null + ├── key: (1) + ├── fd: ()-->(3) + ├── index-join composite_types + │ ├── columns: pk:1!null f:3 + │ ├── key: (1) + │ ├── fd: (1)-->(3) + │ └── scan composite_types@cf_idx + │ ├── columns: pk:1!null + │ ├── constraint: /5/1: [/2.0 - /2.0] + │ ├── flags: force-index=cf_idx + │ └── key: (1) + └── filters + └── f:3 = 1.0 [outer=(3), constraints=(/3: [/1.0 - /1.0]; tight), fd=()-->(3)] + +# We should be able to infer the value of cif, because the expression is not +# composite-sensitive (it does not depend on composite values). +opt +SELECT pk FROM composite_types@cif_idx WHERE i=1 +---- +project + ├── columns: pk:1!null + ├── key: (1) + └── select + ├── columns: pk:1!null i:2!null + ├── key: (1) + ├── fd: ()-->(2) + ├── index-join composite_types + │ ├── columns: pk:1!null i:2 + │ ├── key: (1) + │ ├── fd: (1)-->(2) + │ └── scan composite_types@cif_idx + │ ├── columns: pk:1!null + │ ├── constraint: /6/1: [/1.0 - /1.0] + │ ├── flags: force-index=cif_idx + │ └── key: (1) + └── filters + └── i:2 = 1 [outer=(2), constraints=(/2: [/1 - /1]; tight), fd=()-->(2)] + +# We should be able to infer the value of cd, because the expression is not +# composite-sensitive. +opt +SELECT pk FROM composite_types@cd_idx WHERE d=1 +---- +project + ├── columns: pk:1!null + ├── immutable + ├── key: (1) + └── select + ├── columns: pk:1!null d:4!null + ├── immutable + ├── key: (1) + ├── fd: ()-->(4) + ├── index-join composite_types + │ ├── columns: pk:1!null d:4 + │ ├── key: (1) + │ ├── fd: (1)-->(4) + │ └── scan composite_types@cd_idx + │ ├── columns: pk:1!null + │ ├── constraint: /7/1: [/2 - /2] + │ ├── flags: force-index=cd_idx + │ └── key: (1) + └── filters + └── d:4 = 1 [outer=(4), immutable, constraints=(/4: [/1 - /1]; tight), fd=()-->(4)] + +# We should not be able to infer the value of cs because the expression is +# composite-sensitive. +opt +SELECT pk FROM composite_types@cs_idx WHERE d=1 +---- +project + ├── columns: pk:1!null + ├── immutable + ├── key: (1) + └── select + ├── columns: pk:1!null d:4!null + ├── immutable + ├── key: (1) + ├── fd: ()-->(4) + ├── index-join composite_types + │ ├── columns: pk:1!null d:4 + │ ├── key: (1) + │ ├── fd: (1)-->(4) + │ └── scan composite_types@cs_idx + │ ├── columns: pk:1!null + │ ├── flags: force-index=cs_idx + │ └── key: (1) + └── filters + └── d:4 = 1 [outer=(4), immutable, constraints=(/4: [/1 - /1]; tight), fd=()-->(4)] From 70ac5b89ef1776e8ec51a5e80bc845e693d2fb8e Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Thu, 22 Jul 2021 15:19:43 +0200 Subject: [PATCH 5/7] clisqlclient: fix the error handling in QueryRow Release note: None --- pkg/cli/clisqlclient/conn.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/cli/clisqlclient/conn.go b/pkg/cli/clisqlclient/conn.go index 0e34cfb821d7..dc78b9a53040 100644 --- a/pkg/cli/clisqlclient/conn.go +++ b/pkg/cli/clisqlclient/conn.go @@ -523,13 +523,13 @@ func (c *sqlConn) Query(query string, args []driver.Value) (Rows, error) { return &sqlRows{rows: rows.(sqlRowsI), conn: c}, nil } -func (c *sqlConn) QueryRow(query string, args []driver.Value) ([]driver.Value, error) { +func (c *sqlConn) QueryRow(query string, args []driver.Value) (vals []driver.Value, resErr error) { rows, _, err := MakeQuery(query, args...)(c) if err != nil { return nil, err } - defer func() { _ = rows.Close() }() - vals := make([]driver.Value, len(rows.Columns())) + defer func() { resErr = errors.CombineErrors(resErr, rows.Close()) }() + vals = make([]driver.Value, len(rows.Columns())) err = rows.Next(vals) // Assert that there is just one row. @@ -538,7 +538,7 @@ func (c *sqlConn) QueryRow(query string, args []driver.Value) ([]driver.Value, e nextErr := rows.Next(nextVals) if nextErr != io.EOF { if nextErr != nil { - return nil, err + return nil, nextErr } return nil, errors.AssertionFailedf("programming error: %q: expected just 1 row of result, got more", query) } From 30a0eae28590f685dc9cf2b914c3c3896367e1bc Mon Sep 17 00:00:00 2001 From: Rail Aliiev Date: Thu, 22 Jul 2021 13:02:57 -0400 Subject: [PATCH 6/7] roachprod: delete EBS volumes on termination Fixes #67932 Previously, when a cluster was generated with `--local-ssd=false`, the EBS volume was created with `deleteOnTermination` set to `false`. As a result we ended up with many dangling volumes. The patch explicitly sets the `DeleteOnTermination` property for the `--local-ssd=false` case. Release note: None --- pkg/cmd/roachprod/vm/aws/aws.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/cmd/roachprod/vm/aws/aws.go b/pkg/cmd/roachprod/vm/aws/aws.go index 45c089ef653b..09ac47ab1372 100644 --- a/pkg/cmd/roachprod/vm/aws/aws.go +++ b/pkg/cmd/roachprod/vm/aws/aws.go @@ -862,12 +862,14 @@ func (p *Provider) runInstance(name string, zone string, opts vm.CreateOpts) err if !opts.SSDOpts.UseLocalSSD { if len(p.opts.EBSVolumes) == 0 && p.opts.DefaultEBSVolume.Disk.VolumeType == "" { p.opts.DefaultEBSVolume.Disk.VolumeType = defaultEBSVolumeType + p.opts.DefaultEBSVolume.Disk.DeleteOnTermination = true } if p.opts.DefaultEBSVolume.Disk.VolumeType != "" { // Add default volume to the list of volumes we'll setup. v := p.opts.EBSVolumes.newVolume() v.Disk = p.opts.DefaultEBSVolume.Disk + v.Disk.DeleteOnTermination = true p.opts.EBSVolumes = append(p.opts.EBSVolumes, v) } } From 14c1bb469bd4a714629770dac85315040a1a6305 Mon Sep 17 00:00:00 2001 From: Yevgeniy Miretskiy Date: Thu, 22 Jul 2021 10:43:00 -0400 Subject: [PATCH 7/7] changefeedccl: Correctly manage protected timestamps and schema change boundary. Fix a minor logic bug where if we previously delayed job progress update, we may then fail to manage protected timestamp for some time. In addition, always checkpoing when reaching schema change boundary. We must always checkpoint when reaching boundary because the changefeed may need to restart (e.g. during primary key change). Release Notes: None --- pkg/ccl/changefeedccl/changefeed_processors.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index bc19409d6a81..2d6a583e327a 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -1306,10 +1306,17 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error { } func (cf *changeFrontier) maybeCheckpointJob(frontierChanged, isBehind bool) (bool, error) { + // See if we can update highwatermark. + // We always update watermark when we reach schema change boundary. + // Otherwise, we checkpoint when frontier changes + // (and we can checkpoint), or if it has changed in the past, but we skipped that update. + updateHighWater := + cf.frontier.schemaChangeBoundaryReached() || cf.js.canCheckpointHighWatermark(frontierChanged) + // Update checkpoint if the frontier has not changed, but it is time to checkpoint. // If the frontier has changed, we want to write an empty checkpoint record indicating // that all spans have reached the frontier. - updateCheckpoint := !frontierChanged && cf.js.canCheckpointFrontier() + updateCheckpoint := !updateHighWater && cf.js.canCheckpointFrontier() var checkpoint jobspb.ChangefeedProgress_Checkpoint if updateCheckpoint { @@ -1317,10 +1324,11 @@ func (cf *changeFrontier) maybeCheckpointJob(frontierChanged, isBehind bool) (bo checkpoint.Spans = cf.frontier.getCheckpointSpans(maxBytes) } - if updateCheckpoint || cf.js.canCheckpointHighWatermark(frontierChanged) { + if updateCheckpoint || updateHighWater { + manageProtected := updateHighWater checkpointStart := timeutil.Now() if err := cf.checkpointJobProgress( - cf.frontier.Frontier(), frontierChanged, checkpoint, isBehind, + cf.frontier.Frontier(), manageProtected, checkpoint, isBehind, ); err != nil { return false, err }