Skip to content

Commit

Permalink
Update server and handler to work with streamed responses
Browse files Browse the repository at this point in the history
  • Loading branch information
pauldix authored and otoolep committed Apr 2, 2015
1 parent 20c55bc commit 332c427
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 161 deletions.
138 changes: 103 additions & 35 deletions httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (
"github.com/influxdb/influxdb/uuid"
)

const (
DefaultChunkSize = 10000
)

// TODO: Standard response headers (see: HeaderHandler)
// TODO: Compression (see: CompressionHeaderHandler)

Expand Down Expand Up @@ -172,12 +176,91 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ
httpError(w, "error parsing query: "+err.Error(), pretty, http.StatusBadRequest)
return
}

// Execute query. One result will return for each statement.
results := h.server.ExecuteQuery(query, db, user)
chunked := q.Get("chunked") == "true"
chunkSize := influxdb.NoChunkingSize
if chunked {
cs, err := strconv.ParseInt(q.Get("chunk_size"), 10, 64)
if err != nil {
chunkSize = DefaultChunkSize
} else {
chunkSize = int(cs)
}
}

// Send results to client.
httpResults(w, results, pretty)
w.Header().Add("content-type", "application/json")
results, err := h.server.ExecuteQuery(query, db, user, chunkSize)
if err != nil {
if isAuthorizationError(err) {
w.WriteHeader(http.StatusUnauthorized)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
return
}

res := influxdb.Results{Results: make([]*influxdb.Result, 0)}

statusWritten := false

for r := range results {
// write the status header based on the first result returned in the channel
if !statusWritten {
if r != nil && r.Err != nil {
if isAuthorizationError(r.Err) {
w.WriteHeader(http.StatusUnauthorized)
} else if isMeasurementNotFoundError(r.Err) {
w.WriteHeader(http.StatusOK)
} else if isFieldNotFoundError(r.Err) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
} else {
w.WriteHeader(http.StatusOK)
}
statusWritten = true
}

if r == nil {
continue
}

// if chunked we write out this result and flush
if chunked {
w.Write(marshalPretty(r, pretty))
continue
//w.(http.Flusher).Flush()
}

// it's not chunked so buffer results in memory.
// results for statements need to be combined together. We need to check if this new result is
// for the same statement as the last result, or for the next statement
l := len(res.Results)
if l == 0 {
res.Results = append(res.Results, r)
} else if res.Results[l-1].StatementID == r.StatementID {
cr := res.Results[l-1]
cr.Series = append(cr.Series, r.Series...)
} else {
res.Results = append(res.Results, r)
}
}

// if it's not chunked we buffered everything in memory, so write it out
if !chunked {
w.Write(marshalPretty(res, pretty))
}
}

func marshalPretty(r interface{}, pretty bool) []byte {
var b []byte
if pretty {
b, _ = json.MarshalIndent(r, "", " ")
} else {
b, _ = json.Marshal(r)
}
return b
}

