Support chunked responses #2142

Merged: 10 commits, Apr 2, 2015
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features
- [#2128](https://github.com/influxdb/influxdb/pull/2128): Data node discovery from brokers
- [#2142](https://github.com/influxdb/influxdb/pull/2142): Support chunked queries

### Bugfixes
- [#2147](https://github.com/influxdb/influxdb/pull/2147): Set Go Max procs in a better location
1 change: 1 addition & 0 deletions cmd/influxd/server_integration_test.go
@@ -722,6 +722,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
expected: `{"results":[{"series":[{"name":"cpu","columns":["time","alert_id"],"values":[["2015-02-28T01:03:36.703820946Z","alert"]]}]}]}`,
},
{
name: "select where field greater than some value",
write: `{"database" : "%DB%", "retentionPolicy" : "%RP%", "points": [{"name": "cpu", "timestamp": "2009-11-10T23:00:02Z", "fields": {"load": 100}},
{"name": "cpu", "timestamp": "2009-11-10T23:01:02Z", "fields": {"load": 80}}]}`,
query: `select load from "%DB%"."%RP%".cpu where load > 100`,
162 changes: 128 additions & 34 deletions httpd/handler.go
@@ -24,6 +24,12 @@ import (
"github.com/influxdb/influxdb/uuid"
)

const (
// DefaultChunkSize is the default number of values a raw query returns per chunk.
// With raw data queries, mappers will read up to this many values before sending results back to the engine.
// The byte size of a chunk can be much larger, depending on the fields returned.
DefaultChunkSize = 10000
)

// TODO: Standard response headers (see: HeaderHandler)
// TODO: Compression (see: CompressionHeaderHandler)

@@ -173,11 +179,110 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
return
}

// Execute query. One result will return for each statement.
results := h.server.ExecuteQuery(query, db, user)
// get the chunking settings
chunked := q.Get("chunked") == "true"
// even if we're not chunking, the engine will chunk at this size and then the handler will combine results
chunkSize := DefaultChunkSize
if chunked {
if cs, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64); err == nil {
chunkSize = int(cs)
}
}

// Send results to client.
httpResults(w, results, pretty)
w.Header().Add("content-type", "application/json")
results, err := h.server.ExecuteQuery(query, db, user, chunkSize)
if err != nil {
if isAuthorizationError(err) {
w.WriteHeader(http.StatusUnauthorized)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
return
}

// if we're not chunking, this is the in-memory buffer for all results before sending to the client
res := influxdb.Results{Results: make([]*influxdb.Result, 0)}
statusWritten := false

// pull all results from the channel
for r := range results {
// write the status header based on the first result returned in the channel
if !statusWritten {
if r != nil && r.Err != nil {
if isAuthorizationError(r.Err) {
w.WriteHeader(http.StatusUnauthorized)
} else if isMeasurementNotFoundError(r.Err) {
w.WriteHeader(http.StatusOK)
} else if isFieldNotFoundError(r.Err) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
} else {
w.WriteHeader(http.StatusOK)
}
statusWritten = true
}

// ignore nils
if r == nil {
continue
}

// if chunked we write out this result and flush
if chunked {
res.Results = []*influxdb.Result{r}
w.Write(marshalPretty(res, pretty))
w.(http.Flusher).Flush()
continue
}

// it's not chunked, so buffer results in memory.
// Results for a statement need to be combined: check whether this new result belongs
// to the same statement as the last result or to the next statement.
l := len(res.Results)
if l == 0 {
res.Results = append(res.Results, r)
} else if res.Results[l-1].StatementID == r.StatementID {
cr := res.Results[l-1]
cr.Series = append(cr.Series, r.Series...)
} else {
res.Results = append(res.Results, r)
}
}

// if it's not chunked we buffered everything in memory, so write it out
if !chunked {
w.Write(marshalPretty(res, pretty))
}
}
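
For illustration only (not part of this diff): a minimal Go client sketch for the endpoint above. When chunked=true, serveQuery writes one JSON-encoded Results object per chunk and flushes after each, so the response body is a stream of concatenated JSON objects rather than a single document, and a json.Decoder can read them one at a time. The host, database, query, and the trimmed-down results struct below are assumptions for the sketch, not taken from this PR.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
)

// results mirrors just enough of influxdb.Results to decode each chunk.
type results struct {
    Results []struct {
        Series []struct {
            Name    string          `json:"name"`
            Columns []string        `json:"columns"`
            Values  [][]interface{} `json:"values"`
        } `json:"series"`
        Err string `json:"error,omitempty"`
    } `json:"results"`
}

func main() {
    // Assumed host and query; chunked=true asks the handler to stream results,
    // chunk_size caps the number of values returned per chunk.
    v := url.Values{}
    v.Set("db", "foo")
    v.Set("q", "select value from cpu")
    v.Set("chunked", "true")
    v.Set("chunk_size", "1000")

    resp, err := http.Get("http://localhost:8086/query?" + v.Encode())
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // Each chunk is a complete JSON object; decode them until the stream ends.
    dec := json.NewDecoder(resp.Body)
    for {
        var r results
        if err := dec.Decode(&r); err == io.EOF {
            break
        } else if err != nil {
            panic(err)
        }
        fmt.Printf("chunk with %d result(s)\n", len(r.Results))
    }
}

The same loop also handles a non-chunked response, which arrives as a single Results document followed by end of stream.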

// marshalPretty marshals the given value to JSON, either pretty-printed or compact
func marshalPretty(r interface{}, pretty bool) []byte {
var b []byte
var err error
if pretty {
b, err = json.MarshalIndent(r, "", " ")
} else {
b, err = json.Marshal(r)
}

// if for some reason there was an error, convert to a result object with the error
if err != nil {
if pretty {
b, err = json.MarshalIndent(&influxdb.Result{Err: err}, "", " ")
} else {
b, err = json.Marshal(&influxdb.Result{Err: err})
}
}

// if there's still an error, json is out and a straight up error string is in
if err != nil {
return []byte(err.Error())
}

return b
}
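
A standalone sketch (not part of this change) of the fallback chain described in the comments above: if the value cannot be marshaled, the marshaling error is itself wrapped in a result and marshaled, and only if that also fails does the caller get a bare error string. The local result type with a string error field is a simplification standing in for influxdb.Result.

package main

import (
    "encoding/json"
    "fmt"
)

// result stands in for influxdb.Result; a string error field keeps the sketch self-contained.
type result struct {
    Err string `json:"error,omitempty"`
}

// marshalOrError mirrors the shape of marshalPretty's fallback logic.
func marshalOrError(v interface{}, pretty bool) []byte {
    var b []byte
    var err error
    if pretty {
        b, err = json.MarshalIndent(v, "", "    ")
    } else {
        b, err = json.Marshal(v)
    }
    if err != nil {
        // first fallback: report the marshaling error as a result object
        b, err = json.Marshal(&result{Err: err.Error()})
    }
    if err != nil {
        // last resort: a plain error string
        return []byte(err.Error())
    }
    return b
}

func main() {
    fmt.Println(string(marshalOrError(map[string]int{"values": 3}, true))) // pretty-printed JSON
    fmt.Println(string(marshalOrError(make(chan int), false)))             // channels cannot be marshaled, so the fallback kicks in
}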

func interfaceToString(v interface{}) string {
@@ -211,9 +316,14 @@ type Batch struct {
// Return all the measurements from the given DB
func (h *Handler) showMeasurements(db string, user *influxdb.User) ([]string, error) {
var measurements []string
results := h.server.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user)
if results.Err != nil {
return measurements, results.Err
c, err := h.server.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user, 0)
if err != nil {
return measurements, err
}
results := influxdb.Results{}

for r := range c {
results.Results = append(results.Results, r)
}

for _, result := range results.Results {
@@ -263,9 +373,14 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx
httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError)
return
}
//
results := h.server.ExecuteQuery(query, db, user)
for _, result := range results.Results {

res, err := h.server.ExecuteQuery(query, db, user, DefaultChunkSize)
if err != nil {
w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***"))
w.Write(delim)
return
}
for result := range res {
for _, row := range result.Series {
points := make([]Point, 1)
var point Point
@@ -571,31 +686,6 @@ func isFieldNotFoundError(err error) bool {
return (strings.HasPrefix(err.Error(), "field not found"))
}

// httpResult writes a Results array to the client.
func httpResults(w http.ResponseWriter, results influxdb.Results, pretty bool) {
w.Header().Add("content-type", "application/json")

if results.Error() != nil {
if isAuthorizationError(results.Error()) {
w.WriteHeader(http.StatusUnauthorized)
} else if isMeasurementNotFoundError(results.Error()) {
w.WriteHeader(http.StatusOK)
} else if isFieldNotFoundError(results.Error()) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
}

var b []byte
if pretty {
b, _ = json.MarshalIndent(results, "", " ")
} else {
b, _ = json.Marshal(results)
}
w.Write(b)
}

// httpError writes an error to the client in a standard format.
func httpError(w http.ResponseWriter, error string, pretty bool, code int) {
w.Header().Add("content-type", "application/json")
@@ -676,6 +766,10 @@ func (w gzipResponseWriter) Write(b []byte) (int, error) {
return w.Writer.Write(b)
}

func (w gzipResponseWriter) Flush() {
w.Writer.(*gzip.Writer).Flush()
}
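
One reason this method matters for the chunked path: serveQuery flushes each chunk with w.(http.Flusher).Flush(), and when gzipFilter has wrapped the response writer that assertion only succeeds because Flush is defined here. Below is a rough self-contained sketch; the wrapper's field layout is assumed, since the struct definition sits outside this hunk.

package main

import (
    "compress/gzip"
    "fmt"
    "io"
    "net/http"
    "net/http/httptest"
)

// gzipResponseWriter here is a stand-in for the wrapper used by gzipFilter;
// the field layout is assumed rather than taken from this diff.
type gzipResponseWriter struct {
    io.Writer           // the *gzip.Writer compressing the body
    http.ResponseWriter // the underlying writer for headers and status
}

func (w gzipResponseWriter) Write(b []byte) (int, error) { return w.Writer.Write(b) }

// Flush forwards to the gzip writer so the wrapper satisfies http.Flusher,
// which the chunked path in serveQuery relies on via w.(http.Flusher).Flush().
func (w gzipResponseWriter) Flush() { w.Writer.(*gzip.Writer).Flush() }

func main() {
    rec := httptest.NewRecorder()
    gz := gzip.NewWriter(rec)
    var w http.ResponseWriter = gzipResponseWriter{Writer: gz, ResponseWriter: rec}

    // This is the assertion serveQuery performs after writing each chunk.
    _, ok := w.(http.Flusher)
    fmt.Println("implements http.Flusher:", ok) // true only because Flush is defined
}

Without the explicit Flush method, the embedded http.ResponseWriter interface would not promote one, the assertion would fail, and flushing a chunk through a gzip-compressed response would panic.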

// determines if the client can accept compressed responses, and encodes accordingly
func gzipFilter(inner http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
63 changes: 63 additions & 0 deletions httpd/handler_test.go
@@ -1615,10 +1615,73 @@ func TestSnapshotHandler(t *testing.T) {
}
}

// Ensure that the server will stream out results if a chunked response is requested
func TestHandler_ChunkedResponses(t *testing.T) {
c := test.NewDefaultMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")

s := NewHTTPServer(srvr)
defer s.Close()

status, errString := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [
{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 100}},
{"name": "cpu", "tags": {"host": "server02"},"timestamp": "2009-11-10T23:30:00Z", "fields": {"value": 25}}]}`)
if status != http.StatusOK {
t.Fatalf("unexpected status: %d - %s", status, errString)
}
time.Sleep(100 * time.Millisecond) // Ensure data node picks up write.

resp, err := chunkedQuery(s.URL, "foo", "select value from cpu")
if err != nil {
t.Fatalf("error making request: %s", err.Error())
}
defer resp.Body.Close()
for i := 0; i < 2; i++ {
chunk := make([]byte, 2048)
n, err := resp.Body.Read(chunk)
if err != nil {
t.Fatalf("error reading response: %s", err.Error())
}
results := &influxdb.Results{}
err = json.Unmarshal(chunk[0:n], results)
if err != nil {
t.Fatalf("error unmarshaling resultsz: %s", err.Error())
}
if len(results.Results) != 1 {
t.Fatalf("didn't get 1 result: %s\n", mustMarshalJSON(results))
}
if len(results.Results[0].Series) != 1 {
t.Fatalf("didn't get 1 series: %s\n", mustMarshalJSON(results))
}
var vals [][]interface{}
if i == 0 {
vals = [][]interface{}{{"2009-11-10T23:00:00Z", 100}}
} else {
vals = [][]interface{}{{"2009-11-10T23:30:00Z", 25}}
}
if mustMarshalJSON(vals) != mustMarshalJSON(results.Results[0].Series[0].Values) {
t.Fatalf("values weren't what was expected:\n exp: %s\n got: %s", mustMarshalJSON(vals), mustMarshalJSON(results.Results[0].Series[0].Values))
}
}
}

// batchWrite JSON Unmarshal tests

// Utility functions for this test suite.

func chunkedQuery(host, db, q string) (*http.Response, error) {
params := map[string]string{"db": db, "q": q, "chunked": "true", "chunk_size": "1"}
query := url.Values{}
for k, v := range params {
query.Set(k, v)
}
return http.Get(host + "/query?" + query.Encode())
}

func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) {
req, err := http.NewRequest(verb, path, bytes.NewBuffer([]byte(body)))
if err != nil {