Skip to content

Commit

Permalink
wire up INTO queries.
Browse files Browse the repository at this point in the history
Since INTO queries need complete knowledge of the target database and
retention policy to work, we create a loopback interface from the query
executor back to the cluster's points writer in order to perform them.
  • Loading branch information
Daniel Morsing committed Oct 12, 2015
1 parent f1e0c59 commit 3f77f68
Show file tree
Hide file tree
Showing 5 changed files with 198 additions and 31 deletions.
12 changes: 12 additions & 0 deletions cluster/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,18 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
return mapping, nil
}

// WritePointsInto mirrors WritePoints but accepts the tsdb request type
// instead of the cluster one; this indirection avoids a circular package
// dependency between tsdb and cluster.
func (w *PointsWriter) WritePointsInto(p *tsdb.IntoWriteRequest) error {
	return w.WritePoints(&WritePointsRequest{
		Database:         p.Database,
		RetentionPolicy:  p.RetentionPolicy,
		ConsistencyLevel: ConsistencyLevelAny,
		Points:           p.Points,
	})
}

// WritePoints writes across multiple local and remote data nodes according the consistency level.
func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {
w.statMap.Add(statWriteReq, 1)
Expand Down
3 changes: 3 additions & 0 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ func NewServer(c *Config, buildInfo *BuildInfo) (*Server, error) {
s.PointsWriter.ShardWriter = s.ShardWriter
s.PointsWriter.HintedHandoff = s.HintedHandoff

// needed for executing into queries.
s.QueryExecutor.IntoWriter = s.PointsWriter

// Initialize the monitor
s.Monitor.Version = s.buildInfo.Version
s.Monitor.Commit = s.buildInfo.Commit
Expand Down
54 changes: 54 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4956,3 +4956,57 @@ func TestServer_Query_FieldWithMultiplePeriodsMeasurementPrefixMatch(t *testing.
}
}
}

