84514: sql/schemachanger: implemented ALTER PRIMARY KEY for vanilla case  r=postamar a=Xiang-Gu

This PR implements `ALTER PRIMARY KEY` under the declarative schema
changer framework, handling the simplest, "vanilla" case, e.g.
```
CREATE TABLE t (i INT PRIMARY KEY, j INT NOT NULL);
ALTER TABLE t ALTER PRIMARY KEY USING COLUMNS (j);
```

This is the first of a series of PRs; follow-up PRs will expand its
capabilities to handle more complex cases, including:
  1. Allow the requested new primary key to be hash-sharded (sketched
     below);
  2. Consider the case where altering the primary key requires us to
     modify existing secondary indexes (see the legacy schema changer
     for the cases in which they need to be rewritten);
  3. Consider the case where the old primary index is on the implicitly
     created `rowid` column, in which case we also need to drop that
     column (sketched below);
  4. Consider partitioning and locality (I'm not sure what they are,
     or why they play a role in `ALTER PRIMARY KEY`, but I've seen
     them in the old schema changer, so I assume we ought to do
     something about them here too);
  5. Support `ALTER PRIMARY KEY` with concurrent schema change
     statements, e.g.
 ```ALTER TABLE t ADD COLUMN k INT NOT NULL DEFAULT 30, ALTER PRIMARY KEY USING COLUMNS (j);```
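For illustration, a sketch of two of the not-yet-handled cases above (my own
examples, not taken from the PR; the hash-sharded syntax is an assumption):
```
-- Case 1: hash-sharded target primary key.
CREATE TABLE t1 (i INT PRIMARY KEY, j INT NOT NULL);
ALTER TABLE t1 ALTER PRIMARY KEY USING COLUMNS (j) USING HASH;

-- Case 3: no explicit primary key, so the table is keyed on the hidden
-- `rowid` column; the new primary key should also let `rowid` be dropped.
CREATE TABLE t2 (j INT NOT NULL);
ALTER TABLE t2 ALTER PRIMARY KEY USING COLUMNS (j);
```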

related: #83932
Release note: None

84718: sql: populate query-level stats earlier & add contention to telemetry log r=THardy98 a=THardy98

Addresses: #71328

This change adds contention time (measured in nanoseconds) to the
`SampledQuery` telemetry log.

To accommodate this change, we needed to collect query-level statistics
earlier. Previously, query-level statistics were fetched when we called
`Finish` on the `instrumentationHelper`; however, this occurred after
we had already emitted our query execution logs. Now, we collect
query-level stats in `dispatchToExecutionEngine` after we've executed
the query.

As a tradeoff of collecting query-level stats earlier, we need to fetch
the trace twice:
- once when populating query-level stats (the trace is required to compute
  query-level stats) at `populateQueryLevelStats` in
  `dispatchToExecutionEngine`, after query execution
- once during the instrumentation helper's `Finish` (as we do currently)

This allows us to collect query-level stats earlier without omitting any
tracing events we currently record. This approach is safer, and the
additional overhead of fetching the trace twice only occurs at the
tracing sampling rate of 1-2%, which is fairly conservative. The concern
with fetching the trace only at query-level stats population was the
omission of a number of events that occur in
`commitSQLTransactionInternal` (or any execution paths that don't lead
to `dispatchToExecutionEngine`).
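A minimal sketch of the new ordering, with hypothetical types and helpers
standing in for the actual CockroachDB code (none of these names or
signatures are taken from the source; only the order of operations follows
the description above):
```
package main

import "fmt"

// queryLevelStats stands in for the real query-level statistics struct.
type queryLevelStats struct{ contentionNanos int64 }

// instrumentationHelper is a hypothetical stand-in; it only carries the
// trace events recorded so far.
type instrumentationHelper struct{ trace []string }

func (ih *instrumentationHelper) getTrace() []string { return ih.trace }

// populateQueryLevelStats derives stats from the trace (here, trivially).
func populateQueryLevelStats(trace []string) queryLevelStats {
	return queryLevelStats{contentionNanos: int64(len(trace)) * 1_000_000}
}

// dispatchToExecutionEngine runs the query, then (new behavior) fetches the
// trace once to compute query-level stats before the execution log is emitted.
func dispatchToExecutionEngine(ih *instrumentationHelper) {
	ih.trace = append(ih.trace, "contention event") // pretend execution happened
	stats := populateQueryLevelStats(ih.getTrace()) // first trace fetch
	fmt.Printf("SampledQuery log with ContentionNanos=%d\n", stats.contentionNanos)
}

// Finish fetches the trace a second time, as before, so events recorded after
// execution (e.g. during commit) are not lost.
func (ih *instrumentationHelper) Finish() {
	ih.trace = append(ih.trace, "commit event")
	_ = ih.getTrace() // second trace fetch
}

func main() {
	ih := &instrumentationHelper{}
	dispatchToExecutionEngine(ih)
	ih.Finish()
}
```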

Release note (sql change): Add a `ContentionTime` field to the
`SampledQuery` telemetry log. Query-level statistics are collected
earlier to facilitate adding contention time to query execution
logs. The earlier collection of query-level statistics incurs the
additional overhead of fetching the query's trace twice (instead of
once, as before).

85280: sql, server: add new system privileges for observability r=Santamaura a=Santamaura

This patch introduces two new system privileges:
VIEWDEBUG and VIEWCLUSTERMETADATA. VIEWDEBUG
will now be used to gate taking traces and viewing
debug endpoints. VIEWCLUSTERMETADATA will now be
used to gate the node and range reports.
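Assuming these behave like the existing system privileges, they would be
granted and revoked with the usual syntax (illustrative statements, not
taken from this patch; `observer_user` is a made-up role):
```
-- Allow taking traces and viewing debug endpoints.
GRANT SYSTEM VIEWDEBUG TO observer_user;

-- Allow viewing node and range reports.
GRANT SYSTEM VIEWCLUSTERMETADATA TO observer_user;

-- Revoke when no longer needed.
REVOKE SYSTEM VIEWDEBUG FROM observer_user;
```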

Resolves #83844, #83856, #83857, #83858, #83861

Release note (sql change): add VIEWDEBUG and
VIEWCLUSTERMETADATA system privileges.

85458: changefeedccl: add retries to sinkless changefeeds r=jayshrivastava a=jayshrivastava

This change updates core/sinkless changefeeds to run in a retry loop, allowing for changefeed restarts in case of transient errors or declarative schema changes. 
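For context, a core (sinkless) changefeed is simply a changefeed created
without a sink URI, so its rows stream back to the SQL client that issued
the statement, for example:
```
-- A core/sinkless changefeed: no sink URI, rows are returned to the client.
CREATE CHANGEFEED FOR TABLE mytable;

-- With a sink URI it would instead run as a job (illustrative URI):
-- CREATE CHANGEFEED FOR TABLE mytable INTO 'kafka://broker:9092';
```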

See commit notes for more details.

Fixes #85008

85819: kv: use max timestamp during below-Raft scan to gossip liveness r=nvanbenschoten a=nvanbenschoten

Related to https://github.com/cockroachlabs/support/issues/1573.

This commit switches `MaybeGossipNodeLivenessRaftMuLocked` to evaluate its scan
at the maximum timestamp instead of at the local node's HLC time. This ensures
that we gossip the most recent liveness record, regardless of what timestamp it
is written at.

85822: colbuilder: fall back to row-by-row processor wrapping for many renders r=yuzefovich a=yuzefovich

**colbuilder: add a microbenchmark for running many render expressions**

This commit adds a microbenchmark of queries with many render
expressions. It'll be used in the following commit to tune when we fall
back to wrapping a row-by-row processor to handle those renders.

Release note: None

**colbuilder: fall back to row-by-row processor wrapping for many renders**

This commit introduces a mechanism to handle render expressions by
wrapping a row-by-row processor into the vectorized flow when
1. the estimated number of rows to go through the renders is relatively
small, and
2. the number of renders is relatively high.

The idea is that the vectorized projection operators have higher
overhead when many of them are planned AND there is not enough data to
amortize the overhead, so to improve the performance in those cases
we'll use the row-by-row noop processor. Both of the thresholds are
controlled by cluster settings and the defaults were chosen based on
a representative microbenchmark.

It's worth pointing out that we only have the estimated row count for
the scan operators, so the change has limited applicability.
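A rough sketch of the fallback decision described above; the threshold names
and default values here are placeholders of my own (the real ones are cluster
settings whose names aren't given in this message), and the function is
illustrative rather than the actual planner code:
```
package main

import "fmt"

// Placeholder thresholds standing in for the real cluster settings.
const (
	assumedRowCountThreshold    = 128 // "relatively small" estimated row count
	assumedRenderCountThreshold = 64  // "relatively high" number of renders
)

// shouldWrapRenders reports whether to plan the render expressions via a
// wrapped row-by-row noop processor instead of vectorized projection
// operators: only when we have a row estimate (i.e. the input is a scan),
// the estimate is small, and there are many renders to plan.
func shouldWrapRenders(haveEstimate bool, estimatedRowCount uint64, numRenders int) bool {
	if !haveEstimate {
		return false // estimates exist only for scans, so applicability is limited
	}
	return estimatedRowCount <= assumedRowCountThreshold &&
		numRenders >= assumedRenderCountThreshold
}

func main() {
	fmt.Println(shouldWrapRenders(true, 4, 512))   // true: wrap a row-by-row processor
	fmt.Println(shouldWrapRenders(true, 256, 512)) // false: keep vectorized projections
}
```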

```
RenderPlanning/rows=1/renders=1-24           407µs ± 2%     408µs ± 2%     ~     (p=0.684 n=10+10)
RenderPlanning/rows=1/renders=8-24           516µs ± 1%     537µs ± 1%   +4.05%  (p=0.000 n=10+10)
RenderPlanning/rows=1/renders=32-24          832µs ± 1%     811µs ± 1%   -2.59%  (p=0.000 n=10+10)
RenderPlanning/rows=1/renders=64-24         1.22ms ± 0%    1.14ms ± 1%   -6.62%  (p=0.000 n=9+10)
RenderPlanning/rows=1/renders=128-24        2.02ms ± 0%    1.80ms ± 1%  -11.18%  (p=0.000 n=8+9)
RenderPlanning/rows=1/renders=512-24        7.75ms ± 1%    5.75ms ± 1%  -25.77%  (p=0.000 n=10+9)
RenderPlanning/rows=1/renders=4096-24        160ms ± 1%      62ms ± 1%  -61.51%  (p=0.000 n=10+9)
RenderPlanning/rows=4/renders=1-24           438µs ± 2%     438µs ± 1%     ~     (p=0.853 n=10+10)
RenderPlanning/rows=4/renders=8-24           603µs ± 1%     633µs ± 2%   +5.06%  (p=0.000 n=10+10)
RenderPlanning/rows=4/renders=32-24         1.08ms ± 1%    1.08ms ± 1%     ~     (p=0.105 n=10+10)
RenderPlanning/rows=4/renders=64-24         1.72ms ± 0%    1.62ms ± 0%   -5.83%  (p=0.000 n=9+9)
RenderPlanning/rows=4/renders=128-24        3.01ms ± 1%    2.75ms ± 1%   -8.78%  (p=0.000 n=10+10)
RenderPlanning/rows=4/renders=512-24        11.6ms ± 1%     9.6ms ± 2%  -17.58%  (p=0.000 n=10+10)
RenderPlanning/rows=4/renders=4096-24        192ms ± 2%      91ms ± 2%  -52.58%  (p=0.000 n=10+10)
RenderPlanning/rows=16/renders=1-24          494µs ± 1%     499µs ± 1%   +1.03%  (p=0.006 n=10+8)
RenderPlanning/rows=16/renders=8-24          855µs ± 1%     901µs ± 1%   +5.37%  (p=0.000 n=10+10)
RenderPlanning/rows=16/renders=32-24        2.03ms ± 1%    2.04ms ± 1%     ~     (p=0.190 n=10+10)
RenderPlanning/rows=16/renders=64-24        3.58ms ± 1%    3.42ms ± 1%   -4.56%  (p=0.000 n=10+9)
RenderPlanning/rows=16/renders=128-24       6.74ms ± 1%    6.31ms ± 1%   -6.37%  (p=0.000 n=10+10)
RenderPlanning/rows=16/renders=512-24       26.9ms ± 1%    24.7ms ± 1%   -8.24%  (p=0.000 n=9+10)
RenderPlanning/rows=16/renders=4096-24       329ms ± 2%     218ms ± 2%  -33.66%  (p=0.000 n=10+10)
RenderPlanning/rows=64/renders=1-24          666µs ± 1%     659µs ± 2%   -1.07%  (p=0.007 n=10+10)
RenderPlanning/rows=64/renders=8-24         1.79ms ± 1%    1.84ms ± 1%   +3.01%  (p=0.000 n=10+10)
RenderPlanning/rows=64/renders=32-24        5.53ms ± 1%    5.79ms ± 2%   +4.74%  (p=0.000 n=10+10)
RenderPlanning/rows=64/renders=64-24        10.8ms ± 1%    11.0ms ± 1%   +1.87%  (p=0.000 n=10+9)
RenderPlanning/rows=64/renders=128-24       21.2ms ± 1%    21.7ms ± 1%   +2.71%  (p=0.000 n=10+10)
RenderPlanning/rows=64/renders=512-24       83.6ms ± 0%    84.9ms ± 0%   +1.47%  (p=0.000 n=10+7)
RenderPlanning/rows=64/renders=4096-24       824ms ± 1%     751ms ± 2%   -8.88%  (p=0.000 n=10+10)
RenderPlanning/rows=128/renders=1-24         853µs ± 1%     851µs ± 1%     ~     (p=0.481 n=10+10)
RenderPlanning/rows=128/renders=8-24        2.98ms ± 1%    3.11ms ± 1%   +4.32%  (p=0.000 n=10+10)
RenderPlanning/rows=128/renders=32-24       10.4ms ± 1%    10.9ms ± 1%   +5.44%  (p=0.000 n=10+10)
RenderPlanning/rows=128/renders=64-24       20.1ms ± 1%    21.3ms ± 1%   +5.99%  (p=0.000 n=10+10)
RenderPlanning/rows=128/renders=128-24      39.7ms ± 1%    42.1ms ± 2%   +5.98%  (p=0.000 n=10+10)
RenderPlanning/rows=128/renders=512-24       160ms ± 1%     168ms ± 2%   +5.13%  (p=0.000 n=9+10)
RenderPlanning/rows=128/renders=4096-24      1.44s ± 1%     1.48s ± 2%   +3.15%  (p=0.000 n=9+10)
RenderPlanning/rows=256/renders=1-24        1.22ms ± 1%    1.21ms ± 1%   -1.01%  (p=0.000 n=10+10)
RenderPlanning/rows=256/renders=8-24        5.22ms ± 0%    5.19ms ± 1%   -0.54%  (p=0.011 n=8+9)
RenderPlanning/rows=256/renders=32-24       19.9ms ± 1%    20.0ms ± 1%     ~     (p=0.182 n=9+10)
RenderPlanning/rows=256/renders=64-24       39.0ms ± 0%    38.9ms ± 0%   -0.33%  (p=0.023 n=10+10)
RenderPlanning/rows=256/renders=128-24      76.8ms ± 1%    76.7ms ± 1%     ~     (p=0.739 n=10+10)
RenderPlanning/rows=256/renders=512-24       316ms ± 1%     319ms ± 1%   +1.15%  (p=0.001 n=9+10)
RenderPlanning/rows=256/renders=4096-24      2.75s ± 1%     2.73s ± 1%   -0.64%  (p=0.002 n=8+9)
```

Fixes: #85632.

Release note: None

Co-authored-by: Xiang Gu <[email protected]>
Co-authored-by: Thomas Hardy <[email protected]>
Co-authored-by: Santamaura <[email protected]>
Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Nathan VanBenschoten <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
7 people committed Aug 9, 2022
7 parents 7c18668 + 8a25332 + fd1b51f + 71f0298 + 3b40478 + 51c5a38 + f5a28fb commit 8bcd0cd
Showing 57 changed files with 5,171 additions and 188 deletions.
1 change: 1 addition & 0 deletions docs/generated/eventlog.md
@@ -2553,6 +2553,7 @@ contains common SQL event/execution details.
| `InvertedJoinCount` | The number of inverted joins in the query plan. | no |
| `ApplyJoinCount` | The number of apply joins in the query plan. | no |
| `ZigZagJoinCount` | The number of zig zag joins in the query plan. | no |
| `ContentionNanos` | The duration of time in nanoseconds that the query experienced contention. | no |


#### Common fields
2 changes: 2 additions & 0 deletions docs/generated/sql/bnf/stmt_block.bnf
@@ -1360,7 +1360,9 @@ unreserved_keyword ::=
| 'VIEW'
| 'VIEWACTIVITY'
| 'VIEWACTIVITYREDACTED'
| 'VIEWCLUSTERMETADATA'
| 'VIEWCLUSTERSETTING'
| 'VIEWDEBUG'
| 'VISIBLE'
| 'VOLATILE'
| 'VOTERS'
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_processors.go
@@ -1038,7 +1038,7 @@ func (cf *changeFrontier) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetad
// Detect whether this boundary should be used to kill or restart the
// changefeed.
if cf.frontier.boundaryType == jobspb.ResolvedSpan_RESTART {
err = changefeedbase.MarkRetryableError(err)
err = changefeedbase.MarkRetryableErrorWithTimestamp(err, cf.frontier.boundaryTime)
}
}

49 changes: 39 additions & 10 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -57,6 +57,12 @@ import (
"github.com/cockroachdb/errors"
)

var changefeedRetryOptions = retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}

// featureChangefeedEnabled is used to enable and disable the CHANGEFEED feature.
var featureChangefeedEnabled = settings.RegisterBoolSetting(
settings.TenantWritable,
@@ -196,10 +202,38 @@ func changefeedPlanHook(

telemetry.Count(`changefeed.create.core`)
logChangefeedCreateTelemetry(ctx, jr)
err := distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh)
if err != nil {
telemetry.Count(`changefeed.core.error`)

var err error
for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); {
if err = distChangefeedFlow(ctx, p, 0 /* jobID */, details, progress, resultsCh); err == nil {
return nil
}

if knobs, ok := p.ExecCfg().DistSQLSrv.TestingKnobs.Changefeed.(*TestingKnobs); ok {
if knobs != nil && knobs.HandleDistChangefeedError != nil {
err = knobs.HandleDistChangefeedError(err)
}
}

if !changefeedbase.IsRetryableError(err) {
log.Warningf(ctx, `CHANGEFEED returning with error: %+v`, err)
return err
}

// Check for a schemachange boundary timestamp returned via a
// retryable error. Retrying without updating the changefeed progress
// will result in the changefeed performing the schema change again,
// causing an infinite loop.
if ts, ok := changefeedbase.MaybeGetRetryableErrorTimestamp(err); ok {
progress = jobspb.Progress{
Progress: &jobspb.Progress_HighWater{HighWater: &ts},
Details: &jobspb.Progress_Changefeed{
Changefeed: &jobspb.ChangefeedProgress{},
},
}
}
}
telemetry.Count(`changefeed.core.error`)
return changefeedbase.MaybeStripRetryableErrorMarker(err)
}

@@ -934,15 +968,10 @@ func (b *changefeedResumer) resumeWithRetries(
// bubbles up to this level, we'd like to "retry" the flow if possible. This
// could be because the sink is down or because a cockroach node has crashed
// or for many other reasons.
opts := retry.Options{
InitialBackoff: 5 * time.Millisecond,
Multiplier: 2,
MaxBackoff: 10 * time.Second,
}
var err error
var lastRunStatusUpdate time.Time

for r := retry.StartWithCtx(ctx, opts); r.Next(); {
for r := retry.StartWithCtx(ctx, changefeedRetryOptions); r.Next(); {
// startedCh is normally used to signal back to the creator of the job that
// the job has started; however, in this case nothing will ever receive
// on the channel, causing the changefeed flow to block. Replace it with
@@ -959,7 +988,7 @@
}
}

	// Retry changefeed if error is retryable. In addition, we want to handle
// Retry changefeed if error is retryable. In addition, we want to handle
// context cancellation as retryable, but only if the resumer context has not been cancelled.
// (resumer context is canceled by the jobs framework -- so we should respect it).
isRetryableErr := changefeedbase.IsRetryableError(err) ||
89 changes: 34 additions & 55 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -1027,7 +1027,7 @@ func TestNoStopAfterNonTargetColumnDrop(t *testing.T) {
}
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

// If we drop columns which are not targeted by the changefeed, it should not backfill.
Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestNoBackfillAfterNonTargetColumnDrop(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
@@ -1105,7 +1105,7 @@ func TestChangefeedColumnDropsWithFamilyAndNonFamilyTargets(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
@@ -1150,7 +1150,7 @@ func TestChangefeedColumnDropsOnMultipleFamiliesWithTheSameName(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
@@ -1182,7 +1182,7 @@ func TestChangefeedColumnDropsOnTheSameTableWithMultipleFamilies(t *testing.T) {
})
}

cdcTest(t, testFn, feedTestOmitSinks("sinkless"))
cdcTest(t, testFn)
}

func TestChangefeedExternalIODisabled(t *testing.T) {
@@ -5294,36 +5294,6 @@ func TestChangefeedPrimaryKeyChangeWorks(t *testing.T) {
foo := feed(t, f, baseStmt)
defer closeFeed(t, foo)

// maybeHandleRestart deals with the fact that sinkless changefeeds don't
// gracefully handle primary index changes but rather force the client to
// deal with restarting the changefeed as of the last resolved timestamp.
//
// This ends up being pretty sane; sinkless changefeeds already require this
// behavior in the face of other transient failures so clients already need
// to implement this logic.
maybeHandleRestart := func(t *testing.T) (cleanup func()) {
return func() {}
}
if strings.HasSuffix(t.Name(), "sinkless") {
maybeHandleRestart = func(t *testing.T) func() {
var resolved hlc.Timestamp
for {
m, err := foo.Next()
if err != nil {
assert.Contains(t, err.Error(),
fmt.Sprintf("schema change occurred at %s", resolved.Next().AsOfSystemTime()))
break
}
resolved = extractResolvedTimestamp(t, m)
}
const restartStmt = baseStmt + ", cursor = $1"
foo = feed(t, f, restartStmt, resolved.AsOfSystemTime())
return func() {
closeFeed(t, foo)
}
}
}

// 'initial' is skipped because only the latest value ('updated') is
// emitted by the initial scan.
assertPayloads(t, foo, []string{
@@ -5337,7 +5307,6 @@ func TestChangefeedPrimaryKeyChangeWorks(t *testing.T) {
})

sqlDB.Exec(t, `ALTER TABLE foo ALTER PRIMARY KEY USING COLUMNS (b)`)
defer maybeHandleRestart(t)()
sqlDB.Exec(t, `INSERT INTO foo VALUES (3, 'c'), (4, 'd')`)
assertPayloads(t, foo, []string{
`foo: ["c"]->{"after": {"a": 3, "b": "c"}}`,
@@ -5360,7 +5329,6 @@ INSERT INTO foo VALUES (1, 'f');
`foo: ["a"]->{"after": {"a": 6, "b": "a"}}`,
`foo: ["e"]->{"after": {"a": 5, "b": "e"}}`,
})
defer maybeHandleRestart(t)()
assertPayloads(t, foo, []string{
`foo: [1]->{"after": {"a": 1, "b": "f"}}`,
})
@@ -5406,24 +5374,6 @@ func TestChangefeedPrimaryKeyChangeWorksWithMultipleTables(t *testing.T) {
maybeHandleRestart := func(t *testing.T) (cleanup func()) {
return func() {}
}
if strings.HasSuffix(t.Name(), "sinkless") {
maybeHandleRestart = func(t *testing.T) func() {
var resolvedTS hlc.Timestamp
for {
m, err := cf.Next()
if err != nil {
assert.Contains(t, err.Error(), fmt.Sprintf("schema change occurred at %s", resolvedTS.Next().AsOfSystemTime()))
break
}
resolvedTS = extractResolvedTimestamp(t, m)
}
const restartStmt = baseStmt + ", cursor = $1"
cf = feed(t, f, restartStmt, resolvedTS.AsOfSystemTime())
return func() {
closeFeed(t, cf)
}
}
}

// 'initial' is skipped because only the latest value ('updated') is
// emitted by the initial scan.
@@ -7110,3 +7060,32 @@ func TestChangefeedTestTimesOut(t *testing.T) {

cdcTest(t, testFn)
}

// Regression for #85008.
func TestSchemachangeDoesNotBreakSinklessFeed(t *testing.T) {
defer leaktest.AfterTest(t)()
testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE mytable (id INT PRIMARY KEY)`)
sqlDB.Exec(t, `INSERT INTO mytable VALUES (0)`)

// Open up the changefeed.
cf := feed(t, f, `CREATE CHANGEFEED FOR TABLE mytable`)
defer closeFeed(t, cf)
assertPayloads(t, cf, []string{
`mytable: [0]->{"after": {"id": 0}}`,
})

sqlDB.Exec(t, `ALTER TABLE mytable ADD COLUMN val INT DEFAULT 0`)
assertPayloads(t, cf, []string{
`mytable: [0]->{"after": {"id": 0, "val": 0}}`,
})
sqlDB.Exec(t, `INSERT INTO mytable VALUES (1,1)`)
assertPayloads(t, cf, []string{
`mytable: [1]->{"after": {"id": 1, "val": 1}}`,
})
}

cdcTest(t, testFn, feedTestForceSink("sinkless"))
}
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/changefeedbase/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
"//pkg/sql",
"//pkg/sql/catalog/descpb",
"//pkg/sql/flowinfra",
"//pkg/util/hlc",
"@com_github_cockroachdb_errors//:errors",
],
)
27 changes: 26 additions & 1 deletion pkg/ccl/changefeedccl/changefeedbase/errors.go
@@ -17,6 +17,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/flowinfra"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/errors"
)

@@ -77,7 +78,14 @@ func (e *taggedError) Unwrap() error { return e.wrapped }
const retryableErrorString = "retryable changefeed error"

type retryableError struct {
wrapped error
// A schema change may result in a changefeed returning retryableError,
// which can signal the changefeed to restart.
// boundaryTimestamp can be returned inside this error so
// the changefeed knows where to restart from. Note this is
// only useful for sinkless/core changefeeds because they do not have
// the ability to read/write their state to jobs tables during restarts.
boundaryTimestamp hlc.Timestamp
wrapped error
}

// MarkRetryableError wraps the given error, marking it as retryable to
Expand All @@ -86,6 +94,12 @@ func MarkRetryableError(e error) error {
return &retryableError{wrapped: e}
}

// MarkRetryableErrorWithTimestamp wraps the given error, marks it as
// retryable, and attaches a timestamp to the error.
func MarkRetryableErrorWithTimestamp(e error, ts hlc.Timestamp) error {
return &retryableError{boundaryTimestamp: ts, wrapped: e}
}

// Error implements the error interface.
func (e *retryableError) Error() string {
return fmt.Sprintf("%s: %s", retryableErrorString, e.wrapped.Error())
Expand Down Expand Up @@ -125,6 +139,17 @@ func IsRetryableError(err error) bool {
errors.Is(err, sql.ErrPlanChanged))
}

// MaybeGetRetryableErrorTimestamp will get the timestamp of an error if
// the error is a retryableError and the timestamp field is populated.
func MaybeGetRetryableErrorTimestamp(err error) (timestamp hlc.Timestamp, ok bool) {
if retryableErr := (*retryableError)(nil); errors.As(err, &retryableErr) {
if !retryableErr.boundaryTimestamp.IsEmpty() {
return retryableErr.boundaryTimestamp, true
}
}
return hlc.Timestamp{}, false
}

// MaybeStripRetryableErrorMarker performs some minimal attempt to clean the
// RetryableError marker out. This won't do anything if the RetryableError
// itself has been wrapped, but that's okay, we'll just have an uglier string.
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica_gossip.go
@@ -19,6 +19,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/uncertainty"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)
@@ -83,7 +84,9 @@ func (r *Replica) MaybeGossipNodeLivenessRaftMuLocked(
}

ba := roachpb.BatchRequest{}
ba.Timestamp = r.store.Clock().Now()
// Read at the maximum timestamp to ensure that we see the most recent
// liveness record, regardless of what timestamp it is written at.
ba.Timestamp = hlc.MaxTimestamp
ba.Add(&roachpb.ScanRequest{RequestHeader: roachpb.RequestHeaderFromSpan(span)})
// Call evaluateBatch instead of Send to avoid reacquiring latches.
rec := NewReplicaEvalContext(