-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathcontext.go
770 lines (661 loc) · 27.7 KB
/
context.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package eval
import (
"context"
"math"
"time"
apd "github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/streamingccl/streampb"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgwirecancel"
"github.com/cockroachdb/cockroach/pkg/sql/sem/catid"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb"
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/timeofday"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil/pgdate"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)
// Context defines the context in which to evaluate an expression, allowing
// the retrieval of state such as the node ID or statement start time.
//
// ATTENTION: Some fields from this struct (particularly, but not exclusively,
// from SessionData) are also represented in execinfrapb.EvalContext. Whenever
// something that affects DistSQL execution is added, it needs to be marshaled
// through that proto too.
// TODO(andrei): remove or limit the duplication.
//
// NOTE(andrei): Context is dusty; it started as a collection of fields
// needed by expression evaluation, but it has grown quite large; some of the
// things in it don't seem to belong in this low-level package (e.g. Planner).
// In the sql package it is embedded by extendedEvalContext, which adds some
// more fields from the sql package. Through that extendedEvalContext, this
// struct now generally used by planNodes.
type Context struct {
	// SessionDataStack stores the session variables accessible by the correct
	// context. Each element on the stack represents the beginning of a new
	// transaction or nested transaction (savepoints).
	SessionDataStack *sessiondata.Stack
	// TxnState is a string representation of the current transactional state.
	TxnState string
	// TxnReadOnly specifies if the current transaction is read-only.
	TxnReadOnly bool
	// TxnImplicit specifies if the current transaction is implicit.
	TxnImplicit bool
	// TxnIsSingleStmt specifies the current implicit transaction consists of only
	// a single statement.
	TxnIsSingleStmt bool
	// Settings gives access to the cluster settings.
	Settings *cluster.Settings
	// ClusterID is the logical cluster ID for this tenant.
	ClusterID uuid.UUID
	// ClusterName is the security string used to secure the RPC layer.
	ClusterName string
	// NodeID is either the SQL instance ID or KV Node ID, depending on
	// circumstances.
	// TODO(knz,radu): Split this into separate fields.
	NodeID *base.SQLIDContainer
	// Codec is the SQL codec used to encode/decode keys for this tenant.
	Codec keys.SQLCodec
	// Locality contains the location of the current node as a set of user-defined
	// key/value pairs, ordered from most inclusive to least inclusive. If there
	// are no tiers, then the node's location is not known. Example:
	//
	// [region=us,dc=east]
	//
	Locality roachpb.Locality
	// Tracer is used to create tracing spans.
	Tracer *tracing.Tracer
	// The statement timestamp. May be different for every statement.
	// Used for statement_timestamp().
	StmtTimestamp time.Time
	// The transaction timestamp. Needs to stay stable for the lifetime
	// of a transaction. Used for now(), current_timestamp(),
	// transaction_timestamp() and the like.
	TxnTimestamp time.Time
	// AsOfSystemTime denotes the explicit AS OF SYSTEM TIME timestamp for the
	// query, if any. If the query is not an AS OF SYSTEM TIME query,
	// AsOfSystemTime is nil.
	// TODO(knz): we may want to support table readers at arbitrary
	// timestamps, so that each FROM clause can have its own
	// timestamp. In that case, the timestamp would not be set
	// globally for the entire txn and this field would not be needed.
	AsOfSystemTime *AsOfSystemTime
	// Placeholders relates placeholder names to their type and, later, value.
	// This pointer should always be set to the location of the PlaceholderInfo
	// in the corresponding SemaContext during normal execution. Placeholders are
	// available during Eval to permit lookup of a particular placeholder's
	// underlying datum, if available.
	Placeholders *tree.PlaceholderInfo
	// Annotations augments the AST with extra information. This pointer should
	// always be set to the location of the Annotations in the corresponding
	// SemaContext.
	Annotations *tree.Annotations
	// IVarContainer is used to evaluate IndexedVars. Note that the underlying
	// implementation must support the eval.IndexedVarContainer interface.
	IVarContainer tree.IndexedVarContainer
	// iVarContainerStack is used when we swap out IVarContainers in order to
	// evaluate an intermediate expression. This keeps track of those which we
	// need to restore once we finish evaluating it.
	iVarContainerStack []tree.IndexedVarContainer
	// deprecatedContext holds the context in which the expression is evaluated.
	//
	// Deprecated: this field should not be used because an effort to remove it
	// from Context is under way.
	deprecatedContext context.Context
	// Planner gives access to planner-level facilities (not available in
	// DistSQL; see CatalogBuiltins for a DistSQL-safe alternative).
	Planner Planner
	// StreamManagerFactory returns the streaming replication managers.
	StreamManagerFactory StreamManagerFactory
	// Not using sql.JobExecContext type to avoid cycle dependency with sql package
	JobExecContext interface{}
	// PrivilegedAccessor provides access to privileged catalog lookups.
	PrivilegedAccessor PrivilegedAccessor
	// SessionAccessor provides access to session variables and privileges.
	SessionAccessor SessionAccessor
	// ClientNoticeSender is used to send notices to the client.
	ClientNoticeSender ClientNoticeSender
	// Sequence provides sequence-related operations.
	Sequence SequenceOperators
	// Tenant provides tenant-related operations.
	Tenant TenantOperator
	// Regions stores information about regions.
	Regions RegionOperator
	// JoinTokenCreator is used to create join tokens.
	JoinTokenCreator JoinTokenCreator
	// Gossip provides access to gossip-related operations.
	Gossip GossipOperator
	// PreparedStatementState exposes the session's prepared statement state.
	PreparedStatementState PreparedStatementState
	// The transaction in which the statement is executing.
	Txn *kv.Txn
	// ReCache caches compiled regular expressions.
	ReCache *tree.RegexpCache
	// TODO(mjibson): remove prepareOnly in favor of a 2-step prepare-exec solution
	// that is also able to save the plan to skip work during the exec step.
	PrepareOnly bool
	// SkipNormalize indicates whether expressions should be normalized
	// (false) or not (true). It is set to true conditionally by
	// EXPLAIN(TYPES[, NORMALIZE]).
	SkipNormalize bool
	// CollationEnv caches collation-related state.
	CollationEnv tree.CollationEnvironment
	// TestingKnobs holds testing-only toggles.
	TestingKnobs TestingKnobs
	// TestingMon is a memory monitor that should be only used in tests. In
	// production code consider using either the monitor of the planner or of
	// the flow.
	// TODO(yuzefovich): remove this.
	TestingMon *mon.BytesMonitor
	// SingleDatumAggMemAccount is a memory account that all aggregate builtins
	// that store a single datum will share to account for the memory needed to
	// perform the aggregation (i.e. memory not reported by AggregateFunc.Size
	// method). This memory account exists so that such aggregate functions
	// could "batch" their reservations - otherwise, we end up a situation
	// where each aggregate function struct grows its own memory account by
	// tiny amount, yet the account reserves a lot more resulting in
	// significantly overestimating the memory usage.
	SingleDatumAggMemAccount *mon.BoundAccount
	// SQLLivenessReader reads SQL liveness session state.
	SQLLivenessReader sqlliveness.Reader
	// SQLStatsController controls resetting of SQL statistics.
	SQLStatsController SQLStatsController
	// SchemaTelemetryController controls schema-telemetry jobs.
	SchemaTelemetryController SchemaTelemetryController
	// IndexUsageStatsController controls index usage statistics.
	IndexUsageStatsController IndexUsageStatsController
	// CompactEngineSpan is used to force compaction of a span in a store.
	CompactEngineSpan CompactEngineSpanFunc
	// SetCompactionConcurrency is used to change the compaction concurrency of
	// a store.
	SetCompactionConcurrency SetCompactionConcurrencyFunc
	// KVStoresIterator is used by various crdb_internal builtins to directly
	// access stores on this node.
	KVStoresIterator kvserverbase.StoresIterator
	// ConsistencyChecker is to generate the results in calls to
	// crdb_internal.check_consistency.
	ConsistencyChecker ConsistencyCheckRunner
	// RangeProber is used in calls to crdb_internal.probe_ranges.
	RangeProber RangeProber
	// StmtDiagnosticsRequestInserter is used by the
	// crdb_internal.request_statement_bundle builtin to insert a statement
	// bundle request.
	StmtDiagnosticsRequestInserter StmtDiagnosticsRequestInsertFunc
	// CatalogBuiltins is used by various builtins which depend on looking up
	// catalog information. Unlike the Planner, it is available in DistSQL.
	CatalogBuiltins CatalogBuiltins
	// QueryCancelKey is the key used by the pgwire protocol to cancel the
	// query currently running in this session.
	QueryCancelKey pgwirecancel.BackendKeyData
	// DescIDGenerator generates unique descriptor IDs.
	DescIDGenerator DescIDGenerator
	// RangeStatsFetcher is used to fetch RangeStats.
	RangeStatsFetcher RangeStatsFetcher
	// ChangefeedState stores the state (progress) of core changefeeds.
	ChangefeedState ChangefeedState
}
// DescIDGenerator generates unique descriptor IDs.
type DescIDGenerator interface {
	// PeekNextUniqueDescID returns a lower bound to the next as-of-yet unassigned
	// unique descriptor ID in the sequence.
	PeekNextUniqueDescID(ctx context.Context) (catid.DescID, error)
	// GenerateUniqueDescID returns the next available Descriptor ID and
	// increments the counter.
	GenerateUniqueDescID(ctx context.Context) (catid.DescID, error)
	// IncrementDescID increments the descriptor ID counter by at least inc.
	IncrementDescID(ctx context.Context, inc int64) error
}
// RangeStatsFetcher is used to fetch RangeStats.
type RangeStatsFetcher interface {
	// RangeStats fetches the stats for the ranges which contain the passed keys.
	// The responses are returned in the same order as the input keys.
	// NOTE(review): ordering is presumed from the slice-in/slice-out shape —
	// confirm against the implementation.
	RangeStats(ctx context.Context, keys ...roachpb.Key) ([]*roachpb.RangeStatsResponse, error)
}
// Compile-time check that Context implements tree.ParseTimeContext.
var _ tree.ParseTimeContext = &Context{}
// ConsistencyCheckRunner is an interface embedded in eval.Context used by
// crdb_internal.check_consistency.
type ConsistencyCheckRunner interface {
	// CheckConsistency runs a consistency check over the key span [from, to)
	// using the given checksum mode and returns the response.
	CheckConsistency(
		ctx context.Context, from, to roachpb.Key, mode roachpb.ChecksumMode,
	) (*roachpb.CheckConsistencyResponse, error)
}
// RangeProber is an interface embedded in eval.Context used by
// crdb_internal.probe_ranges.
type RangeProber interface {
	// RunProbe probes the range containing key; isWrite selects a write probe
	// instead of a read probe.
	RunProbe(
		ctx context.Context, key roachpb.Key, isWrite bool,
	) error
}
// SetDeprecatedContext updates the context.Context of this Context. Previously
// stored context is returned.
//
// Deprecated: this method should not be used because an effort to remove the
// context.Context from Context is under way.
func (ec *Context) SetDeprecatedContext(ctx context.Context) context.Context {
	prev := ec.deprecatedContext
	ec.deprecatedContext = ctx
	return prev
}
// UnwrapDatum encapsulates UnwrapDatum for use in the tree.CompareContext.
func (ec *Context) UnwrapDatum(d tree.Datum) tree.Datum {
	if ec != nil {
		return UnwrapDatum(ec.deprecatedContext, ec, d)
	}
	// With a nil receiver, eval.UnwrapDatum is equivalent to
	// tree.UnwrapDOidWrapper; handle it here so we never touch the
	// deprecatedContext field through a nil pointer.
	return tree.UnwrapDOidWrapper(d)
}
// MustGetPlaceholderValue is part of the tree.CompareContext interface.
// It panics (with an assertion failure) if the placeholder has no value
// assigned or if evaluating the assigned expression fails.
func (ec *Context) MustGetPlaceholderValue(p *tree.Placeholder) tree.Datum {
	e, ok := ec.Placeholders.Value(p.Idx)
	if !ok {
		// Previously the message was just "fail"; include the placeholder
		// index so the assertion is actionable.
		panic(errors.AssertionFailedf("missing value for placeholder %d", p.Idx))
	}
	out, err := Expr(ec.deprecatedContext, ec, e)
	if err != nil {
		panic(errors.NewAssertionErrorWithWrappedErrf(err, "evaluating placeholder %d", p.Idx))
	}
	return out
}
// MakeTestingEvalContext returns an EvalContext that includes a MemoryMonitor.
func MakeTestingEvalContext(st *cluster.Settings) Context {
	// Create an unlimited monitor for tests; ownership is handed to the
	// returned Context via MakeTestingEvalContextWithMon.
	m := mon.NewMonitor(
		"test-monitor",
		mon.MemoryResource,
		nil,           /* curCount */
		nil,           /* maxHist */
		-1,            /* increment */
		math.MaxInt64, /* noteworthy */
		st,
	)
	return MakeTestingEvalContextWithMon(st, m)
}
// MakeTestingEvalContextWithMon returns an EvalContext with the given
// MemoryMonitor. Ownership of the memory monitor is transferred to the
// EvalContext so do not start or close the memory monitor.
func MakeTestingEvalContextWithMon(st *cluster.Settings, monitor *mon.BytesMonitor) Context {
	evalCtx := Context{
		Codec:            keys.SystemSQLCodec,
		Txn:              &kv.Txn{},
		SessionDataStack: sessiondata.NewStack(&sessiondata.SessionData{}),
		Settings:         st,
		NodeID:           base.TestingIDContainer,
	}
	// The monitor is started here with an unlimited standalone budget,
	// fulfilling the ownership transfer promised in the doc comment.
	monitor.Start(context.Background(), nil /* pool */, mon.NewStandaloneBudget(math.MaxInt64))
	evalCtx.TestingMon = monitor
	evalCtx.Planner = &fakePlannerWithMonitor{monitor: monitor}
	evalCtx.StreamManagerFactory = &fakeStreamManagerFactory{}
	evalCtx.deprecatedContext = context.TODO()
	// Both timestamps start out at the same instant.
	now := timeutil.Now()
	evalCtx.SetTxnTimestamp(now)
	evalCtx.SetStmtTimestamp(now)
	return evalCtx
}
// fakePlannerWithMonitor is a test-only Planner stub that embeds the Planner
// interface (all unimplemented methods will panic if called) and supplies a
// real memory monitor via Mon.
type fakePlannerWithMonitor struct {
	Planner
	monitor *mon.BytesMonitor
}

