Skip to content

Commit

Permalink
bigquery: don't call tabledata.list if there are no rows in the query…
Browse files Browse the repository at this point in the history
… result

If jobs.getQueryResults says that the total rows in the result is
zero, have RowIterator.Next return immediately with iterator.Done,
without calling tabledata.list.

This allows queries with DML statements to be treated just like any
other query: callers can attempt to iterate over the resulting
rows. (They will get iterator.Done on calls to RowIterator.Next.)
Without this change, the call to tabledata.list will fail.

Change-Id: I9c051449ecc6307efbe20e9edc01ce6a7826e297
Reviewed-on: https://code-review.googlesource.com/30610
Reviewed-by: kokoro <[email protected]>
Reviewed-by: Jean de Klerk <[email protected]>
  • Loading branch information
jba committed Jul 19, 2018
1 parent f4024bf commit ffb8ba1
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 23 deletions.
25 changes: 15 additions & 10 deletions bigquery/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1166,23 +1166,27 @@ func TestIntegration_DML(t *testing.T) {

func runDML(ctx context.Context, sql string) error {
// Retry insert; sometimes it fails with INTERNAL.
return internal.Retry(ctx, gax.Backoff{}, func() (bool, error) {
// Use DML to insert.
q := client.Query(sql)
job, err := q.Run(ctx)
return internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
ri, err := client.Query(sql).Read(ctx)
if err != nil {
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
return true, err // fail on 4xx
}
return false, err
}
if err := wait(ctx, job); err != nil {
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
return true, err // fail on 4xx
}
return false, err
// It is OK to try to iterate over DML results. The first call to Next
// will return iterator.Done.
err = ri.Next(nil)
if err == nil {
return true, errors.New("want iterator.Done on the first call, got nil")
}
if err == iterator.Done {
return true, nil
}
return true, nil
if e, ok := err.(*googleapi.Error); ok && e.Code < 500 {
return true, err // fail on 4xx
}
return false, err
})
}

Expand Down Expand Up @@ -1891,6 +1895,7 @@ func TestIntegration_Model(t *testing.T) {
VALUES (1, 0), (2, 1), (3, 0), (4, 1)`,
tableName)
wantNumRows := 4

if err := runDML(ctx, sql); err != nil {
t.Fatal(err)
}
Expand Down
15 changes: 11 additions & 4 deletions bigquery/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,20 @@ import (
"google.golang.org/api/iterator"
)

// Construct a RowIterator.
// If pf is nil, there are no rows in the result set.
func newRowIterator(ctx context.Context, t *Table, pf pageFetcher) *RowIterator {
it := &RowIterator{
ctx: ctx,
table: t,
pf: pf,
}
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
it.fetch,
func() int { return len(it.rows) },
func() interface{} { r := it.rows; it.rows = nil; return r })
if pf != nil {
it.pageInfo, it.nextFunc = iterator.NewPageInfo(
it.fetch,
func() int { return len(it.rows) },
func() interface{} { r := it.rows; it.rows = nil; return r })
}
return it
}

Expand Down Expand Up @@ -99,6 +103,9 @@ type RowIterator struct {
// NullDateTime. You can also use a *[]Value or *map[string]Value to read from a
// table with NULLs.
func (it *RowIterator) Next(dst interface{}) error {
if it.pf == nil { // There are no rows in the result set.
return iterator.Done
}
var vl ValueLoader
switch dst := dst.(type) {
case ValueLoader:
Expand Down
19 changes: 12 additions & 7 deletions bigquery/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) {

if j.isQuery() {
// We can avoid polling for query jobs.
if _, err := j.waitForQuery(ctx, j.projectID); err != nil {
if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil {
return nil, err
}
// Note: extra RPC even if you just want to wait for the query to finish.
Expand Down Expand Up @@ -262,7 +262,7 @@ func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) {
return j.read(ctx, j.waitForQuery, fetchPage)
}

func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, error), pf pageFetcher) (*RowIterator, error) {
func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) {
if !j.isQuery() {
return nil, errors.New("bigquery: cannot read from a non-query job")
}
Expand All @@ -272,21 +272,26 @@ func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, strin
if destTable != nil && projectID != destTable.ProjectId {
return nil, fmt.Errorf("bigquery: job project ID is %q, but destination table's is %q", projectID, destTable.ProjectId)
}
schema, err := waitForQuery(ctx, projectID)
schema, totalRows, err := waitForQuery(ctx, projectID)
if err != nil {
return nil, err
}
if destTable == nil {
return nil, errors.New("bigquery: query job missing destination table")
}
dt := bqToTable(destTable, j.c)
if totalRows == 0 {
pf = nil
}
it := newRowIterator(ctx, dt, pf)
it.Schema = schema
it.TotalRows = totalRows
return it, nil
}

// waitForQuery waits for the query job to complete and returns its schema.
func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, error) {
// waitForQuery waits for the query job to complete and returns its schema. It also
// returns the total number of rows in the result set.
func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) {
// Use GetQueryResults only to wait for completion, not to read results.
call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0)
setClientHeader(call.Header())
Expand All @@ -307,9 +312,9 @@ func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, error
return true, nil
})
if err != nil {
return nil, err
return nil, 0, err
}
return bqToSchema(res.Schema), nil
return bqToSchema(res.Schema), res.TotalRows, nil
}

// JobStatistics contains statistics about a job.
Expand Down
4 changes: 2 additions & 2 deletions bigquery/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func (s *pageFetcherReadStub) fetchPage(ctx context.Context, t *Table, schema Sc
return result, nil
}

func waitForQueryStub(context.Context, string) (Schema, error) {
return nil, nil
func waitForQueryStub(context.Context, string) (Schema, uint64, error) {
return nil, 1, nil
}

func TestRead(t *testing.T) {
Expand Down

0 comments on commit ffb8ba1

Please sign in to comment.