diff --git a/pkg/kv/kvserver/spanset/batch.go b/pkg/kv/kvserver/spanset/batch.go index a09bc41d2e34..2ae3a19634d1 100644 --- a/pkg/kv/kvserver/spanset/batch.go +++ b/pkg/kv/kvserver/spanset/batch.go @@ -402,7 +402,7 @@ func (s spanSetReader) Closed() bool { return s.r.Closed() } -// ExportMVCCToSst is part of the engine.Reader interface. +// ExportMVCCToSst is part of the storage.Reader interface. func (s spanSetReader) ExportMVCCToSst( startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp, @@ -479,10 +479,16 @@ func (s spanSetReader) NewEngineIterator(opts storage.IterOptions) storage.Engin } } +// ConsistentIterators implements the storage.Reader interface. func (s spanSetReader) ConsistentIterators() bool { return s.r.ConsistentIterators() } +// PinEngineStateForIterators implements the storage.Reader interface. +func (s spanSetReader) PinEngineStateForIterators() error { + return s.r.PinEngineStateForIterators() +} + // GetDBEngine recursively searches for the underlying rocksDB engine. func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { switch v := reader.(type) { @@ -495,7 +501,7 @@ func GetDBEngine(reader storage.Reader, span roachpb.Span) storage.Reader { } } -// getSpanReader is a getter to access the engine.Reader field of the +// getSpanReader is a getter to access the storage.Reader field of the // spansetReader. func getSpanReader(r ReadWriter, span roachpb.Span) storage.Reader { if err := r.spanSetReader.spans.CheckAllowed(SpanReadOnly, span); err != nil { @@ -700,7 +706,7 @@ func makeSpanSetReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Times } } -// NewReadWriterAt returns an engine.ReadWriter that asserts access of the +// NewReadWriterAt returns a storage.ReadWriter that asserts access of the // underlying ReadWriter against the given SpanSet at a given timestamp. // If zero timestamp is provided, accesses are considered non-MVCC. 
func NewReadWriterAt(rw storage.ReadWriter, spans *SpanSet, ts hlc.Timestamp) storage.ReadWriter { @@ -734,7 +740,7 @@ func (s spanSetBatch) Repr() []byte { return s.b.Repr() } -// NewBatch returns an engine.Batch that asserts access of the underlying +// NewBatch returns a storage.Batch that asserts access of the underlying // Batch against the given SpanSet. We only consider span boundaries, associated // timestamps are not considered. func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch { @@ -746,7 +752,7 @@ func NewBatch(b storage.Batch, spans *SpanSet) storage.Batch { } } -// NewBatchAt returns an engine.Batch that asserts access of the underlying +// NewBatchAt returns a storage.Batch that asserts access of the underlying // Batch against the given SpanSet at the given timestamp. // If the zero timestamp is used, all accesses are considered non-MVCC. func NewBatchAt(b storage.Batch, spans *SpanSet, ts hlc.Timestamp) storage.Batch { diff --git a/pkg/sql/opt/exec/execbuilder/testdata/hash_sharded_index b/pkg/sql/opt/exec/execbuilder/testdata/hash_sharded_index index 05a9dd7457ed..35d39ff35bb0 100644 --- a/pkg/sql/opt/exec/execbuilder/testdata/hash_sharded_index +++ b/pkg/sql/opt/exec/execbuilder/testdata/hash_sharded_index @@ -28,7 +28,7 @@ vectorized: true └── • render │ columns: (crdb_internal_a_shard_11_comp, column1) │ estimated row count: 2 - │ render crdb_internal_a_shard_11_comp: mod(fnv32(COALESCE(column1::STRING, '')), 11) + │ render crdb_internal_a_shard_11_comp: mod(fnv32(COALESCE(column1::STRING, '')), 11)::INT4 │ render column1: column1 │ └── • values @@ -63,7 +63,7 @@ vectorized: true └── • render │ columns: (crdb_internal_a_shard_12_comp, rowid_default, column1) │ estimated row count: 2 - │ render crdb_internal_a_shard_12_comp: mod(fnv32(COALESCE(column1::STRING, '')), 12) + │ render crdb_internal_a_shard_12_comp: mod(fnv32(COALESCE(column1::STRING, '')), 12)::INT4 │ render rowid_default: unique_rowid() │ render column1: column1 │ diff
--git a/pkg/sql/opt/optbuilder/scope.go b/pkg/sql/opt/optbuilder/scope.go index c0bcc1ab880e..787e12f2fdb6 100644 --- a/pkg/sql/opt/optbuilder/scope.go +++ b/pkg/sql/opt/optbuilder/scope.go @@ -458,7 +458,7 @@ func (s *scope) resolveAndRequireType(expr tree.Expr, desired *types.T) tree.Typ if err != nil { panic(err) } - return s.ensureNullType(texpr, desired) + return tree.ReType(s.ensureNullType(texpr, desired), desired) } // ensureNullType tests the type of the given expression. If types.Unknown, then diff --git a/pkg/sql/opt/optbuilder/testdata/insert b/pkg/sql/opt/optbuilder/testdata/insert index 90e9eecfc3ac..90cfd4b2d41e 100644 --- a/pkg/sql/opt/optbuilder/testdata/insert +++ b/pkg/sql/opt/optbuilder/testdata/insert @@ -1168,13 +1168,13 @@ insert decimals │ │ │ │ │ ├── columns: column1:7!null column2:8 │ │ │ │ │ └── (1.1, ARRAY[0.95,NULL,15]) │ │ │ │ └── projections - │ │ │ │ └── 1.23 [as=c_default:9] + │ │ │ │ └── 1.23::DECIMAL(10,1) [as=c_default:9] │ │ │ └── projections │ │ │ ├── crdb_internal.round_decimal_values(column1:7, 0) [as=a:10] │ │ │ ├── crdb_internal.round_decimal_values(column2:8, 1) [as=b:11] │ │ │ └── crdb_internal.round_decimal_values(c_default:9, 1) [as=c_default:12] │ │ └── projections - │ │ └── a:10 + c_default:12 [as=d_comp:13] + │ │ └── (a:10 + c_default:12::DECIMAL)::DECIMAL(10,1) [as=d_comp:13] │ └── projections │ └── crdb_internal.round_decimal_values(d_comp:13, 1) [as=d_comp:14] └── projections diff --git a/pkg/sql/opt/optbuilder/testdata/update b/pkg/sql/opt/optbuilder/testdata/update index ce6d9c5dc1d9..46e189224a6d 100644 --- a/pkg/sql/opt/optbuilder/testdata/update +++ b/pkg/sql/opt/optbuilder/testdata/update @@ -1631,7 +1631,7 @@ update decimals │ │ │ │ │ ├── columns: a:7!null b:8 c:9 d:10 crdb_internal_mvcc_timestamp:11 tableoid:12 │ │ │ │ │ └── computed column expressions │ │ │ │ │ └── d:10 - │ │ │ │ │ └── a:7::DECIMAL + c:9::DECIMAL + │ │ │ │ │ └── (a:7::DECIMAL + c:9::DECIMAL)::DECIMAL(10,1) │ │ │ │ └── projections │ │ │ │ 
├── 1.1 [as=a_new:13] │ │ │ │ └── ARRAY[0.95,NULL,15] [as=b_new:14] @@ -1639,7 +1639,7 @@ update decimals │ │ │ ├── crdb_internal.round_decimal_values(a_new:13, 0) [as=a_new:15] │ │ │ └── crdb_internal.round_decimal_values(b_new:14, 1) [as=b_new:16] │ │ └── projections - │ │ └── a_new:15 + c:9::DECIMAL [as=d_comp:17] + │ │ └── (a_new:15 + c:9::DECIMAL)::DECIMAL(10,1) [as=d_comp:17] │ └── projections │ └── crdb_internal.round_decimal_values(d_comp:17, 1) [as=d_comp:18] └── projections diff --git a/pkg/sql/opt/optbuilder/testdata/update-col-cast-bug b/pkg/sql/opt/optbuilder/testdata/update-col-cast-bug new file mode 100644 index 000000000000..86946a81acd1 --- /dev/null +++ b/pkg/sql/opt/optbuilder/testdata/update-col-cast-bug @@ -0,0 +1,52 @@ +exec-ddl +create table t (a int2, b int2, c int2 as (a + b) virtual) +---- + +build format=show-types +update t set a = (with cte as (select 1:::int8) select t.c from cte limit 1) +---- +with &1 (cte) + ├── project + │ ├── columns: int8:13(int!null) + │ ├── values + │ │ └── () [type=tuple] + │ └── projections + │ └── 1 [as=int8:13, type=int] + └── update t + ├── columns: + ├── fetch columns: a:7(int2) b:8(int2) t.c:9(int2) rowid:10(int) + ├── update-mapping: + │ ├── a_new:16 => a:1 + │ └── c_comp:17 => t.c:3 + └── project + ├── columns: c_comp:17(int2) a:7(int2) b:8(int2) t.c:9(int2) rowid:10(int!null) crdb_internal_mvcc_timestamp:11(decimal) tableoid:12(oid) a_new:16(int2) + ├── project + │ ├── columns: a_new:16(int2) a:7(int2) b:8(int2) t.c:9(int2) rowid:10(int!null) crdb_internal_mvcc_timestamp:11(decimal) tableoid:12(oid) + │ ├── project + │ │ ├── columns: t.c:9(int2) a:7(int2) b:8(int2) rowid:10(int!null) crdb_internal_mvcc_timestamp:11(decimal) tableoid:12(oid) + │ │ ├── scan t + │ │ │ ├── columns: a:7(int2) b:8(int2) rowid:10(int!null) crdb_internal_mvcc_timestamp:11(decimal) tableoid:12(oid) + │ │ │ └── computed column expressions + │ │ │ └── t.c:9 + │ │ │ └── (a:7::INT8 + b:8::INT8)::INT2 [type=int2] + │ │ └── 
projections + │ │ └── (a:7::INT8 + b:8::INT8)::INT2 [as=t.c:9, type=int2] + │ └── projections + │ └── subquery [as=a_new:16, type=int2] + │ └── max1-row + │ ├── columns: c:15(int2) + │ └── limit + │ ├── columns: c:15(int2) + │ ├── project + │ │ ├── columns: c:15(int2) + │ │ ├── limit hint: 1.00 + │ │ ├── with-scan &1 (cte) + │ │ │ ├── columns: int8:14(int!null) + │ │ │ ├── mapping: + │ │ │ │ └── int8:13(int) => int8:14(int) + │ │ │ └── limit hint: 1.00 + │ │ └── projections + │ │ └── t.c:9 [as=c:15, type=int2] + │ └── 1 [type=int] + └── projections + └── (a_new:16::INT8 + b:8::INT8)::INT2 [as=c_comp:17, type=int2] diff --git a/pkg/sql/opt/optbuilder/testdata/upsert b/pkg/sql/opt/optbuilder/testdata/upsert index 260517bc6341..3160ec784153 100644 --- a/pkg/sql/opt/optbuilder/testdata/upsert +++ b/pkg/sql/opt/optbuilder/testdata/upsert @@ -1722,13 +1722,13 @@ upsert decimals │ │ │ │ │ │ │ │ │ ├── columns: column1:7!null column2:8 │ │ │ │ │ │ │ │ │ └── (1.1, ARRAY[0.95]) │ │ │ │ │ │ │ │ └── projections - │ │ │ │ │ │ │ │ └── 1.23 [as=c_default:9] + │ │ │ │ │ │ │ │ └── 1.23::DECIMAL(10,1) [as=c_default:9] │ │ │ │ │ │ │ └── projections │ │ │ │ │ │ │ ├── crdb_internal.round_decimal_values(column1:7, 0) [as=a:10] │ │ │ │ │ │ │ ├── crdb_internal.round_decimal_values(column2:8, 1) [as=b:11] │ │ │ │ │ │ │ └── crdb_internal.round_decimal_values(c_default:9, 1) [as=c_default:12] │ │ │ │ │ │ └── projections - │ │ │ │ │ │ └── a:10 + c_default:12 [as=d_comp:13] + │ │ │ │ │ │ └── (a:10 + c_default:12::DECIMAL)::DECIMAL(10,1) [as=d_comp:13] │ │ │ │ │ └── projections │ │ │ │ │ └── crdb_internal.round_decimal_values(d_comp:13, 1) [as=d_comp:14] │ │ │ │ └── aggregations @@ -1742,11 +1742,11 @@ upsert decimals │ │ │ │ ├── columns: decimals.a:15!null decimals.b:16 c:17 d:18 crdb_internal_mvcc_timestamp:19 tableoid:20 │ │ │ │ └── computed column expressions │ │ │ │ └── d:18 - │ │ │ │ └── decimals.a:15::DECIMAL + c:17::DECIMAL + │ │ │ │ └── (decimals.a:15::DECIMAL + 
c:17::DECIMAL)::DECIMAL(10,1) │ │ │ └── filters │ │ │ └── a:10 = decimals.a:15 │ │ └── projections - │ │ └── decimals.a:15::DECIMAL + c:17::DECIMAL [as=d_comp:21] + │ │ └── (decimals.a:15::DECIMAL + c:17::DECIMAL)::DECIMAL(10,1) [as=d_comp:21] │ └── projections │ ├── CASE WHEN decimals.a:15 IS NULL THEN a:10 ELSE decimals.a:15 END [as=upsert_a:22] │ ├── CASE WHEN decimals.a:15 IS NULL THEN c_default:12 ELSE c:17 END [as=upsert_c:23] @@ -1794,13 +1794,13 @@ upsert decimals │ │ │ │ │ │ │ │ │ └── (1.1,) │ │ │ │ │ │ │ │ └── projections │ │ │ │ │ │ │ │ ├── NULL::DECIMAL(5,1)[] [as=b_default:8] - │ │ │ │ │ │ │ │ └── 1.23 [as=c_default:9] + │ │ │ │ │ │ │ │ └── 1.23::DECIMAL(10,1) [as=c_default:9] │ │ │ │ │ │ │ └── projections │ │ │ │ │ │ │ ├── crdb_internal.round_decimal_values(column1:7, 0) [as=a:10] │ │ │ │ │ │ │ ├── crdb_internal.round_decimal_values(b_default:8, 1) [as=b_default:11] │ │ │ │ │ │ │ └── crdb_internal.round_decimal_values(c_default:9, 1) [as=c_default:12] │ │ │ │ │ │ └── projections - │ │ │ │ │ │ └── a:10 + c_default:12 [as=d_comp:13] + │ │ │ │ │ │ └── (a:10 + c_default:12::DECIMAL)::DECIMAL(10,1) [as=d_comp:13] │ │ │ │ │ └── projections │ │ │ │ │ └── crdb_internal.round_decimal_values(d_comp:13, 1) [as=d_comp:14] │ │ │ │ └── aggregations @@ -1814,11 +1814,11 @@ upsert decimals │ │ │ │ ├── columns: decimals.a:15!null b:16 c:17 d:18 crdb_internal_mvcc_timestamp:19 tableoid:20 │ │ │ │ └── computed column expressions │ │ │ │ └── d:18 - │ │ │ │ └── decimals.a:15::DECIMAL + c:17::DECIMAL + │ │ │ │ └── (decimals.a:15::DECIMAL + c:17::DECIMAL)::DECIMAL(10,1) │ │ │ └── filters │ │ │ └── a:10 = decimals.a:15 │ │ └── projections - │ │ └── decimals.a:15::DECIMAL + c:17::DECIMAL [as=d_comp:21] + │ │ └── (decimals.a:15::DECIMAL + c:17::DECIMAL)::DECIMAL(10,1) [as=d_comp:21] │ └── projections │ ├── CASE WHEN decimals.a:15 IS NULL THEN a:10 ELSE decimals.a:15 END [as=upsert_a:22] │ ├── CASE WHEN decimals.a:15 IS NULL THEN b_default:11 ELSE b:16 END [as=upsert_b:23] @@ 
-1874,13 +1874,13 @@ upsert decimals │ │ │ │ │ │ │ │ │ │ │ ├── columns: column1:7!null column2:8 │ │ │ │ │ │ │ │ │ │ │ └── (1.1, ARRAY[0.95]) │ │ │ │ │ │ │ │ │ │ └── projections - │ │ │ │ │ │ │ │ │ │ └── 1.23 [as=c_default:9] + │ │ │ │ │ │ │ │ │ │ └── 1.23::DECIMAL(10,1) [as=c_default:9] │ │ │ │ │ │ │ │ │ └── projections │ │ │ │ │ │ │ │ │ ├── crdb_internal.round_decimal_values(column1:7, 0) [as=a:10] │ │ │ │ │ │ │ │ │ ├── crdb_internal.round_decimal_values(column2:8, 1) [as=b:11] │ │ │ │ │ │ │ │ │ └── crdb_internal.round_decimal_values(c_default:9, 1) [as=c_default:12] │ │ │ │ │ │ │ │ └── projections - │ │ │ │ │ │ │ │ └── a:10 + c_default:12 [as=d_comp:13] + │ │ │ │ │ │ │ │ └── (a:10 + c_default:12::DECIMAL)::DECIMAL(10,1) [as=d_comp:13] │ │ │ │ │ │ │ └── projections │ │ │ │ │ │ │ └── crdb_internal.round_decimal_values(d_comp:13, 1) [as=d_comp:14] │ │ │ │ │ │ └── aggregations @@ -1894,7 +1894,7 @@ upsert decimals │ │ │ │ │ │ ├── columns: decimals.a:15!null decimals.b:16 c:17 d:18 crdb_internal_mvcc_timestamp:19 tableoid:20 │ │ │ │ │ │ └── computed column expressions │ │ │ │ │ │ └── d:18 - │ │ │ │ │ │ └── decimals.a:15::DECIMAL + c:17::DECIMAL + │ │ │ │ │ │ └── (decimals.a:15::DECIMAL + c:17::DECIMAL)::DECIMAL(10,1) │ │ │ │ │ └── filters │ │ │ │ │ └── a:10 = decimals.a:15 │ │ │ │ └── projections @@ -1902,7 +1902,7 @@ upsert decimals │ │ │ └── projections │ │ │ └── crdb_internal.round_decimal_values(b_new:21, 1) [as=b_new:22] │ │ └── projections - │ │ └── decimals.a:15::DECIMAL + c:17::DECIMAL [as=d_comp:23] + │ │ └── (decimals.a:15::DECIMAL + c:17::DECIMAL)::DECIMAL(10,1) [as=d_comp:23] │ └── projections │ ├── CASE WHEN decimals.a:15 IS NULL THEN a:10 ELSE decimals.a:15 END [as=upsert_a:24] │ ├── CASE WHEN decimals.a:15 IS NULL THEN b:11 ELSE b_new:22 END [as=upsert_b:25] diff --git a/pkg/sql/opt/xform/testdata/external/trading-mutation b/pkg/sql/opt/xform/testdata/external/trading-mutation index 95702925936d..264a3cd8fcd2 100644 --- 
a/pkg/sql/opt/xform/testdata/external/trading-mutation +++ b/pkg/sql/opt/xform/testdata/external/trading-mutation @@ -1602,7 +1602,7 @@ update cardsinfo [as=ci] │ └── const-agg [as=q:37, outer=(37)] │ └── q:37 └── projections - ├── crdb_internal.round_decimal_values(buyprice:23::DECIMAL - discount:25::DECIMAL, 4) [as=discountbuyprice_comp:53, outer=(23,25), immutable] + ├── crdb_internal.round_decimal_values((buyprice:23::DECIMAL - discount:25::DECIMAL)::DECIMAL(10,4), 4) [as=discountbuyprice_comp:53, outer=(23,25), immutable] ├── CAST(NULL AS STRING) [as=notes_default:50] ├── 0 [as=oldinventory_default:51] └── COALESCE(sum_int:47, 0) [as=actualinventory_new:49, outer=(47)] diff --git a/pkg/storage/engine.go b/pkg/storage/engine.go index e112e7fc749f..4ebc94c90103 100644 --- a/pkg/storage/engine.go +++ b/pkg/storage/engine.go @@ -363,9 +363,10 @@ const ( // different iterators created by NewMVCCIterator, NewEngineIterator: // - pebbleSnapshot, because it uses an engine snapshot. // - pebbleReadOnly, pebbleBatch: when the IterOptions do not specify a -// timestamp hint. Note that currently the engine state visible here is -// not as of the time of the Reader creation. It is the time when the -// first iterator is created. +// timestamp hint (see IterOptions). Note that currently the engine state +// visible here is not as of the time of the Reader creation. It is the time +// when the first iterator is created, or earlier if +// PinEngineStateForIterators is called. // The ConsistentIterators method returns true when this consistency is // guaranteed by the Reader. // TODO(sumeer): this partial consistency can be a source of bugs if future @@ -443,9 +444,25 @@ type Reader interface { // after this function returns. NewEngineIterator(opts IterOptions) EngineIterator // ConsistentIterators returns true if the Reader implementation guarantees - // that the different iterators constructed by this Reader will see the - // same underlying Engine state. 
+ // that the different iterators constructed by this Reader will see the same + // underlying Engine state. NB: this only applies to iterators without + // timestamp hints (see IterOptions), i.e., even if this returns true, those + // iterators can be "inconsistent" in terms of seeing a different engine + // state. The only exception to this is a Reader created using NewSnapshot. ConsistentIterators() bool + + // PinEngineStateForIterators ensures that the state seen by iterators + // without timestamp hints (see IterOptions) is pinned and will not see + // future mutations. It can be called multiple times on a Reader in which + // case the state seen will be either: + // - As of the first call. + // - For a Reader returned by Engine.NewSnapshot, the pinned state is as of + // the time the snapshot was taken. + // So the semantics that are true for all Readers is that the pinned state + // is somewhere in the time interval between the creation of the Reader and + // the first call to PinEngineStateForIterators. + // REQUIRES: ConsistentIterators returns true. + PinEngineStateForIterators() error } // PrecedingIntentState is information needed when writing or clearing an diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 60658870ca0a..c0e024b0aadf 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -779,6 +779,12 @@ func (p *Pebble) ConsistentIterators() bool { return false } +// PinEngineStateForIterators implements the Engine interface. +func (p *Pebble) PinEngineStateForIterators() error { + return errors.AssertionFailedf( + "PinEngineStateForIterators must not be called when ConsistentIterators returns false") +} + // ApplyBatchRepr implements the Engine interface. func (p *Pebble) ApplyBatchRepr(repr []byte, sync bool) error { // batch.SetRepr takes ownership of the underlying slice, so make a copy. 
@@ -1237,22 +1243,13 @@ type pebbleReadOnly struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. - // When the first iterator is initialized, the underlying *pebble.Iterator - // is stashed in iter, so that subsequent iterator initialization can use - // Iterator.Clone to use the same underlying engine state. This relies on - // the fact that all pebbleIterators created here are marked as reusable, - // which causes pebbleIterator.Close to not close iter. iter will be closed - // when pebbleReadOnly.Close is called. - // - // TODO(sumeer): The lazy iterator creation is insufficient to address - // issues like https://github.com/cockroachdb/cockroach/issues/55461. - // We could create the pebble.Iterator eagerly, since a caller using - // pebbleReadOnly is eventually going to create one anyway. But we - // already have different behaviors in different Reader implementations - // (see Reader.ConsistentIterators) that callers don't pay attention - // to, and adding another such difference could be a source of bugs. - // See https://github.com/cockroachdb/cockroach/pull/58515#pullrequestreview-563993424 - // for more discussion. + // When the first iterator is initialized, or when + // PinEngineStateForIterators is called (whichever happens first), the + // underlying *pebble.Iterator is stashed in iter, so that subsequent + // iterator initialization can use Iterator.Clone to use the same underlying + // engine state. This relies on the fact that all pebbleIterators created + // here are marked as reusable, which causes pebbleIterator.Close to not + // close iter. iter will be closed when pebbleReadOnly.Close is called. 
prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator @@ -1474,6 +1471,14 @@ func (p *pebbleReadOnly) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Reader interface. +func (p *pebbleReadOnly) PinEngineStateForIterators() error { + if p.iter == nil { + p.iter = p.parent.db.NewIter(nil) + } + return nil +} + // Writer methods are not implemented for pebbleReadOnly. Ideally, the code // could be refactored so that a Reader could be supplied to evaluateBatch @@ -1673,6 +1678,12 @@ func (p pebbleSnapshot) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Reader interface. +func (p *pebbleSnapshot) PinEngineStateForIterators() error { + // Snapshot already pins state, so nothing to do. + return nil +} + // pebbleGetProto uses Reader.MVCCGet, so it not as efficient as a function // that can unmarshal without copying bytes. But we don't care about // efficiency, since this is used to implement Reader.MVCCGetProto, which is diff --git a/pkg/storage/pebble_batch.go b/pkg/storage/pebble_batch.go index 19f8715ba5cc..f8ebaff9a4ab 100644 --- a/pkg/storage/pebble_batch.go +++ b/pkg/storage/pebble_batch.go @@ -37,12 +37,13 @@ type pebbleBatch struct { // need separate iterators for EngineKey and MVCCKey iteration since // iterators that make separated locks/intents look as interleaved need to // use both simultaneously. - // When the first iterator is initialized, the underlying *pebble.Iterator - // is stashed in iter, so that subsequent iterator initialization can use - // Iterator.Clone to use the same underlying engine state. This relies on - // the fact that all pebbleIterators created here are marked as reusable, - // which causes pebbleIterator.Close to not close iter. iter will be closed - // when pebbleBatch.Close is called.
+ // When the first iterator is initialized, or when + // PinEngineStateForIterators is called (whichever happens first), the + // underlying *pebble.Iterator is stashed in iter, so that subsequent + // iterator initialization can use Iterator.Clone to use the same underlying + // engine state. This relies on the fact that all pebbleIterators created + // here are marked as reusable, which causes pebbleIterator.Close to not + // close iter. iter will be closed when pebbleBatch.Close is called. prefixIter pebbleIterator normalIter pebbleIterator prefixEngineIter pebbleIterator @@ -299,6 +300,18 @@ func (p *pebbleBatch) ConsistentIterators() bool { return true } +// PinEngineStateForIterators implements the Batch interface. +func (p *pebbleBatch) PinEngineStateForIterators() error { + if p.iter == nil { + if p.batch.Indexed() { + p.iter = p.batch.NewIter(nil) + } else { + p.iter = p.db.NewIter(nil) + } + } + return nil +} + // NewMVCCIterator implements the Batch interface. func (p *pebbleBatch) ApplyBatchRepr(repr []byte, sync bool) error { var batch pebble.Batch diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 55020acba970..cbeadb049250 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -457,10 +457,14 @@ func TestPebbleIterConsistency(t *testing.T) { roEngine := eng.NewReadOnly() batch := eng.NewBatch() + roEngine2 := eng.NewReadOnly() + batch2 := eng.NewBatch() require.False(t, eng.ConsistentIterators()) require.True(t, roEngine.ConsistentIterators()) require.True(t, batch.ConsistentIterators()) + require.True(t, roEngine2.ConsistentIterators()) + require.True(t, batch2.ConsistentIterators()) // Since an iterator is created on pebbleReadOnly, pebbleBatch before // writing a newer version of "a", the newer version will not be visible to @@ -468,6 +472,9 @@ func TestPebbleIterConsistency(t *testing.T) { roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() 
batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("a")}).Close() + // Pin the state for iterators. + require.Nil(t, roEngine2.PinEngineStateForIterators()) + require.Nil(t, batch2.PinEngineStateForIterators()) // Write a newer version of "a" require.NoError(t, eng.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) @@ -506,26 +513,41 @@ func TestPebbleIterConsistency(t *testing.T) { checkMVCCIter(roEngine.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) checkMVCCIter(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(roEngine2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) + checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkMVCCIter(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{Prefix: true})) checkEngineIter(roEngine.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) checkEngineIter(roEngine.NewEngineIterator(IterOptions{Prefix: true})) checkEngineIter(batch.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) checkEngineIter(batch.NewEngineIterator(IterOptions{Prefix: true})) + checkEngineIter(roEngine2.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(roEngine2.NewEngineIterator(IterOptions{Prefix: true})) + checkEngineIter(batch2.NewEngineIterator(IterOptions{UpperBound: []byte("b")})) + checkEngineIter(batch2.NewEngineIterator(IterOptions{Prefix: true})) - // The eng iterator will see both values. 
- iter := eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")}) - defer iter.Close() - iter.SeekGE(MVCCKey{Key: []byte("a")}) - count := 0 - for ; ; iter.Next() { - valid, err := iter.Valid() - require.NoError(t, err) - if !valid { - break + checkIterSeesBothValues := func(iter MVCCIterator) { + iter.SeekGE(MVCCKey{Key: []byte("a")}) + count := 0 + for ; ; iter.Next() { + valid, err := iter.Valid() + require.NoError(t, err) + if !valid { + break + } + count++ } - count++ + require.Equal(t, 2, count) + iter.Close() } - require.Equal(t, 2, count) + // The eng iterator will see both values. + checkIterSeesBothValues(eng.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + // The indexed batches will see 2 values since the second one is written to the batch. + require.NoError(t, batch.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) + require.NoError(t, batch2.PutMVCC(MVCCKey{[]byte("a"), ts2}, []byte("a2"))) + checkIterSeesBothValues(batch.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) + checkIterSeesBothValues(batch2.NewMVCCIterator(MVCCKeyIterKind, IterOptions{UpperBound: []byte("b")})) } func BenchmarkMVCCKeyCompare(b *testing.B) {