Skip to content

Commit 8f94b96

Browse files
committed
cleanup 3
1 parent b9cee28 commit 8f94b96

8 files changed

+183
-231
lines changed

pkg/sql/conn_executor.go

+104-129
Large diffs are not rendered by default.

pkg/sql/conn_io.go

+27-40
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,7 @@ const (
6161
// (the buffer is not involved in this).
6262
// The buffer internally maintains a cursor representing the reader's position.
6363
// The reader has to manually move the cursor using advanceOne(),
64-
// seekToNextBatch() and rewind(). To support querying the buffer to ask if the
65-
// consumer is idle (through the ConsumerIdle() method), the contract is that
66-
// the consumer only advances the cursor after it is done producing results for
67-
// the previous command.
64+
// seekToNextBatch() and rewind().
6865
// In practice, the writer is a module responsible for communicating with a SQL
6966
// client (i.e. pgwire.conn) and the reader is a connExecutor.
7067
//
@@ -331,7 +328,7 @@ func (buf *StmtBuf) Push(ctx context.Context, cmd Command) error {
331328
//
332329
// If the buffer has previously been Close()d, or is closed while this is
333330
// blocked, io.EOF is returned.
334-
func (buf *StmtBuf) curCmd(ctx context.Context) (Command, CmdPos, error) {
331+
func (buf *StmtBuf) curCmd() (Command, CmdPos, error) {
335332
buf.mu.Lock()
336333
defer buf.mu.Unlock()
337334
for {
@@ -357,14 +354,6 @@ func (buf *StmtBuf) curCmd(ctx context.Context) (Command, CmdPos, error) {
357354
}
358355
}
359356

360-
// ConsumerIdle returns true if the consumer is currently blocked waiting for a
361-
// command to be pushed into the buffer. See top comments about contract.
362-
func (buf *StmtBuf) ConsumerIdle() bool {
363-
buf.mu.Lock()
364-
defer buf.mu.Unlock()
365-
return buf.mu.readerBlocked
366-
}
367-
368357
// translatePosLocked translates an absolute position of a command (counting
369358
// from the connection start) to the index of the respective command in the
370359
// buffer (so, it returns an index relative to the start of the buffer).
@@ -400,14 +389,19 @@ func (buf *StmtBuf) ltrim(ctx context.Context, pos CmdPos) {
400389
if buf.mu.startPos == pos {
401390
break
402391
}
392+
buf.mu.data[0] = nil
403393
buf.mu.data = buf.mu.data[1:]
404394
buf.mu.startPos++
405395
}
396+
// nil out the whole buffer if
397+
if len(buf.mu.data) == 0 {
398+
buf.mu.data = nil
399+
}
406400
}
407401

408402
// advanceOne advances the cursor one Command over. The command over which the
409403
// cursor will be positioned when this returns may not be in the buffer yet.
410-
func (buf *StmtBuf) advanceOne(ctx context.Context) {
404+
func (buf *StmtBuf) advanceOne() {
411405
buf.mu.Lock()
412406
buf.mu.curPos++
413407
buf.mu.Unlock()
@@ -424,7 +418,7 @@ func (buf *StmtBuf) advanceOne(ctx context.Context) {
424418
//
425419
// It is an error to start seeking when the cursor is positioned on an empty
426420
// slot.
427-
func (buf *StmtBuf) seekToNextBatch(ctx context.Context) error {
421+
func (buf *StmtBuf) seekToNextBatch() error {
428422
buf.mu.Lock()
429423
curPos := buf.mu.curPos
430424
cmdIdx, err := buf.translatePosLocked(curPos)
@@ -440,8 +434,8 @@ func (buf *StmtBuf) seekToNextBatch(ctx context.Context) error {
440434

441435
var foundSync bool
442436
for !foundSync {
443-
buf.advanceOne(ctx)
444-
_, pos, err := buf.curCmd(ctx)
437+
buf.advanceOne()
438+
_, pos, err := buf.curCmd()
445439
if err != nil {
446440
return err
447441
}
@@ -537,29 +531,33 @@ type ClientComm interface {
537531
}
538532

539533
// CommandResult represents the result of a statement. It which needs to be
540-
// ultimately delivered to the client. The pgwire module implements this.
534+
// ultimately delivered to the client. pgwire.conn implements this.
541535
type CommandResult interface {
542536
RestrictedCommandResult
537+
CommandResultClose
543538

544539
// SetLimit is used when executing a portal to set a limit on the number of
545540
// rows to be returned. We don't currently properly support this feature of
546541
// the Postgres protocol; instead, we'll return an error if the number of rows
547542
// produced is larger than this limit.
548543
SetLimit(n int)
544+
}
549545

550-
// Close - see ResultBase.
551-
Close(TransactionStatusIndicator)
552-
553-
// CloseWithErr - see ResultBase.
554-
CloseWithErr(error)
546+
type CommandResultErrBase interface {
547+
// SetError accumulates an execution error that needs to be reported to the
548+
// client. No further calls other than Close() and Discard() are allowed. In
549+
// particular, CloseWithErr() is not allowed.
550+
SetError(error)
555551

556-
// Discard - see ResultBase.
557-
Discard()
552+
// Err returns the error previously set with SetError(), if any.
553+
Err() error
558554
}
559555

560556
// RestrictedCommandResult is a subset of CommandResult meant to make it clear
561557
// that its clients don't close the CommandResult.
562558
type RestrictedCommandResult interface {
559+
CommandResultErrBase
560+
563561
// SetColumns informs the client about the schema of the result. The columns
564562
// can be nil.
565563
//
@@ -585,14 +583,6 @@ type RestrictedCommandResult interface {
585583
// value. The callback can be nil, in which case nothing will be called.
586584
SetFinishedCallback(callback func())
587585

588-
// SetError accumulates an execution error that needs to be reported to the
589-
// client. No further calls other that Err() and Close()/Discard() are allowed
590-
// - in particular, CloseWithErr() is not allowed.
591-
SetError(error)
592-
593-
// Err returns the error previously set with SetError(), if any.
594-
Err() error
595-
596586
// IncrementRowsAffected increments a counter by n. This is used for all
597587
// result types other than tree.Rows.
598588
IncrementRowsAffected(n int)
@@ -662,14 +652,11 @@ type CopyInResult interface {
662652
// ResultBase is the common interface implemented by all the different command
663653
// results.
664654
type ResultBase interface {
665-
// SetError accumulates an execution error that needs to be reported to the
666-
// client. No further calls other than Close() and Discard() are allowed. In
667-
// particular, CloseWithErr() is not allowed.
668-
SetError(error)
669-
670-
// Err returns the error previously set with SetError(), if any.
671-
Err() error
655+
CommandResultErrBase
656+
CommandResultClose
657+
}
672658

659+
type CommandResultClose interface {
673660
// Close marks a result as complete. All results must be eventually closed
674661
// through Close()/CloseWithErr()/Discard. No further uses of the CommandResult are
675662
// allowed.

pkg/sql/conn_io_test.go

+25-26
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ func TestStmtBuf(t *testing.T) {
8181
// same statement.
8282
expPos := CmdPos(0)
8383
for i := 0; i < 2; i++ {
84-
cmd, pos, err := buf.curCmd(ctx)
84+
cmd, pos, err := buf.curCmd()
8585
if err != nil {
8686
t.Fatal(err)
8787
}
@@ -91,9 +91,9 @@ func TestStmtBuf(t *testing.T) {
9191
assertStmt(t, cmd, "SELECT 1")
9292
}
9393

94-
buf.advanceOne(ctx)
94+
buf.advanceOne()
9595
expPos++
96-
cmd, pos, err := buf.curCmd(ctx)
96+
cmd, pos, err := buf.curCmd()
9797
if err != nil {
9898
t.Fatal(err)
9999
}
@@ -102,9 +102,9 @@ func TestStmtBuf(t *testing.T) {
102102
}
103103
assertStmt(t, cmd, "SELECT 2")
104104

105-
buf.advanceOne(ctx)
105+
buf.advanceOne()
106106
expPos++
107-
cmd, pos, err = buf.curCmd(ctx)
107+
cmd, pos, err = buf.curCmd()
108108
if err != nil {
109109
t.Fatal(err)
110110
}
@@ -113,9 +113,9 @@ func TestStmtBuf(t *testing.T) {
113113
}
114114
assertStmt(t, cmd, "SELECT 3")
115115

116-
buf.advanceOne(ctx)
116+
buf.advanceOne()
117117
expPos++
118-
cmd, pos, err = buf.curCmd(ctx)
118+
cmd, pos, err = buf.curCmd()
119119
if err != nil {
120120
t.Fatal(err)
121121
}
@@ -127,7 +127,7 @@ func TestStmtBuf(t *testing.T) {
127127
// Now rewind.
128128
expPos = 1
129129
buf.rewind(ctx, expPos)
130-
cmd, pos, err = buf.curCmd(ctx)
130+
cmd, pos, err = buf.curCmd()
131131
if err != nil {
132132
t.Fatal(err)
133133
}
@@ -153,7 +153,7 @@ func TestStmtBufSignal(t *testing.T) {
153153
}()
154154

155155
expPos := CmdPos(0)
156-
cmd, pos, err := buf.curCmd(ctx)
156+
cmd, pos, err := buf.curCmd()
157157
if err != nil {
158158
t.Fatal(err)
159159
}
@@ -177,8 +177,8 @@ func TestStmtBufLtrim(t *testing.T) {
177177
mustPush(ctx, t, buf, ExecStmt{Stmt: stmt})
178178
}
179179
// Advance the cursor so that we can trim.
180-
buf.advanceOne(ctx)
181-
buf.advanceOne(ctx)
180+
buf.advanceOne()
181+
buf.advanceOne()
182182
trimPos := CmdPos(2)
183183
buf.ltrim(ctx, trimPos)
184184
if l := len(buf.mu.data); l != 3 {
@@ -203,7 +203,7 @@ func TestStmtBufClose(t *testing.T) {
203203
mustPush(ctx, t, buf, ExecStmt{Stmt: stmt})
204204
buf.Close()
205205

206-
_, _, err = buf.curCmd(ctx)
206+
_, _, err = buf.curCmd()
207207
if err != io.EOF {
208208
t.Fatalf("expected EOF, got: %v", err)
209209
}
@@ -213,14 +213,13 @@ func TestStmtBufClose(t *testing.T) {
213213
func TestStmtBufCloseUnblocksReader(t *testing.T) {
214214
defer leaktest.AfterTest(t)()
215215

216-
ctx := context.TODO()
217216
buf := NewStmtBuf()
218217

219218
go func() {
220219
buf.Close()
221220
}()
222221

223-
_, _, err := buf.curCmd(ctx)
222+
_, _, err := buf.curCmd()
224223
if err != io.EOF {
225224
t.Fatalf("expected EOF, got: %v", err)
226225
}
@@ -242,29 +241,29 @@ func TestStmtBufPreparedStmt(t *testing.T) {
242241
mustPush(ctx, t, buf, PrepareStmt{Name: "p1"})
243242
mustPush(ctx, t, buf, PrepareStmt{Name: "p2"})
244243

245-
cmd, _, err := buf.curCmd(ctx)
244+
cmd, _, err := buf.curCmd()
246245
if err != nil {
247246
t.Fatal(err)
248247
}
249248
assertStmt(t, cmd, "SELECT 1")
250249

251-
buf.advanceOne(ctx)
252-
cmd, _, err = buf.curCmd(ctx)
250+
buf.advanceOne()
251+
cmd, _, err = buf.curCmd()
253252
if err != nil {
254253
t.Fatal(err)
255254
}
256255
assertPrepareStmt(t, cmd, "p1")
257256

258-
buf.advanceOne(ctx)
259-
cmd, _, err = buf.curCmd(ctx)
257+
buf.advanceOne()
258+
cmd, _, err = buf.curCmd()
260259
if err != nil {
261260
t.Fatal(err)
262261
}
263262
assertPrepareStmt(t, cmd, "p2")
264263

265264
// Rewind to the first prepared stmt.
266265
buf.rewind(ctx, CmdPos(1))
267-
cmd, _, err = buf.curCmd(ctx)
266+
cmd, _, err = buf.curCmd()
268267
if err != nil {
269268
t.Fatal(err)
270269
}
@@ -301,10 +300,10 @@ func TestStmtBufBatching(t *testing.T) {
301300
mustPush(ctx, t, buf, ExecStmt{Stmt: s1})
302301

303302
// Go to 2nd batch.
304-
if err := buf.seekToNextBatch(ctx); err != nil {
303+
if err := buf.seekToNextBatch(); err != nil {
305304
t.Fatal(err)
306305
}
307-
_, pos, err := buf.curCmd(ctx)
306+
_, pos, err := buf.curCmd()
308307
if err != nil {
309308
t.Fatal(err)
310309
}
@@ -313,10 +312,10 @@ func TestStmtBufBatching(t *testing.T) {
313312
}
314313

315314
// Go to 3rd batch.
316-
if err := buf.seekToNextBatch(ctx); err != nil {
315+
if err := buf.seekToNextBatch(); err != nil {
317316
t.Fatal(err)
318317
}
319-
_, pos, err = buf.curCmd(ctx)
318+
_, pos, err = buf.curCmd()
320319
if err != nil {
321320
t.Fatal(err)
322321
}
@@ -331,10 +330,10 @@ func TestStmtBufBatching(t *testing.T) {
331330
}()
332331

333332
// Go to 4th batch.
334-
if err := buf.seekToNextBatch(ctx); err != nil {
333+
if err := buf.seekToNextBatch(); err != nil {
335334
t.Fatal(err)
336335
}
337-
_, pos, err = buf.curCmd(ctx)
336+
_, pos, err = buf.curCmd()
338337
if err != nil {
339338
t.Fatal(err)
340339
}

pkg/sql/executor.go

+1
Original file line numberDiff line numberDiff line change
@@ -2262,6 +2262,7 @@ func (e *Executor) execStmtInParallel(
22622262
return cols, nil
22632263
}
22642264

2265+
// Cfg returns a reference to the ExecutorConfig.
22652266
func (e *Executor) Cfg() *ExecutorConfig {
22662267
return &e.cfg
22672268
}

0 commit comments

Comments
 (0)