Skip to content

Commit

Permalink
Merge pull request #1580 from influxdb/support_all_value_types
Browse files Browse the repository at this point in the history
Support all value types
  • Loading branch information
otoolep committed Feb 17, 2015
2 parents 0ebd785 + e74fadf commit cd87583
Show file tree
Hide file tree
Showing 9 changed files with 366 additions and 152 deletions.
210 changes: 194 additions & 16 deletions database.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb

import (
"encoding/binary"
"encoding/json"
"fmt"
"math"
Expand All @@ -12,6 +13,10 @@ import (
"github.com/influxdb/influxdb/influxql"
)

const (
	// maxStringLength caps the length of string field values before encoding.
	// NOTE(review): the encoded length prefix is a uint16 (max 65535), but
	// 64*1024 = 65536 — a string truncated to exactly this limit overflows
	// the prefix. Confirm the intended limit; see EncodeFields.
	maxStringLength = 64 * 1024
)

// database is a collection of retention policies and shards. It also has methods
// for keeping an in memory index of all the measurements, series, and tags in the database.
// Methods on this struct aren't goroutine safe. They assume that the server is handling
Expand Down Expand Up @@ -219,22 +224,6 @@ func (m *Measurement) seriesByTags(tags map[string]string) *Series {
return m.series[string(marshalTags(tags))]
}

// mapValues converts a map of values with string keys to field id keys.
// Panics if any named field does not exist on the measurement.
func (m *Measurement) mapValues(values map[string]interface{}) map[uint8]interface{} {
	other := make(map[uint8]interface{}, len(values))
	for k, v := range values {
		// TODO: Cast value to original field type.

		f := m.FieldByName(k)
		if f == nil {
			panic(fmt.Sprintf("Field does not exist for %s", k))
		}
		other[f.ID] = v
	}
	return other
}

func (m *Measurement) seriesIDsAndFilters(stmt *influxql.SelectStatement) (seriesIDs, map[uint32]influxql.Expr) {
seriesIdsToExpr := make(map[uint32]influxql.Expr)
if stmt.Condition == nil {
Expand Down Expand Up @@ -635,6 +624,195 @@ type Field struct {
// Fields represents a list of fields belonging to a Measurement.
type Fields []*Field

// FieldCodec provides encoding and decoding functionality for the fields of a given
// Measurement. It is a distinct type to avoid locking writes on this node while
// potentially long-running queries are executing.
//
// It is not affected by changes to the Measurement object after codec creation.
type FieldCodec struct {
	fieldsByID   map[uint8]*Field  // lookup by wire field ID
	fieldsByName map[string]*Field // lookup by user-facing field name
}

// NewFieldCodec returns a FieldCodec for the given Measurement. Must be called with
// a RLock that protects the Measurement.
func NewFieldCodec(m *Measurement) *FieldCodec {
	codec := &FieldCodec{
		fieldsByID:   make(map[uint8]*Field, len(m.Fields)),
		fieldsByName: make(map[string]*Field, len(m.Fields)),
	}
	for _, fld := range m.Fields {
		codec.fieldsByID[fld.ID] = fld
		codec.fieldsByName[fld.Name] = fld
	}
	return codec
}

// EncodeFields converts a map of values with string keys to a byte slice of field
// IDs and values.
//
// Wire format: a 1-byte field count, then per field a 1-byte field ID followed by
// a type-specific payload — 8-byte big-endian float bits for numbers, 1 byte for
// booleans, and a 2-byte big-endian length prefix plus the bytes for strings.
//
// If a field exists in the codec, but its type is different, an error is returned. If
// a field is not present in the codec, the system panics.
func (f *FieldCodec) EncodeFields(values map[string]interface{}) ([]byte, error) {
	// Allocate byte slice and write field count.
	b := make([]byte, 1, 10)
	b[0] = byte(len(values))

	for k, v := range values {
		field := f.fieldsByName[k]
		if field == nil {
			panic(fmt.Sprintf("field does not exist for %s", k))
		} else if influxql.InspectDataType(v) != field.Type {
			return nil, fmt.Errorf("field %s is not of type %s", k, field.Type)
		}

		var buf []byte

		switch field.Type {
		case influxql.Number:
			var value float64
			// Convert integers to floats.
			if intval, ok := v.(int); ok {
				value = float64(intval)
			} else {
				value = v.(float64)
			}

			buf = make([]byte, 9)
			binary.BigEndian.PutUint64(buf[1:9], math.Float64bits(value))
		case influxql.Boolean:
			value := v.(bool)

			// Only 1 byte is needed for a boolean.
			buf = make([]byte, 2)
			if value {
				buf[1] = byte(1)
			}
		case influxql.String:
			value := v.(string)
			if len(value) > maxStringLength {
				value = value[:maxStringLength]
			}
			// The length prefix is a uint16, so the payload must never exceed
			// 65535 bytes. Without this clamp, a string truncated to exactly
			// 64KiB would overflow the prefix to 0 and corrupt decoding.
			if len(value) > math.MaxUint16 {
				value = value[:math.MaxUint16]
			}
			// Make a buffer for the field ID (1 byte), the string length (2 bytes), and the string.
			buf = make([]byte, len(value)+3)

			// Set the string length, then copy the string itself.
			binary.BigEndian.PutUint16(buf[1:3], uint16(len(value)))
			copy(buf[3:], value)
		default:
			panic(fmt.Sprintf("unsupported value type: %T", v))
		}

		// Always set the field ID as the leading byte.
		buf[0] = field.ID

		// Append temp buffer to the end.
		b = append(b, buf...)
	}

	return b, nil
}

// DecodeByID scans a byte slice for a field with the given ID, converts it to its
// expected type, and returns that value. Returns ErrFieldNotFound if the field
// is not present in the encoded data.
func (f *FieldCodec) DecodeByID(targetID uint8, b []byte) (interface{}, error) {
	if len(b) == 0 {
		return 0, ErrFieldNotFound
	}

	// Read the field count from the first byte.
	n := int(b[0])
	// Start from the second byte and iterate over until we're done decoding.
	b = b[1:]
	for i := 0; i < n; i++ {
		field, ok := f.fieldsByID[b[0]]
		if !ok {
			panic(fmt.Sprintf("field ID %d has no mapping", b[0]))
		}

		var value interface{}
		switch field.Type {
		case influxql.Number:
			value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
			// Move bytes forward.
			b = b[9:]
		case influxql.Boolean:
			value = b[1] == 1
			// Move bytes forward.
			b = b[2:]
		case influxql.String:
			// Widen the length to int before arithmetic: size+3 on the raw
			// uint16 would wrap around for lengths >= 65533.
			size := int(binary.BigEndian.Uint16(b[1:3]))
			value = string(b[3 : 3+size])
			// Move bytes forward.
			b = b[size+3:]
		default:
			// %s prints the data type's value; %T would always print the
			// Go type name "influxql.DataType".
			panic(fmt.Sprintf("unsupported value type: %s", field.Type))
		}

		if field.ID == targetID {
			return value, nil
		}
	}

	return 0, ErrFieldNotFound
}

// DecodeFields decodes a byte slice into a set of field ids and values.
// Returns nil for empty input. Panics if the data references a field ID
// unknown to the codec.
func (f *FieldCodec) DecodeFields(b []byte) map[uint8]interface{} {
	if len(b) == 0 {
		return nil
	}

	// Read the field count from the first byte.
	n := int(b[0])

	// Create a map to hold the decoded data.
	values := make(map[uint8]interface{}, n)

	// Start from the second byte and iterate over until we're done decoding.
	b = b[1:]
	for i := 0; i < n; i++ {
		// First byte is the field identifier.
		fieldID := b[0]
		field := f.fieldsByID[fieldID]
		if field == nil {
			panic(fmt.Sprintf("field ID %d has no mapping", fieldID))
		}

		var value interface{}
		switch field.Type {
		case influxql.Number:
			value = math.Float64frombits(binary.BigEndian.Uint64(b[1:9]))
			// Move bytes forward.
			b = b[9:]
		case influxql.Boolean:
			value = b[1] == 1
			// Move bytes forward.
			b = b[2:]
		case influxql.String:
			// Widen to int before arithmetic so a near-maximum uint16 length
			// cannot wrap around.
			size := int(binary.BigEndian.Uint16(b[1:3]))
			// The payload starts after the 1-byte ID and 2-byte length prefix:
			// b[3 : 3+size]. (Previously b[3:size], which dropped the offset
			// and returned the wrong bytes.)
			value = string(b[3 : 3+size])
			// Move bytes forward.
			b = b[size+3:]
		default:
			// %s prints the data type's value; %T on the field pointer would
			// always print "*influxdb.Field".
			panic(fmt.Sprintf("unsupported value type: %s", field.Type))
		}

		values[fieldID] = value
	}

	return values
}

// Series belong to a Measurement and represent unique time series in a database
type Series struct {
ID uint32
Expand Down
84 changes: 83 additions & 1 deletion httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,89 @@ func TestHandler_serveWriteSeriesZeroTime(t *testing.T) {
}
}

func TestHandler_serveWriteSeriesInvalidField(t *testing.T) {
// TestHandler_serveWriteSeriesStringValues verifies that a string field value
// written over HTTP can be queried back intact.
func TestHandler_serveWriteSeriesStringValues(t *testing.T) {
	dataNode := OpenAuthlessServer(NewMessagingClient())
	dataNode.CreateDatabase("foo")
	dataNode.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
	dataNode.SetDefaultRetentionPolicy("foo", "bar")

	httpServer := NewHTTPServer(dataNode)
	defer httpServer.Close()

	// Write a point whose only field is a string.
	status, _ := MustHTTP("POST", httpServer.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "logs", "tags": {"host": "server01"},"values": {"event": "disk full"}}]}`)
	if status != http.StatusOK {
		t.Fatalf("unexpected status: %d", status)
	}
	time.Sleep(100 * time.Millisecond) // Ensure data node picks up write.

	// Query the field back.
	params := map[string]string{"db": "foo", "q": "select event from logs"}
	status, body := MustHTTP("GET", httpServer.URL+`/query`, params, nil, "")
	if status != http.StatusOK {
		t.Logf("query %s\n", params)
		t.Log(body)
		t.Errorf("unexpected status: %d", status)
	}

	results := &influxdb.Results{}
	if err := json.Unmarshal([]byte(body), results); err != nil {
		t.Logf("query : %s\n", params)
		t.Log(body)
		t.Error(err)
	}
	if len(results.Results) != 1 {
		t.Fatalf("unexpected results count")
	}
	res := results.Results[0]
	if len(res.Rows) != 1 {
		t.Fatalf("unexpected row count, expected: %d, actual: %d", 1, len(res.Rows))
	}
	if res.Rows[0].Values[0][1] != "disk full" {
		t.Fatalf("unexpected string value, actual: %s", res.Rows[0].Values[0][1])
	}
}

// TestHandler_serveWriteSeriesBoolValues verifies that a boolean field value
// written over HTTP can be queried back intact.
func TestHandler_serveWriteSeriesBoolValues(t *testing.T) {
	srvr := OpenAuthlessServer(NewMessagingClient())
	srvr.CreateDatabase("foo")
	srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
	srvr.SetDefaultRetentionPolicy("foo", "bar")

	s := NewHTTPServer(srvr)
	defer s.Close()

	// Write a point whose only field is a boolean.
	status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "disk", "tags": {"host": "server01"},"values": {"full": false}}]}`)
	if status != http.StatusOK {
		t.Fatalf("unexpected status: %d", status)
	}
	time.Sleep(100 * time.Millisecond) // Ensure data node picks up write.

	// Query the field back.
	query := map[string]string{"db": "foo", "q": "select full from disk"}
	status, body := MustHTTP("GET", s.URL+`/query`, query, nil, "")
	if status != http.StatusOK {
		t.Logf("query %s\n", query)
		t.Log(body)
		t.Errorf("unexpected status: %d", status)
	}

	r := &influxdb.Results{}
	if err := json.Unmarshal([]byte(body), r); err != nil {
		t.Logf("query : %s\n", query)
		t.Log(body)
		t.Error(err)
	}
	if len(r.Results) != 1 {
		t.Fatalf("unexpected results count")
	}
	result := r.Results[0]
	if len(result.Rows) != 1 {
		t.Fatalf("unexpected row count, expected: %d, actual: %d", 1, len(result.Rows))
	}
	// The message previously said "string value" and used %s on a bool,
	// which prints "%!s(bool=false)"; use %v and say bool.
	if result.Rows[0].Values[0][1] != false {
		t.Fatalf("unexpected bool value, actual: %v", result.Rows[0].Values[0][1])
	}
}

func TestHandler_serveWriteSeriesInvalidQueryField(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
Expand Down
3 changes: 3 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ var (
// ErrFieldTypeConflict is returned when a new field already exists with a different type.
ErrFieldTypeConflict = errors.New("field type conflict")

// ErrFieldNotFound is returned when a field cannot be found in the encoded field data.
ErrFieldNotFound = errors.New("field not found")

// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
ErrSeriesNotFound = errors.New("series not found")

Expand Down
Loading

0 comments on commit cd87583

Please sign in to comment.