diff --git a/httpd/handler.go b/httpd/handler.go index 30dd4d403f8..785d19af8ea 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -24,6 +24,10 @@ import ( "github.com/influxdb/influxdb/uuid" ) +const ( + DefaultChunkSize = 10000 +) + // TODO: Standard response headers (see: HeaderHandler) // TODO: Compression (see: CompressionHeaderHandler) @@ -172,12 +176,91 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest) return } - - // Execute query. One result will return for each statement. - results := h.server.ExecuteQuery(query, db, user) + chunked := q.Get("chunked") == "true" + chunkSize := influxdb.NoChunkingSize + if chunked { + cs, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64) + if err != nil { + chunkSize = DefaultChunkSize + } else { + chunkSize = int(cs) + } + } // Send results to client. - httpResults(w, results, pretty) + w.Header().Add("content-type", "application/json") + results, err := h.server.ExecuteQuery(query, db, user, chunkSize) + if err != nil { + if isAuthorizationError(err) { + w.WriteHeader(http.StatusUnauthorized) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + return + } + + res := influxdb.Results{Results: make([]*influxdb.Result, 0)} + + statusWritten := false + + for r := range results { + // write the status header based on the first result returned in the channel + if !statusWritten { + if r != nil && r.Err != nil { + if isAuthorizationError(r.Err) { + w.WriteHeader(http.StatusUnauthorized) + } else if isMeasurementNotFoundError(r.Err) { + w.WriteHeader(http.StatusOK) + } else if isFieldNotFoundError(r.Err) { + w.WriteHeader(http.StatusOK) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + } else { + w.WriteHeader(http.StatusOK) + } + statusWritten = true + } + + if r == nil { + continue + } + + // if chunked we write out this result and flush + if chunked { + w.Write(marshalPretty(r, pretty)) + continue + //w.(http.Flusher).Flush() + } + + // it's not chunked so buffer results in memory. + // results for statements need to be combined together. We need to check if this new result is + // for the same statement as the last result, or for the next statement + l := len(res.Results) + if l == 0 { + res.Results = append(res.Results, r) + } else if res.Results[l-1].StatementID == r.StatementID { + cr := res.Results[l-1] + cr.Series = append(cr.Series, r.Series...) + } else { + res.Results = append(res.Results, r) + } + } + + // if it's not chunked we buffered everything in memory, so write it out + if !chunked { + w.Write(marshalPretty(res, pretty)) + } +} + +func marshalPretty(r interface{}, pretty bool) []byte { + var b []byte + if pretty { + b, _ = json.MarshalIndent(r, "", " ") + } else { + b, _ = json.Marshal(r) + } + return b } func interfaceToString(v interface{}) string { @@ -211,9 +294,14 @@ type Batch struct { // Return all the measurements from the given DB func (h *Handler) showMeasurements(db string, user *influxdb.User) ([]string, error) { var measurements []string - results := h.server.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user) - if results.Err != nil { - return measurements, results.Err + c, err := h.server.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user, 0) + if err != nil { + return measurements, err + } + results := influxdb.Results{} + + for r := range c { + results.Results = append(results.Results, r) } for _, result := range results.Results { @@ -263,9 +351,14 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError) return } - // - results := h.server.ExecuteQuery(query, db, user) - for _, result := range results.Results { + + res, err := h.server.ExecuteQuery(query, db, user, DefaultChunkSize) + if err != nil { + w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***")) + w.Write(delim) + return + } + for result := range res { for _, row := range result.Series { points := make([]Point, 1) var point Point @@ -571,31 +664,6 @@ func isFieldNotFoundError(err error) bool { return (strings.HasPrefix(err.Error(), "field not found")) } -// httpResult writes a Results array to the client. -func httpResults(w http.ResponseWriter, results influxdb.Results, pretty bool) { - w.Header().Add("content-type", "application/json") - - if results.Error() != nil { - if isAuthorizationError(results.Error()) { - w.WriteHeader(http.StatusUnauthorized) - } else if isMeasurementNotFoundError(results.Error()) { - w.WriteHeader(http.StatusOK) - } else if isFieldNotFoundError(results.Error()) { - w.WriteHeader(http.StatusOK) - } else { - w.WriteHeader(http.StatusInternalServerError) - } - } - - var b []byte - if pretty { - b, _ = json.MarshalIndent(results, "", " ") - } else { - b, _ = json.Marshal(results) - } - w.Write(b) -} - // httpError writes an error to the client in a standard format. func httpError(w http.ResponseWriter, error string, pretty bool, code int) { w.Header().Add("content-type", "application/json") diff --git a/httpd/handler_test.go b/httpd/handler_test.go index 2840e0ded08..9f665c21dd2 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -11,6 +11,7 @@ import ( "net/http/httptest" "net/url" "os" + "reflect" "strings" "testing" "time" @@ -1615,10 +1616,73 @@ func TestSnapshotHandler(t *testing.T) { } } +// Ensure that the server will stream out results if a chunked response is requested +func TestHandler_ChunkedResponses(t *testing.T) { + c := test.NewMessagingClient() + defer c.Close() + srvr := OpenAuthlessServer(c) + srvr.CreateDatabase("foo") + srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar")) + srvr.SetDefaultRetentionPolicy("foo", "bar") + + s := NewHTTPServer(srvr) + defer s.Close() + + status, errString := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [ + {"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 100}}, + {"name": "cpu", "tags": {"host": "server02"},"timestamp": "2009-11-10T23:30:00Z", "fields": {"value": 25}}]}`) + if status != http.StatusOK { + t.Fatalf("unexpected status: %d - %s", status, errString) + } + time.Sleep(100 * time.Millisecond) // Ensure data node picks up write. + + resp, err := chunkedQuery(s.URL, "foo", "select value from cpu") + if err != nil { + t.Fatalf("error making request: %s", err.Error()) + } + defer resp.Body.Close() + for i := 0; i < 2; i++ { + chunk := make([]byte, 2048, 2048) + n, err := resp.Body.Read(chunk) + if err != nil { + t.Fatalf("error reading response: %s", err.Error()) + } + results := &influxdb.Results{} + err = json.Unmarshal(chunk[0:n], results) + if err != nil { + t.Fatalf("error unmarshaling resultsz: %s", err.Error()) + } + if len(results.Results) != 1 { + t.Fatalf("didn't get 1 result: %s\n", mustMarshalJSON(results)) + } + if len(results.Results[0].Series) != 1 { + t.Fatalf("didn't get 1 series: %s\n", mustMarshalJSON(results)) + } + var vals [][]interface{} + if i == 0 { + vals = [][]interface{}{{"2009-11-10T23:00:00Z", 100}} + } else { + vals = [][]interface{}{{"2009-11-10T23:30:00Z", 25}} + } + if !reflect.DeepEqual(results.Results[0].Series[0].Values, vals) { + t.Fatalf("values weren't what was expected:\n exp: %s\n got: %s", mustMarshalJSON(vals), mustMarshalJSON(results.Results[0].Series[0].Values)) + } + } +} + // batchWrite JSON Unmarshal tests // Utility functions for this test suite. +func chunkedQuery(host, db, q string) (*http.Response, error) { + params := map[string]string{"db": db, "q": q, "chunked": "true", "chunk_size": "1"} + query := url.Values{} + for k, v := range params { + query.Set(k, v) + } + return http.Get(host + "/query?" + query.Encode()) +} + func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) { req, err := http.NewRequest(verb, path, bytes.NewBuffer([]byte(body))) if err != nil { diff --git a/influxql/engine.go b/influxql/engine.go index 25caa168d84..3288156847e 100644 --- a/influxql/engine.go +++ b/influxql/engine.go @@ -576,7 +576,7 @@ func NewPlanner(db DB) *Planner { } // Plan creates an execution plan for the given SelectStatement and returns an Executor. -func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) { +func (p *Planner) Plan(stmt *SelectStatement, chunkSize int) (*Executor, error) { now := p.Now().UTC() // Replace instances of "now()" with the current time. diff --git a/server.go b/server.go index d285412f0c3..c8a75faa42f 100644 --- a/server.go +++ b/server.go @@ -49,6 +49,9 @@ const ( // Defines the minimum duration allowed for all retention policies retentionPolicyMinDuration = time.Hour + + // When planning a select statement, passing zero tells it not to chunk results + NoChunkingSize = 0 ) // Server represents a collection of metadata and raw metric data. @@ -2027,9 +2030,10 @@ func (s *Server) ReadSeries(database, retentionPolicy, name string, tags map[str } // ExecuteQuery executes an InfluxQL query against the server. -// Returns a resultset for each statement in the query. -// Stops on first execution error that occurs. -func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Results { +// If the user isn't authorized to access the database an error will be returned. +// It sends results down the passed in chan and closes it when done. It will close the chan +// on the first statement that throws an error. +func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User, chunkSize int) (chan *Result, error) { // Authorize user to execute the query. if s.authenticationEnabled { if err := s.Authorize(user, q, database); err != nil { @@ -2037,127 +2041,144 @@ func (s *Server) ExecuteQuery(q *influxql.Query, database string, user *User) Re } } - // Build empty resultsets. - results := Results{Results: make([]*Result, len(q.Statements))} s.stats.Add("queriesRx", int64(len(q.Statements))) - // Execute each statement. - for i, stmt := range q.Statements { - // Set default database and policy on the statement. - if err := s.NormalizeStatement(stmt, database); err != nil { - results.Results[i] = &Result{Err: err} - break - } + // Execute each statement. Keep the iterator external so we can + // track how many of the statements were executed + results := make(chan *Result) + go func() { + var i int + var stmt influxql.Statement + for i, stmt = range q.Statements { + // Set default database and policy on the statement. + if err := s.NormalizeStatement(stmt, database); err != nil { + results <- &Result{Err: err} + break + } - var res *Result - switch stmt := stmt.(type) { - case *influxql.SelectStatement: - res = s.executeSelectStatement(stmt, database, user) - case *influxql.CreateDatabaseStatement: - res = s.executeCreateDatabaseStatement(stmt, user) - case *influxql.DropDatabaseStatement: - res = s.executeDropDatabaseStatement(stmt, user) - case *influxql.ShowDatabasesStatement: - res = s.executeShowDatabasesStatement(stmt, user) - case *influxql.ShowServersStatement: - res = s.executeShowServersStatement(stmt, user) - case *influxql.CreateUserStatement: - res = s.executeCreateUserStatement(stmt, user) - case *influxql.DeleteStatement: - res = s.executeDeleteStatement() - case *influxql.DropUserStatement: - res = s.executeDropUserStatement(stmt, user) - case *influxql.ShowUsersStatement: - res = s.executeShowUsersStatement(stmt, user) - case *influxql.DropSeriesStatement: - res = s.executeDropSeriesStatement(stmt, database, user) - case *influxql.ShowSeriesStatement: - res = s.executeShowSeriesStatement(stmt, database, user) - case *influxql.DropMeasurementStatement: - res = s.executeDropMeasurementStatement(stmt, database, user) - case *influxql.ShowMeasurementsStatement: - res = s.executeShowMeasurementsStatement(stmt, database, user) - case *influxql.ShowTagKeysStatement: - res = s.executeShowTagKeysStatement(stmt, database, user) - case *influxql.ShowTagValuesStatement: - res = s.executeShowTagValuesStatement(stmt, database, user) - case *influxql.ShowFieldKeysStatement: - res = s.executeShowFieldKeysStatement(stmt, database, user) - case *influxql.ShowStatsStatement: - res = s.executeShowStatsStatement(stmt, user) - case *influxql.ShowDiagnosticsStatement: - res = s.executeShowDiagnosticsStatement(stmt, user) - case *influxql.GrantStatement: - res = s.executeGrantStatement(stmt, user) - case *influxql.RevokeStatement: - res = s.executeRevokeStatement(stmt, user) - case *influxql.CreateRetentionPolicyStatement: - res = s.executeCreateRetentionPolicyStatement(stmt, user) - case *influxql.AlterRetentionPolicyStatement: - res = s.executeAlterRetentionPolicyStatement(stmt, user) - case *influxql.DropRetentionPolicyStatement: - res = s.executeDropRetentionPolicyStatement(stmt, user) - case *influxql.ShowRetentionPoliciesStatement: - res = s.executeShowRetentionPoliciesStatement(stmt, user) - case *influxql.CreateContinuousQueryStatement: - res = s.executeCreateContinuousQueryStatement(stmt, user) - case *influxql.DropContinuousQueryStatement: - continue - case *influxql.ShowContinuousQueriesStatement: - res = s.executeShowContinuousQueriesStatement(stmt, database, user) - default: - panic(fmt.Sprintf("unsupported statement type: %T", stmt)) - } + var res *Result + switch stmt := stmt.(type) { + case *influxql.SelectStatement: + if err := s.executeSelectStatement(i, stmt, database, user, results, chunkSize); err != nil { + results <- &Result{Err: err} + break + } + case *influxql.CreateDatabaseStatement: + res = s.executeCreateDatabaseStatement(stmt, user) + case *influxql.DropDatabaseStatement: + res = s.executeDropDatabaseStatement(stmt, user) + case *influxql.ShowDatabasesStatement: + res = s.executeShowDatabasesStatement(stmt, user) + case *influxql.ShowServersStatement: + res = s.executeShowServersStatement(stmt, user) + case *influxql.CreateUserStatement: + res = s.executeCreateUserStatement(stmt, user) + case *influxql.DeleteStatement: + res = s.executeDeleteStatement() + case *influxql.DropUserStatement: + res = s.executeDropUserStatement(stmt, user) + case *influxql.ShowUsersStatement: + res = s.executeShowUsersStatement(stmt, user) + case *influxql.DropSeriesStatement: + res = s.executeDropSeriesStatement(stmt, database, user) + case *influxql.ShowSeriesStatement: + res = s.executeShowSeriesStatement(stmt, database, user) + case *influxql.DropMeasurementStatement: + res = s.executeDropMeasurementStatement(stmt, database, user) + case *influxql.ShowMeasurementsStatement: + res = s.executeShowMeasurementsStatement(stmt, database, user) + case *influxql.ShowTagKeysStatement: + res = s.executeShowTagKeysStatement(stmt, database, user) + case *influxql.ShowTagValuesStatement: + res = s.executeShowTagValuesStatement(stmt, database, user) + case *influxql.ShowFieldKeysStatement: + res = s.executeShowFieldKeysStatement(stmt, database, user) + case *influxql.ShowStatsStatement: + res = s.executeShowStatsStatement(stmt, user) + case *influxql.GrantStatement: + res = s.executeGrantStatement(stmt, user) + case *influxql.RevokeStatement: + res = s.executeRevokeStatement(stmt, user) + case *influxql.CreateRetentionPolicyStatement: + res = s.executeCreateRetentionPolicyStatement(stmt, user) + case *influxql.AlterRetentionPolicyStatement: + res = s.executeAlterRetentionPolicyStatement(stmt, user) + case *influxql.DropRetentionPolicyStatement: + res = s.executeDropRetentionPolicyStatement(stmt, user) + case *influxql.ShowRetentionPoliciesStatement: + res = s.executeShowRetentionPoliciesStatement(stmt, user) + case *influxql.CreateContinuousQueryStatement: + res = s.executeCreateContinuousQueryStatement(stmt, user) + case *influxql.DropContinuousQueryStatement: + continue + case *influxql.ShowContinuousQueriesStatement: + res = s.executeShowContinuousQueriesStatement(stmt, database, user) + default: + panic(fmt.Sprintf("unsupported statement type: %T", stmt)) + } - // If an error occurs then stop processing remaining statements. - results.Results[i] = res - if res.Err != nil { - break + if res != nil { + // set the StatementID for the handler on the other side to combine results + res.StatementID = i + + // If an error occurs then stop processing remaining statements. + results <- res + if res.Err != nil { + break + } + } } - s.stats.Inc("queriesExecuted") - } - // Fill any empty results after error. - for i, res := range results.Results { - if res == nil { - results.Results[i] = &Result{Err: ErrNotExecuted} + // if there was an error send results that the remaining statements weren't executed + for ; i < len(q.Statements)-1; i++ { + results <- &Result{Err: ErrNotExecuted} } - } - return results + s.stats.Inc("queriesExecuted") + close(results) + }() + + return results, nil } // executeSelectStatement plans and executes a select statement against a database. -func (s *Server) executeSelectStatement(stmt *influxql.SelectStatement, database string, user *User) *Result { +func (s *Server) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, database string, user *User, results chan *Result, chunkSize int) error { // Perform any necessary query re-writing. stmt, err := s.rewriteSelectStatement(stmt) if err != nil { - return &Result{Err: err} + return err } // Plan statement execution. - e, err := s.planSelectStatement(stmt) + e, err := s.planSelectStatement(stmt, chunkSize) if err != nil { - return &Result{Err: err} + return err } // Execute plan. ch, err := e.Execute() if err != nil { - return &Result{Err: err} + return err } - // Read all rows from channel. - res := &Result{Series: make([]*influxql.Row, 0)} + // Stream results from the channel. We should send an empty result if nothing comes through. + resultSent := false for row := range ch { if row.Err != nil { res.Err = row.Err - return res + return row.Err + } else { + resultSent = true + results <- &Result{StatementID: statementID, Series: []*influxql.Row{row}} } res.Series = append(res.Series, row) } - return res + if !resultSent { + results <- &Result{StatementID: statementID, Series: make([]*influxql.Row, 0)} + } + + return nil } // rewriteSelectStatement performs any necessary query re-writing. @@ -2291,14 +2312,14 @@ func (s *Server) expandSources(sources influxql.Sources) (influxql.Sources, erro } // plans a selection statement under lock. -func (s *Server) planSelectStatement(stmt *influxql.SelectStatement) (*influxql.Executor, error) { +func (s *Server) planSelectStatement(stmt *influxql.SelectStatement, chunkSize int) (*influxql.Executor, error) { s.mu.RLock() defer s.mu.RUnlock() // Plan query. p := influxql.NewPlanner(s) - return p.Plan(stmt) + return p.Plan(stmt, chunkSize) } func (s *Server) executeCreateDatabaseStatement(q *influxql.CreateDatabaseStatement, user *User) *Result { @@ -3253,8 +3274,11 @@ func (s *Server) processor(conn MessagingConn, done chan struct{}) { // Result represents a resultset returned from a single statement. type Result struct { - Series influxql.Rows - Err error + // StatementID is just the statement's position in the query. It's used + // to combine statement results if they're being buffered in memory. + StatementID int `json:"-"` + Series influxql.Rows + Err error } // MarshalJSON encodes the result into JSON. @@ -3733,7 +3757,7 @@ func (s *Server) runContinuousQuery(cq *ContinuousQuery) { // runContinuousQueryAndWriteResult will run the query against the cluster and write the results back in func (s *Server) runContinuousQueryAndWriteResult(cq *ContinuousQuery) error { - e, err := s.planSelectStatement(cq.cq.Source) + e, err := s.planSelectStatement(cq.cq.Source, NoChunkingSize) if err != nil { return err diff --git a/server_test.go b/server_test.go index 28b8c29bddb..a0e1e950db3 100644 --- a/server_test.go +++ b/server_test.go @@ -757,7 +757,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { // Test update duration only. duration = time.Hour - results := s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1h`), "foo", nil) + results := s.executeQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1h`), "foo", nil) if results.Error() != nil { t.Fatalf("unexpected error: %s", results.Error()) } @@ -775,7 +775,7 @@ func TestServer_AlterRetentionPolicy(t *testing.T) { // set duration to infinite to catch edge case. duration = 0 - results = s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION INF`), "foo", nil) + results = s.executeQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION INF`), "foo", nil) if results.Error() != nil { t.Fatalf("unexpected error: %s", results.Error()) } @@ -842,7 +842,7 @@ func TestServer_AlterRetentionPolicy_Minduration(t *testing.T) { // Test update duration only. duration = time.Hour - results := s.ExecuteQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1m`), "foo", nil) + results := s.executeQuery(MustParseQuery(`ALTER RETENTION POLICY bar ON foo DURATION 1m`), "foo", nil) if results.Error() == nil { t.Fatalf("unexpected error: %s", results.Error()) } @@ -1061,7 +1061,7 @@ func TestServer_DropMeasurement(t *testing.T) { c.Sync(index) // Ensure measurement exists - results := s.ExecuteQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil) + results := s.executeQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1071,7 +1071,7 @@ func TestServer_DropMeasurement(t *testing.T) { } // Ensure series exists - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1081,12 +1081,12 @@ func TestServer_DropMeasurement(t *testing.T) { } // Drop measurement - results = s.ExecuteQuery(MustParseQuery(`DROP MEASUREMENT cpu`), "foo", nil) + results = s.executeQuery(MustParseQuery(`DROP MEASUREMENT cpu`), "foo", nil) if results.Error() != nil { t.Fatalf("unexpected error: %s", results.Error()) } - results = s.ExecuteQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW MEASUREMENTS`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 0 { @@ -1095,7 +1095,7 @@ func TestServer_DropMeasurement(t *testing.T) { t.Fatalf("unexpected row(0): %s", s) } - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 0 { @@ -1116,7 +1116,7 @@ func TestServer_DropMeasurementNoneExists(t *testing.T) { s.CreateUser("susy", "pass", false) // Drop measurement - results := s.ExecuteQuery(MustParseQuery(`DROP MEASUREMENT bar`), "foo", nil) + results := s.executeQuery(MustParseQuery(`DROP MEASUREMENT bar`), "foo", nil) if results.Error().Error() != `measurement not found` { t.Fatalf("unexpected error: %s", results.Error()) } @@ -1130,7 +1130,7 @@ func TestServer_DropMeasurementNoneExists(t *testing.T) { c.Sync(index) // Drop measurement after writing data to ensure we still get the same error - results = s.ExecuteQuery(MustParseQuery(`DROP MEASUREMENT bar`), "foo", nil) + results = s.executeQuery(MustParseQuery(`DROP MEASUREMENT bar`), "foo", nil) if results.Error().Error() != `measurement not found` { t.Fatalf("unexpected error: %s", results.Error()) } @@ -1155,7 +1155,7 @@ func TestServer_DropSeries(t *testing.T) { c.Sync(index) // Ensure series exists - results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + results := s.executeQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1165,12 +1165,12 @@ func TestServer_DropSeries(t *testing.T) { } // Drop series - results = s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM cpu`), "foo", nil) + results = s.executeQuery(MustParseQuery(`DROP SERIES FROM cpu`), "foo", nil) if results.Error() != nil { t.Fatalf("unexpected error: %s", results.Error()) } - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 0 { @@ -1206,12 +1206,12 @@ func TestServer_DropSeriesFromMeasurement(t *testing.T) { c.Sync(index) // Drop series - results := s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM memory`), "foo", nil) + results := s.executeQuery(MustParseQuery(`DROP SERIES FROM memory`), "foo", nil) if results.Error() != nil { t.Fatalf("unexpected error: %s", results.Error()) } - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1251,7 +1251,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) { } c.Sync(index) - results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + results := s.executeQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1260,12 +1260,12 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) { t.Fatalf("unexpected row(0): %s", s) } - results = s.ExecuteQuery(MustParseQuery(`DROP SERIES FROM cpu where host='serverA'`), "foo", nil) + results = s.executeQuery(MustParseQuery(`DROP SERIES FROM cpu where host='serverA'`), "foo", nil) if results.Error() != nil { t.Fatalf("unexpected error: %s", results.Error()) } - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1274,14 +1274,16 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) { t.Fatalf("unexpected row(0): %s", s) } - results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where host='serverA'`), "foo", nil) - if res := results.Results[0]; res.Err != nil { + results = s.executeQuery(MustParseQuery(`SELECT * FROM cpu where host='serverA'`), "foo", nil) + if len(results.Results) == 0 { + t.Fatal("expected results to be non-empty") + } else if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 0 { t.Fatalf("unexpected row count: %d", len(res.Series)) } - results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where host='serverB'`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SELECT * FROM cpu where host='serverB'`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1290,7 +1292,7 @@ func TestServer_DropSeriesTagsPreserved(t *testing.T) { t.Fatalf("unexpected row(0): %s", s) } - results = s.ExecuteQuery(MustParseQuery(`SELECT * FROM cpu where region='uswest'`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SELECT * FROM cpu where region='uswest'`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1318,7 +1320,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) { s.MustWriteSeries("foo", "raw", []influxdb.Point{{Name: "memory", Tags: map[string]string{"region": "us-east", "host": "serverA"}, Timestamp: mustParseTime("2000-01-01T00:00:00Z"), Fields: map[string]interface{}{"value": float64(100)}}}) // Select data from the server. - results := s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 3 OFFSET 1`), "foo", nil) + results := s.executeQuery(MustParseQuery(`SHOW SERIES LIMIT 3 OFFSET 1`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 2 { @@ -1328,7 +1330,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) { } // Select data from the server. - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 4`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 4`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 1 { @@ -1338,7 +1340,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) { } // Select data from the server. - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 20`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES LIMIT 2 OFFSET 20`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 0 { @@ -1346,7 +1348,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) { } // Select data from the server. - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 4 OFFSET 0`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES LIMIT 4 OFFSET 0`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 2 { @@ -1354,7 +1356,7 @@ func TestServer_ShowSeriesLimitOffset(t *testing.T) { } // Select data from the server. - results = s.ExecuteQuery(MustParseQuery(`SHOW SERIES LIMIT 20`), "foo", nil) + results = s.executeQuery(MustParseQuery(`SHOW SERIES LIMIT 20`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error: %s", res.Err) } else if len(res.Series) != 2 { @@ -1742,7 +1744,7 @@ func TestServer_RunContinuousQueries(t *testing.T) { time.Sleep(time.Millisecond * 100) verify := func(num int, exp string) { - results := s.ExecuteQuery(MustParseQuery(`SELECT mean(mean) FROM cpu_region GROUP BY region`), "foo", nil) + results := s.executeQuery(MustParseQuery(`SELECT mean(mean) FROM cpu_region GROUP BY region`), "foo", nil) if res := results.Results[0]; res.Err != nil { t.Fatalf("unexpected error verify %d: %s", num, res.Err) } else if len(res.Series) != 2 { @@ -1943,6 +1945,25 @@ func (s *Server) MustWriteSeries(database, retentionPolicy string, points []infl return index } +func (s *Server) executeQuery(q *influxql.Query, db string, user *influxdb.User) influxdb.Results { + results, err := s.ExecuteQuery(q, db, user, influxdb.NoChunkingSize) + if err != nil { + return influxdb.Results{Err: err} + } + res := influxdb.Results{} + for r := range results { + l := len(res.Results) + if l == 0 { + res.Results = append(res.Results, r) + } else if res.Results[l-1].StatementID == r.StatementID { + res.Results[l-1].Series = append(res.Results[l-1].Series, r.Series...) + } else { + res.Results = append(res.Results, r) + } + } + return res +} + // tempfile returns a temporary path. func tempfile() string { f, _ := ioutil.TempFile("", "influxdb-")