// TestServer_Query_IntoTarget exercises SELECT ... INTO: it writes four
// points, copies them into a new measurement, and confirms both the
// reported written count and the copied data.
func TestServer_Query_IntoTarget(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig(), "")
	defer s.Close()

	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicyInfo("rp0", 1, 0)); err != nil {
		t.Fatal(err)
	}
	if err := s.MetaStore.SetDefaultRetentionPolicy("db0", "rp0"); err != nil {
		t.Fatal(err)
	}

	// Four points, ten seconds apart.
	lines := []string{
		fmt.Sprintf(`foo value=1 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:00Z").UnixNano()),
		fmt.Sprintf(`foo value=2 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:10Z").UnixNano()),
		fmt.Sprintf(`foo value=3 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:20Z").UnixNano()),
		fmt.Sprintf(`foo value=4 %d`, mustParseTime(time.RFC3339Nano, "2000-01-01T00:00:30Z").UnixNano()),
	}

	test := NewTest("db0", "rp0")
	test.write = strings.Join(lines, "\n")

	test.addQueries([]*Query{
		&Query{
			name:    "into",
			params:  url.Values{"db": []string{"db0"}},
			command: `SELECT value AS something INTO baz FROM foo`,
			exp:     `{"results":[{"series":[{"name":"result","columns":["time","written"],"values":[["1970-01-01T00:00:00Z",4]]}]}]}`,
		},
		&Query{
			name:    "confirm results",
			params:  url.Values{"db": []string{"db0"}},
			command: `SELECT something FROM baz`,
			exp:     `{"results":[{"series":[{"name":"baz","columns":["time","something"],"values":[["2000-01-01T00:00:00Z",1],["2000-01-01T00:00:10Z",2],["2000-01-01T00:00:20Z",3],["2000-01-01T00:00:30Z",4]]}]}]}`,
		},
	}...)

	if err := test.init(s); err != nil {
		t.Fatalf("test init failed: %s", err)
	}

	for _, q := range test.queries {
		if q.skip {
			t.Logf("SKIP:: %s", q.name)
			continue
		}
		if err := q.Execute(s); err != nil {
			t.Error(q.Error(err))
		} else if !q.success() {
			t.Error(q.failureMessage())
		}
	}
}
53 changes: 53 additions & 0 deletions tsdb/into.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package tsdb

import (
"errors"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/models"
"time"
)

// convertRowToPoints will convert a query result Row into Points that can be
// written back in. Used for INTO queries.
//
// The row's "time" column supplies each point's timestamp; every other column
// becomes a field. Returns an error if the row has no time column or if a
// time value is not a time.Time.
func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point, error) {
	// figure out which parts of the result are the time and which are the fields
	timeIndex := -1
	fieldIndexes := make(map[string]int)
	for i, c := range row.Columns {
		if c == "time" {
			timeIndex = i
		} else {
			fieldIndexes[c] = i
		}
	}

	if timeIndex == -1 {
		return nil, errors.New("error finding time index in result")
	}

	points := make([]models.Point, 0, len(row.Values))
	for _, v := range row.Values {
		vals := make(map[string]interface{})
		for fieldName, fieldIndex := range fieldIndexes {
			vals[fieldName] = v[fieldIndex]
		}

		// Guard the assertion so a malformed row produces an error
		// instead of a panic.
		ts, ok := v[timeIndex].(time.Time)
		if !ok {
			return nil, errors.New("error converting time value in result")
		}

		p := models.NewPoint(measurementName, row.Tags, vals, ts)

		points = append(points, p)
	}

	return points, nil
}

// errNoDatabaseInTarget is returned when an INTO target does not name a database.
var errNoDatabaseInTarget = errors.New("no database in target")

// intoDB returns the database named by the statement's INTO target, or an
// error when the target does not specify one.
func intoDB(stmt *influxql.SelectStatement) (string, error) {
	if db := stmt.Target.Measurement.Database; db != "" {
		return db, nil
	}
	return "", errNoDatabaseInTarget
}

// intoRP returns the retention policy named by the statement's INTO target.
func intoRP(stmt *influxql.SelectStatement) string {
	return stmt.Target.Measurement.RetentionPolicy
}

// intoMeasurement returns the measurement named by the statement's INTO target.
func intoMeasurement(stmt *influxql.SelectStatement) string {
	return stmt.Target.Measurement.Name
}
107 changes: 76 additions & 31 deletions tsdb/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,24 @@ type QueryExecutor struct {
CreateMapper(shard meta.ShardInfo, stmt influxql.Statement, chunkSize int) (Mapper, error)
}

IntoWriter interface {
WritePointsInto(p *IntoWriteRequest) error
}

Logger *log.Logger
QueryLogEnabled bool

// the local data store
Store *Store
}

// IntoWriteRequest is a partial copy of cluster.WriteRequest. It lives in
// this package so the query executor can hand write requests back to the
// cluster without a circular dependency.
type IntoWriteRequest struct {
	Database        string
	RetentionPolicy string
	Points          []models.Point
}

// NewQueryExecutor returns an initialized QueryExecutor
func NewQueryExecutor(store *Store) *QueryExecutor {
return &QueryExecutor{
Expand Down Expand Up @@ -275,34 +286,6 @@ func (q *QueryExecutor) PlanSelect(stmt *influxql.SelectStatement, chunkSize int
return executor, nil
}

// executeSelectStatement plans and executes a select statement against a database.
func (q *QueryExecutor) executeSelectStatement(statementID int, stmt *influxql.SelectStatement, results chan *influxql.Result, chunkSize int) error {
	// Build an execution plan for the statement.
	executor, err := q.PlanSelect(stmt, chunkSize)
	if err != nil {
		return err
	}

	// Stream rows from the executor, forwarding each to the results
	// channel. Track whether anything was forwarded so an empty result
	// can be sent when no rows arrive.
	sent := false
	for row := range executor.Execute() {
		if row.Err != nil {
			return row.Err
		}
		sent = true
		results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
	}

	if !sent {
		results <- &influxql.Result{StatementID: statementID, Series: make([]*models.Row, 0)}
	}

	return nil
}

// expandSources expands regex sources and removes duplicates.
// NOTE: sources must be normalized (db and rp set) before calling this function.
func (q *QueryExecutor) expandSources(sources influxql.Sources) (influxql.Sources, error) {
Expand Down Expand Up @@ -697,15 +680,47 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen

// Execute plan.
ch := e.Execute()

var writeerr error
var intoNum int64
var isinto bool
// Stream results from the channel. We should send an empty result if nothing comes through.
resultSent := false
for row := range ch {
// We had a write error. Continue draining results from the channel
// so we don't hang the goroutine in the executor.
if writeerr != nil {
continue
}
if row.Err != nil {
return row.Err
}
resultSent = true
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
selectstmt, ok := stmt.(*influxql.SelectStatement)
if ok && selectstmt.Target != nil {
isinto = true
// this is a into query. Write results back to database
writeerr = q.writeInto(row, selectstmt)
intoNum += int64(len(row.Values))
} else {
resultSent = true
results <- &influxql.Result{StatementID: statementID, Series: []*models.Row{row}}
}
}
if writeerr != nil {
return writeerr
} else if isinto {
results <- &influxql.Result{
StatementID: statementID,
Series: []*models.Row{{
Name: "result",
// it seems weird to give a time here, but so much stuff breaks if you don't
Columns: []string{"time", "written"},
Values: [][]interface{}{{
time.Unix(0, 0).UTC(),
intoNum,
}},
}},
}
return nil
}

if !resultSent {
Expand All @@ -715,6 +730,36 @@ func (q *QueryExecutor) executeStatement(statementID int, stmt influxql.Statemen
return nil
}

// writeInto converts a single result row back into points and writes them to
// the database and retention policy named by the statement's INTO target.
//
// It might seem a bit weird that this is where we do this, since we will have
// to convert rows back to points. The Executors (both aggregate and raw) are
// complex enough that changing them to write back to the DB is going to be
// clumsy, and the interweaving of limitedRowWriter and ExecuteAggregate/Raw
// makes it ridiculously hard to make sure that the results would be the same
// as when queried normally.
func (q *QueryExecutor) writeInto(row *models.Row, selectstmt *influxql.SelectStatement) error {
	intodb, err := intoDB(selectstmt)
	if err != nil {
		return err
	}

	points, err := convertRowToPoints(intoMeasurement(selectstmt), row)
	if err != nil {
		return err
	}

	// Return the writer's error directly; no extra wrapping is needed.
	return q.IntoWriter.WritePointsInto(&IntoWriteRequest{
		Database:        intodb,
		RetentionPolicy: intoRP(selectstmt),
		Points:          points,
	})
}

func (q *QueryExecutor) executeShowTagValuesStatement(stmt *influxql.ShowTagValuesStatement, database string) *influxql.Result {
// Check for time in WHERE clause (not supported).
if influxql.HasTimeExpr(stmt.Condition) {
Expand Down

0 comments on commit 3f77f68

Please sign in to comment.