// Mon is part of the Planner interface.
func (p *fakePlannerWithMonitor) Mon() *mon.BytesMonitor {
	return p.monitor
}
// fakeStreamManagerFactory is a test-only StreamManagerFactory stub; it embeds
// the interface, so any method call on it will panic.
type fakeStreamManagerFactory struct {
	StreamManagerFactory
}
// SessionData returns the SessionData the current EvalCtx should use to eval.
// It returns nil when no session data stack has been set.
func (ec *Context) SessionData() *sessiondata.SessionData {
	if stack := ec.SessionDataStack; stack != nil {
		return stack.Top()
	}
	return nil
}
// Copy returns a deep copy of ctx.
//
// Note: only iVarContainerStack is deep-copied; every other field is copied
// shallowly, matching the original semantics.
func (ec *Context) Copy() *Context {
	cp := *ec
	stack := make([]tree.IndexedVarContainer, len(ec.iVarContainerStack), cap(ec.iVarContainerStack))
	copy(stack, ec.iVarContainerStack)
	cp.iVarContainerStack = stack
	return &cp
}
// PushIVarContainer replaces the current IVarContainer with a different one -
// pushing the current one onto a stack to be replaced later once
// PopIVarContainer is called.
func (ec *Context) PushIVarContainer(c tree.IndexedVarContainer) {
	prev := ec.IVarContainer
	ec.IVarContainer = c
	ec.iVarContainerStack = append(ec.iVarContainerStack, prev)
}
// PopIVarContainer discards the current IVarContainer on the EvalContext,
// replacing it with an older one. It panics if the stack is empty (an
// unmatched Pop is a programmer error).
func (ec *Context) PopIVarContainer() {
	last := len(ec.iVarContainerStack) - 1
	ec.IVarContainer = ec.iVarContainerStack[last]
	ec.iVarContainerStack = ec.iVarContainerStack[:last]
}
// QualityOfService returns the current value of session setting
// default_transaction_quality_of_service if session data is available,
// otherwise the default value (0).
func (ec *Context) QualityOfService() sessiondatapb.QoSLevel {
	if sd := ec.SessionData(); sd != nil {
		return sd.DefaultTxnQualityOfService
	}
	return sessiondatapb.Normal
}
// NewTestingEvalContext is a convenience version of MakeTestingEvalContext
// that returns a pointer.
func NewTestingEvalContext(st *cluster.Settings) *Context {
	evalCtx := MakeTestingEvalContext(st)
	return &evalCtx
}
// Stop closes out the EvalContext and must be called once it is no longer in use.
//
// Note: Stop is intended to gracefully handle panics. For this to work
// properly, either:
// - use `defer evalctx.Stop()`, i.e. have `Stop` be called by `defer` directly;
// - or, use `recover()` explicitly and call `EmergencyStop()` under the conditional.
func (ec *Context) Stop(c context.Context) {
	// recover() only observes a panic when Stop itself runs deferred,
	// which is why the doc comment insists on `defer evalctx.Stop()`.
	r := recover()
	if r == nil {
		ec.TestingMon.Stop(c)
		return
	}
	ec.EmergencyStop(c)
	panic(r)
}
// EmergencyStop should be called in the recover() branch of a context
// shutdown when `defer evalCtx.Stop()` is not possible/desirable.
// It delegates to the testing monitor's EmergencyStop.
func (ec *Context) EmergencyStop(c context.Context) {
	ec.TestingMon.EmergencyStop(c)
}
// FmtCtx creates a FmtCtx with the given options as well as the EvalContext's session data.
func (ec *Context) FmtCtx(f tree.FmtFlags, opts ...tree.FmtCtxOption) *tree.FmtCtx {
	if sd := ec.SessionData(); sd != nil {
		// The session's data-conversion option goes first so that any
		// caller-supplied option can still override it.
		withConfig := make([]tree.FmtCtxOption, 0, len(opts)+1)
		withConfig = append(withConfig, tree.FmtDataConversionConfig(sd.DataConversionConfig))
		opts = append(withConfig, opts...)
	}
	return tree.NewFmtCtx(f, opts...)
}
// GetStmtTimestamp retrieves the current statement timestamp as per
// the evaluation context. The timestamp is guaranteed to be nonzero.
func (ec *Context) GetStmtTimestamp() time.Time {
	// TODO(knz): a zero timestamp should never be read, even during
	// Prepare. This will need to be addressed.
	if ec.StmtTimestamp.IsZero() && !ec.PrepareOnly {
		panic(errors.AssertionFailedf("zero statement timestamp in EvalContext"))
	}
	return ec.StmtTimestamp
}
// GetClusterTimestamp retrieves the current cluster timestamp as per
// the evaluation context. The timestamp is guaranteed to be nonzero.
func (ec *Context) GetClusterTimestamp() *tree.DDecimal {
	commitTS := ec.Txn.CommitTimestamp()
	if commitTS.IsEmpty() {
		panic(errors.AssertionFailedf("zero cluster timestamp in txn"))
	}
	return TimestampToDecimalDatum(commitTS)
}
// HasPlaceholders returns true if this EvalContext's placeholders have been
// assigned. Will be false during Prepare.
func (ec *Context) HasPlaceholders() bool {
	return ec.Placeholders != nil
}
// regionKey is the locality tier key under which the region name is stored.
const regionKey = "region"