func interfaceToString(v interface{}) string {
Expand Down Expand Up @@ -211,9 +294,14 @@ type Batch struct {
// Return all the measurements from the given DB
func (h *Handler) showMeasurements(db string, user *influxdb.User) ([]string, error) {
var measurements []string
results := h.server.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user)
if results.Err != nil {
return measurements, results.Err
c, err := h.server.ExecuteQuery(&influxql.Query{Statements: []influxql.Statement{&influxql.ShowMeasurementsStatement{}}}, db, user, 0)
if err != nil {
return measurements, err
}
results := influxdb.Results{}

for r := range c {
results.Results = append(results.Results, r)
}

for _, result := range results.Results {
Expand Down Expand Up @@ -263,9 +351,14 @@ func (h *Handler) serveDump(w http.ResponseWriter, r *http.Request, user *influx
httpError(w, "error with dump: "+err.Error(), pretty, http.StatusInternalServerError)
return
}
//
results := h.server.ExecuteQuery(query, db, user)
for _, result := range results.Results {

res, err := h.server.ExecuteQuery(query, db, user, DefaultChunkSize)
if err != nil {
w.Write([]byte("*** SERVER-SIDE ERROR. MISSING DATA ***"))
w.Write(delim)
return
}
for result := range res {
for _, row := range result.Series {
points := make([]Point, 1)
var point Point
Expand Down Expand Up @@ -571,31 +664,6 @@ func isFieldNotFoundError(err error) bool {
return (strings.HasPrefix(err.Error(), "field not found"))
}

// httpResult writes a Results array to the client.
func httpResults(w http.ResponseWriter, results influxdb.Results, pretty bool) {
w.Header().Add("content-type", "application/json")

if results.Error() != nil {
if isAuthorizationError(results.Error()) {
w.WriteHeader(http.StatusUnauthorized)
} else if isMeasurementNotFoundError(results.Error()) {
w.WriteHeader(http.StatusOK)
} else if isFieldNotFoundError(results.Error()) {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
}

var b []byte
if pretty {
b, _ = json.MarshalIndent(results, "", " ")
} else {
b, _ = json.Marshal(results)
}
w.Write(b)
}

// httpError writes an error to the client in a standard format.
func httpError(w http.ResponseWriter, error string, pretty bool, code int) {
w.Header().Add("content-type", "application/json")
Expand Down
64 changes: 64 additions & 0 deletions httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"net/http/httptest"
"net/url"
"os"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1615,10 +1616,73 @@ func TestSnapshotHandler(t *testing.T) {
}
}

// Ensure that the server will stream out results if a chunked response is requested
func TestHandler_ChunkedResponses(t *testing.T) {
c := test.NewMessagingClient()
defer c.Close()
srvr := OpenAuthlessServer(c)
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
srvr.SetDefaultRetentionPolicy("foo", "bar")

s := NewHTTPServer(srvr)
defer s.Close()

status, errString := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [
{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z", "fields": {"value": 100}},
{"name": "cpu", "tags": {"host": "server02"},"timestamp": "2009-11-10T23:30:00Z", "fields": {"value": 25}}]}`)
if status != http.StatusOK {
t.Fatalf("unexpected status: %d - %s", status, errString)
}
time.Sleep(100 * time.Millisecond) // Ensure data node picks up write.

resp, err := chunkedQuery(s.URL, "foo", "select value from cpu")
if err != nil {
t.Fatalf("error making request: %s", err.Error())
}
defer resp.Body.Close()
for i := 0; i < 2; i++ {
chunk := make([]byte, 2048, 2048)
n, err := resp.Body.Read(chunk)
if err != nil {
t.Fatalf("error reading response: %s", err.Error())
}
results := &influxdb.Results{}
err = json.Unmarshal(chunk[0:n], results)
if err != nil {
t.Fatalf("error unmarshaling resultsz: %s", err.Error())
}
if len(results.Results) != 1 {
t.Fatalf("didn't get 1 result: %s\n", mustMarshalJSON(results))
}
if len(results.Results[0].Series) != 1 {
t.Fatalf("didn't get 1 series: %s\n", mustMarshalJSON(results))
}
var vals [][]interface{}
if i == 0 {
vals = [][]interface{}{{"2009-11-10T23:00:00Z", 100}}
} else {
vals = [][]interface{}{{"2009-11-10T23:30:00Z", 25}}
}
if !reflect.DeepEqual(results.Results[0].Series[0].Values, vals) {
t.Fatalf("values weren't what was expected:\n exp: %s\n got: %s", mustMarshalJSON(vals), mustMarshalJSON(results.Results[0].Series[0].Values))
}
}
}

// batchWrite JSON Unmarshal tests

// Utility functions for this test suite.

func chunkedQuery(host, db, q string) (*http.Response, error) {
params := map[string]string{"db": db, "q": q, "chunked": "true", "chunk_size": "1"}
query := url.Values{}
for k, v := range params {
query.Set(k, v)
}
return http.Get(host + "/query?" + query.Encode())
}

func MustHTTP(verb, path string, params, headers map[string]string, body string) (int, string) {
req, err := http.NewRequest(verb, path, bytes.NewBuffer([]byte(body)))
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion influxql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ func NewPlanner(db DB) *Planner {
}

// Plan creates an execution plan for the given SelectStatement and returns an Executor.
func (p *Planner) Plan(stmt *SelectStatement) (*Executor, error) {
func (p *Planner) Plan(stmt *SelectStatement, chunkSize int) (*Executor, error) {
now := p.Now().UTC()

// Replace instances of "now()" with the current time.
Expand Down
Loading

0 comments on commit 332c427

Please sign in to comment.