Skip to content

Commit

Permalink
sql: Support SELECT FOR UPDATE SQL syntax
Browse files Browse the repository at this point in the history
Update the parser to recognize the SELECT ... FOR UPDATE SQL
syntax.  Update the SQL layer so scan nodes in the query plan
are flagged as lockForUpdate if the SELECT ... FOR UPDATE syntax
is used.  If a scan node with lockForUpdate is encountered during
plan execution, return an error so the user knows that SELECT ...
FOR UPDATE is not yet fully supported.

This is the first of several commits/PRs needed to support
SELECT ... FOR UPDATE.  Future commits will implement the
changes required at the KV layer, connect the two layers with
changes to the KV api, and eliminate the temporary error message
introduced in this commit.

See PR cockroachdb#19577 for the RFC, and Issue cockroachdb#6583.
  • Loading branch information
rytaft committed Oct 30, 2017
1 parent 818c668 commit ccb6904
Show file tree
Hide file tree
Showing 26 changed files with 7,177 additions and 6,963 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ func (p *planner) validateCheckExpr(
// use the tableDesc we have, but this is a rare operation and be benefit
// would be marginal compared to the work of the actual query, so the added
// complexity seems unjustified.
rows, err := p.SelectClause(ctx, sel, nil, lim, nil, publicColumns)
rows, err := p.SelectClause(ctx, sel, nil /* orderBy */, lim, false, /* lockForUpdate */
nil /* desiredTypes */, publicColumns)
if err != nil {
return err
}
Expand Down
34 changes: 21 additions & 13 deletions pkg/sql/data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,10 @@ func newSourceInfoForSingleTable(

// getSources combines zero or more FROM sources into cross-joins.
func (p *planner) getSources(
ctx context.Context, sources []parser.TableExpr, scanVisibility scanVisibility,
ctx context.Context,
sources []parser.TableExpr,
scanVisibility scanVisibility,
lockForUpdate bool,
) (planDataSource, error) {
switch len(sources) {
case 0:
Expand All @@ -252,14 +255,14 @@ func (p *planner) getSources(
}, nil

case 1:
return p.getDataSource(ctx, sources[0], nil, scanVisibility)
return p.getDataSource(ctx, sources[0], nil /* hints */, scanVisibility, lockForUpdate)

default:
left, err := p.getDataSource(ctx, sources[0], nil, scanVisibility)
left, err := p.getDataSource(ctx, sources[0], nil /* hints */, scanVisibility, lockForUpdate)
if err != nil {
return planDataSource{}, err
}
right, err := p.getSources(ctx, sources[1:], scanVisibility)
right, err := p.getSources(ctx, sources[1:], scanVisibility, lockForUpdate)
if err != nil {
return planDataSource{}, err
}
Expand Down Expand Up @@ -334,7 +337,7 @@ func (p *planner) getVirtualDataSource(
func (p *planner) getDataSourceAsOneColumn(
ctx context.Context, src *parser.FuncExpr,
) (planDataSource, error) {
ds, err := p.getDataSource(ctx, src, nil, publicColumns)
ds, err := p.getDataSource(ctx, src, nil /* hints */, publicColumns, false /* lockForUpdate */)
if err != nil {
return ds, err
}
Expand Down Expand Up @@ -369,6 +372,7 @@ func (p *planner) getDataSource(
src parser.TableExpr,
hints *parser.IndexHints,
scanVisibility scanVisibility,
lockForUpdate bool,
) (planDataSource, error) {
switch t := src.(type) {
case *parser.NormalizableTableName:
Expand All @@ -386,7 +390,7 @@ func (p *planner) getDataSource(
if foundVirtual {
return ds, nil
}
return p.getTableScanOrViewPlan(ctx, tn, hints, scanVisibility)
return p.getTableScanOrViewPlan(ctx, tn, hints, scanVisibility, lockForUpdate)

case *parser.FuncExpr:
return p.getGeneratorPlan(ctx, t)
Expand All @@ -396,11 +400,11 @@ func (p *planner) getDataSource(

case *parser.JoinTableExpr:
// Joins: two sources.
left, err := p.getDataSource(ctx, t.Left, nil, scanVisibility)
left, err := p.getDataSource(ctx, t.Left, nil /* hints */, scanVisibility, lockForUpdate)
if err != nil {
return left, err
}
right, err := p.getDataSource(ctx, t.Right, nil, scanVisibility)
right, err := p.getDataSource(ctx, t.Right, nil /* hints */, scanVisibility, lockForUpdate)
if err != nil {
return right, err
}
Expand All @@ -417,10 +421,10 @@ func (p *planner) getDataSource(
}, nil

case *parser.ParenTableExpr:
return p.getDataSource(ctx, t.Expr, hints, scanVisibility)
return p.getDataSource(ctx, t.Expr, hints, scanVisibility, lockForUpdate)

case *parser.TableRef:
return p.getTableScanByRef(ctx, t, hints, scanVisibility)
return p.getTableScanByRef(ctx, t, hints, scanVisibility, lockForUpdate)

case *parser.AliasedTableExpr:
// Alias clause: source AS alias(cols...)
Expand All @@ -429,7 +433,7 @@ func (p *planner) getDataSource(
hints = t.Hints
}

src, err := p.getDataSource(ctx, t.Expr, hints, scanVisibility)
src, err := p.getDataSource(ctx, t.Expr, hints, scanVisibility, lockForUpdate)
if err != nil {
return src, err
}
Expand Down Expand Up @@ -479,6 +483,7 @@ func (p *planner) getTableScanByRef(
tref *parser.TableRef,
hints *parser.IndexHints,
scanVisibility scanVisibility,
lockForUpdate bool,
) (planDataSource, error) {
desc, err := p.getTableDescByID(ctx, sqlbase.ID(tref.TableID))
if err != nil {
Expand All @@ -498,7 +503,7 @@ func (p *planner) getTableScanByRef(
DBNameOriginallyOmitted: true,
}

src, err := p.getPlanForDesc(ctx, desc, &tn, hints, scanVisibility, tref.Columns)
src, err := p.getPlanForDesc(ctx, desc, &tn, hints, scanVisibility, lockForUpdate, tref.Columns)
if err != nil {
return src, err
}
Expand Down Expand Up @@ -572,6 +577,7 @@ func (p *planner) getTableScanOrViewPlan(
tn *parser.TableName,
hints *parser.IndexHints,
scanVisibility scanVisibility,
lockForUpdate bool,
) (planDataSource, error) {
if tn.PrefixOriginallySpecified {
// Prefixes are currently only supported for virtual tables.
Expand All @@ -584,7 +590,7 @@ func (p *planner) getTableScanOrViewPlan(
return planDataSource{}, err
}

return p.getPlanForDesc(ctx, desc, tn, hints, scanVisibility, nil)
return p.getPlanForDesc(ctx, desc, tn, hints, scanVisibility, lockForUpdate, nil /* wantedColumns */)
}

func (p *planner) getTableDesc(
Expand All @@ -607,6 +613,7 @@ func (p *planner) getPlanForDesc(
tn *parser.TableName,
hints *parser.IndexHints,
scanVisibility scanVisibility,
lockForUpdate bool,
wantedColumns []parser.ColumnID,
) (planDataSource, error) {
if desc.IsView() {
Expand All @@ -625,6 +632,7 @@ func (p *planner) getPlanForDesc(
if err := scan.initTable(p, desc, hints, scanVisibility, wantedColumns); err != nil {
return planDataSource{}, err
}
scan.lockForUpdate = lockForUpdate

return planDataSource{
info: newSourceInfoForSingleTable(*tn, planColumns(scan)),
Expand Down
3 changes: 2 additions & 1 deletion pkg/sql/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ func (p *planner) Delete(
Exprs: sqlbase.ColumnsSelectors(rd.FetchCols),
From: &parser.From{Tables: []parser.TableExpr{n.Table}},
Where: n.Where,
}, nil, n.Limit, nil, publicAndNonPublicColumns)
}, nil /* orderBy */, n.Limit, false, /* lockForUpdate */
nil /* desiredTypes */, publicAndNonPublicColumns)
if err != nil {
return nil, err
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/distsql_physical_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,9 @@ func initTableReaderSpec(
n *scanNode,
) (distsqlrun.TableReaderSpec, distsqlrun.PostProcessSpec, error) {
s := distsqlrun.TableReaderSpec{
Table: *n.desc,
Reverse: n.reverse,
Table: *n.desc,
Reverse: n.reverse,
LockForUpdate: n.lockForUpdate,
}
if n.index != &n.desc.PrimaryIndex {
for i := range n.desc.Indexes {
Expand Down Expand Up @@ -1785,8 +1786,9 @@ ColLoop:
}

joinReaderSpec := distsqlrun.JoinReaderSpec{
Table: *n.index.desc,
IndexIdx: 0,
Table: *n.index.desc,
IndexIdx: 0,
LockForUpdate: n.index.lockForUpdate,
}

post := distsqlrun.PostProcessSpec{
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/columnbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ func (cb *columnBackfiller) init() error {
colIdxMap[c.ID] = i
}
return cb.fetcher.Init(
&desc, colIdxMap, &desc.PrimaryIndex, false, false, desc.Columns,
valNeededForCol, false, &cb.alloc,
&desc, colIdxMap, &desc.PrimaryIndex, false /* reverse */, false, /* lockForUpdate */
false /* isSecondaryIndex */, desc.Columns, valNeededForCol, false /* returnRangeInfo */, &cb.alloc,
)
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/distsqlrun/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func (ib *indexBackfiller) init() error {
}

return ib.fetcher.Init(
&desc, ib.colIdxMap, &desc.PrimaryIndex, false, false, cols,
valNeededForCol, false, &ib.alloc,
&desc, ib.colIdxMap, &desc.PrimaryIndex, false /* reverse */, false, /* lockForUpdate */
false /* isSecondaryIndex */, cols, valNeededForCol, false /* returnRangeInfo */, &ib.alloc,
)
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/distsqlrun/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func newJoinReader(
var err error
jr.index, _, err = initRowFetcher(
&jr.fetcher, &jr.desc, int(spec.IndexIdx), false, /* reverse */
jr.out.neededColumns(), &jr.alloc,
spec.LockForUpdate, jr.out.neededColumns(), &jr.alloc,
)
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit ccb6904

Please sign in to comment.