// GetLocalRegion returns the region name of the local processor
// on which we're executing. ok is false when the locality has no
// "region" tier.
func (ec *Context) GetLocalRegion() (regionName string, ok bool) {
	return ec.Locality.Find(regionKey)
}
// TimestampToDecimal converts the logical timestamp into a decimal
// value with the number of nanoseconds in the integer part and the
// logical counter in the decimal part.
func TimestampToDecimal(ts hlc.Timestamp) apd.Decimal {
	// Compute Walltime * 10^10 + Logical.
	// We need 10 decimals for the Logical field because its maximum
	// value is 4294967295 (2^32-1), a value with 10 decimal digits.
	var res apd.Decimal
	val := &res.Coeff
	val.SetInt64(ts.WallTime)
	val.Mul(val, big10E10)
	val.Add(val, apd.NewBigInt(int64(ts.Logical)))
	// val must be positive. If it was set to a negative value above,
	// transfer the sign to res.Negative.
	// (apd stores sign separately from the coefficient magnitude.)
	res.Negative = val.Sign() < 0
	val.Abs(val)
	// Shift 10 decimals to the right, so that the logical
	// field appears as fractional part.
	res.Exponent = -10
	return res
}
// DecimalToInexactDTimestampTZ is the inverse of TimestampToDecimal. It converts
// a decimal constructed from an hlc.Timestamp into an approximate DTimestampTZ
// containing the walltime of the hlc.Timestamp.
func DecimalToInexactDTimestampTZ(d *tree.DDecimal) (*tree.DTimestampTZ, error) {
	hlcTS, err := decimalToHLC(d)
	if err != nil {
		return nil, err
	}
	// The logical component is dropped; only the wall time survives.
	wall := timeutil.Unix(0, hlcTS.WallTime)
	return tree.MakeDTimestampTZ(wall, time.Microsecond)
}
// decimalToHLC converts a decimal produced by TimestampToDecimal back into an
// hlc.Timestamp, keeping only the wall-time component. It returns a
// DatetimeFieldOverflow error when the wall time does not fit in an int64.
func decimalToHLC(d *tree.DDecimal) (hlc.Timestamp, error) {
	var wall apd.BigInt
	wall.Set(&d.Decimal.Coeff)
	// The physical portion of the HLC is stored shifted up by 10^10, so shift
	// it down and clear out the logical component.
	wall.Div(&wall, big10E10)
	if !wall.IsInt64() {
		return hlc.Timestamp{}, pgerror.Newf(
			pgcode.DatetimeFieldOverflow,
			"timestamp value out of range: %s", d.String(),
		)
	}
	return hlc.Timestamp{WallTime: wall.Int64()}, nil
}
// DecimalToInexactDTimestamp is the inverse of TimestampToDecimal. It converts
// a decimal constructed from an hlc.Timestamp into an approximate DTimestamp
// containing the walltime of the hlc.Timestamp.
func DecimalToInexactDTimestamp(d *tree.DDecimal) (*tree.DTimestamp, error) {
	hlcTS, err := decimalToHLC(d)
	if err != nil {
		return nil, err
	}
	return TimestampToInexactDTimestamp(hlcTS), nil
}
// TimestampToDecimalDatum is the same as TimestampToDecimal, but
// returns a datum.
func TimestampToDecimalDatum(ts hlc.Timestamp) *tree.DDecimal {
	return &tree.DDecimal{Decimal: TimestampToDecimal(ts)}
}
// TimestampToInexactDTimestamp converts the logical timestamp into an
// inexact DTimestamp by dropping the logical counter and using the wall
// time at the microsecond precision.
func TimestampToInexactDTimestamp(ts hlc.Timestamp) *tree.DTimestamp {
	wall := timeutil.Unix(0, ts.WallTime)
	return tree.MustMakeDTimestamp(wall, time.Microsecond)
}
// GetRelativeParseTime implements ParseTimeContext. It returns the
// transaction timestamp (or the current time when it is unset), localized
// to the session's time zone.
func (ec *Context) GetRelativeParseTime() time.Time {
	t := ec.TxnTimestamp
	if t.IsZero() {
		t = timeutil.Now()
	}
	return t.In(ec.GetLocation())
}
// GetTxnTimestamp retrieves the current transaction timestamp as per
// the evaluation context. The timestamp is guaranteed to be nonzero.
func (ec *Context) GetTxnTimestamp(precision time.Duration) *tree.DTimestampTZ {
	// TODO(knz): a zero timestamp should never be read, even during
	// Prepare. This will need to be addressed.
	if ec.TxnTimestamp.IsZero() && !ec.PrepareOnly {
		panic(errors.AssertionFailedf("zero transaction timestamp in EvalContext"))
	}
	return tree.MustMakeDTimestampTZ(ec.GetRelativeParseTime(), precision)
}
// GetTxnTimestampNoZone retrieves the current transaction timestamp as per
// the evaluation context. The timestamp is guaranteed to be nonzero.
func (ec *Context) GetTxnTimestampNoZone(precision time.Duration) *tree.DTimestamp {
	// TODO(knz): a zero timestamp should never be read, even during
	// Prepare. This will need to be addressed.
	if !ec.PrepareOnly && ec.TxnTimestamp.IsZero() {
		panic(errors.AssertionFailedf("zero transaction timestamp in EvalContext"))
	}
	// Move the time to UTC, but keeping the location's time.
	// This is done by adding the zone's UTC offset to the instant and then
	// relabeling it as UTC, so the wall-clock fields are preserved.
	t := ec.GetRelativeParseTime()
	_, offsetSecs := t.Zone()
	return tree.MustMakeDTimestamp(t.Add(time.Second*time.Duration(offsetSecs)).In(time.UTC), precision)
}
// GetTxnTime retrieves the current transaction time as per
// the evaluation context.
func (ec *Context) GetTxnTime(precision time.Duration) *tree.DTimeTZ {
	// TODO(knz): a zero timestamp should never be read, even during
	// Prepare. This will need to be addressed.
	if ec.TxnTimestamp.IsZero() && !ec.PrepareOnly {
		panic(errors.AssertionFailedf("zero transaction timestamp in EvalContext"))
	}
	rounded := ec.GetRelativeParseTime().Round(precision)
	return tree.NewDTimeTZFromTime(rounded)
}
// GetTxnTimeNoZone retrieves the current transaction time as per
// the evaluation context.
func (ec *Context) GetTxnTimeNoZone(precision time.Duration) *tree.DTime {
	// TODO(knz): a zero timestamp should never be read, even during
	// Prepare. This will need to be addressed.
	if ec.TxnTimestamp.IsZero() && !ec.PrepareOnly {
		panic(errors.AssertionFailedf("zero transaction timestamp in EvalContext"))
	}
	rounded := ec.GetRelativeParseTime().Round(precision)
	return tree.MakeDTime(timeofday.FromTime(rounded))
}
// SetTxnTimestamp sets the corresponding timestamp in the EvalContext.
// The value feeds now(), transaction_timestamp() and relative time parsing.
func (ec *Context) SetTxnTimestamp(ts time.Time) {
	ec.TxnTimestamp = ts
}
// SetStmtTimestamp sets the corresponding timestamp in the EvalContext.
// The value feeds statement_timestamp().
func (ec *Context) SetStmtTimestamp(ts time.Time) {
	ec.StmtTimestamp = ts
}
// GetLocation returns the session timezone, defaulting to UTC when the
// receiver is nil or no session data is available.
func (ec *Context) GetLocation() *time.Location {
	if ec == nil {
		return time.UTC
	}
	// Guard against a nil SessionData for consistency with GetIntervalStyle
	// and GetDateStyle; previously a nil SessionData was passed straight to
	// GetLocation.
	sd := ec.SessionData()
	if sd == nil {
		return time.UTC
	}
	return sd.GetLocation()
}
// GetIntervalStyle returns the session interval style, defaulting to the
// Postgres style when no session data is available.
func (ec *Context) GetIntervalStyle() duration.IntervalStyle {
	if sd := ec.SessionData(); sd != nil {
		return sd.GetIntervalStyle()
	}
	return duration.IntervalStyle_POSTGRES
}
// GetDateStyle returns the session date style, defaulting to
// pgdate.DefaultDateStyle() when no session data is available.
func (ec *Context) GetDateStyle() pgdate.DateStyle {
	if sd := ec.SessionData(); sd != nil {
		return sd.GetDateStyle()
	}
	return pgdate.DefaultDateStyle()
}
// BoundedStaleness returns true if this query uses bounded staleness.
func (ec *Context) BoundedStaleness() bool {
	aost := ec.AsOfSystemTime
	return aost != nil && aost.BoundedStaleness
}
// ensureExpectedType will return an error if a datum does not match the
// provided type. If the expected type is Any or if the datum is a Null
// type, then no error will be returned.
func ensureExpectedType(exp *types.T, d tree.Datum) error {
	actual := d.ResolvedType()
	if exp.Family() == types.AnyFamily ||
		actual.Family() == types.UnknownFamily ||
		actual.Equivalent(exp) {
		return nil
	}
	return errors.AssertionFailedf(
		"expected return type %q, got: %q", errors.Safe(exp), errors.Safe(actual))
}
// arrayOfType returns a fresh DArray of the input type. It errors if typ is
// not an array type or if its element type is not a valid array element.
func arrayOfType(typ *types.T) (*tree.DArray, error) {
	if typ.Family() != types.ArrayFamily {
		return nil, errors.AssertionFailedf("array node type (%v) is not types.TArray", typ)
	}
	elemTyp := typ.ArrayContents()
	if err := types.CheckArrayElementType(elemTyp); err != nil {
		return nil, err
	}
	return tree.NewDArray(elemTyp), nil
}
// UnwrapDatum returns the base Datum type for a provided datum, stripping
// an *DOidWrapper if present. This is useful for cases like type switches,
// where type aliases should be ignored.
func UnwrapDatum(ctx context.Context, evalCtx *Context, d tree.Datum) tree.Datum {
	unwrapped := tree.UnwrapDOidWrapper(d)
	p, isPlaceholder := unwrapped.(*tree.Placeholder)
	if !isPlaceholder || evalCtx == nil || !evalCtx.HasPlaceholders() {
		return unwrapped
	}
	ret, err := Expr(ctx, evalCtx, p)
	if err != nil {
		// If we fail to evaluate the placeholder, it's because we don't have
		// a placeholder available. Just return the placeholder and someone else
		// will handle this problem.
		return unwrapped
	}
	return ret
}
// StreamManagerFactory stores methods that return the streaming managers.
type StreamManagerFactory interface {
	// GetReplicationStreamManager returns the manager for the producer side of
	// streaming replication.
	GetReplicationStreamManager(ctx context.Context) (ReplicationStreamManager, error)
	// GetStreamIngestManager returns the manager for the ingestion (consumer)
	// side of streaming replication.
	GetStreamIngestManager(ctx context.Context) (StreamIngestManager, error)
}
// ReplicationStreamManager represents a collection of APIs that streaming replication supports
// on the production side.
type ReplicationStreamManager interface {
	// StartReplicationStream starts a stream replication job for the specified tenant on the producer side.
	StartReplicationStream(
		ctx context.Context,
		tenantID uint64,
	) (streampb.StreamID, error)

	// HeartbeatReplicationStream sends a heartbeat to the replication stream producer, indicating
	// consumer has consumed until the given 'frontier' timestamp. This updates the producer job
	// progress and extends its life, and the new producer progress will be returned.
	// If 'frontier' is hlc.MaxTimestamp, returns the producer progress without updating it.
	HeartbeatReplicationStream(
		ctx context.Context,
		streamID streampb.StreamID,
		frontier hlc.Timestamp,
	) (streampb.StreamReplicationStatus, error)

	// StreamPartition starts streaming replication on the producer side for the partition specified
	// by opaqueSpec which contains serialized streampb.StreamPartitionSpec protocol message and
	// returns a value generator which yields events for the specified partition.
	// Note: unlike the other methods, this one takes no context argument.
	StreamPartition(
		streamID streampb.StreamID,
		opaqueSpec []byte,
	) (ValueGenerator, error)

	// GetReplicationStreamSpec gets a stream replication spec on the producer side.
	GetReplicationStreamSpec(
		ctx context.Context,
		streamID streampb.StreamID,
	) (*streampb.ReplicationStreamSpec, error)

	// CompleteReplicationStream completes a replication stream job on the producer side.
	// 'successfulIngestion' indicates whether the stream ingestion finished successfully and
	// determines the fate of the producer job, succeeded or canceled.
	CompleteReplicationStream(
		ctx context.Context,
		streamID streampb.StreamID,
		successfulIngestion bool,
	) error
}
// StreamIngestManager represents a collection of APIs that streaming replication supports
// on the ingestion side.
type StreamIngestManager interface {
	// CompleteStreamIngestion signals a running stream ingestion job to complete on the consumer side.
	CompleteStreamIngestion(
		ctx context.Context,
		ingestionJobID jobspb.JobID,
		cutoverTimestamp hlc.Timestamp,
	) error

	// GetStreamIngestionStats gets a statistics summary for a stream ingestion job.
	GetStreamIngestionStats(
		ctx context.Context,
		ingestionJobID jobspb.JobID,
	) (*streampb.StreamIngestionStats, error)
}