Processing some queries in batches and new EXPLAIN command #318

Closed · wants to merge 11 commits
3 changes: 3 additions & 0 deletions .gitignore
@@ -52,3 +52,6 @@ Makefile
# log file
influxdb.log
benchmark.log

# config file
config.toml
4 changes: 4 additions & 0 deletions config.toml.sample
@@ -83,6 +83,10 @@ lru-cache-size = "200m"
# limit the max number of open files. max-open-files is per shard so this * that will be max.
max-open-shards = 0

# The default setting is 100. This option controls how many points are fetched from LevelDB
# before they are flushed to the backend.
point-batch-size = 100

# These options specify how data is sharded across the cluster. There are two
# shard configurations that have the same knobs: short term and long term.
# Any series that begins with a capital letter like Exceptions will be written
33 changes: 29 additions & 4 deletions src/cluster/shard.go
@@ -1,6 +1,6 @@
package cluster

import (
"engine"
"errors"
"fmt"
@@ -30,7 +30,15 @@ type QueryProcessor interface {
// This method returns true if the query should continue. If the query should be stopped,
// like maybe the limit was hit, it should return false
YieldPoint(seriesName *string, columnNames []string, point *protocol.Point) bool

YieldSeries(seriesName *string, columnNames []string, seriesIncoming *protocol.Series) bool
Close()

// Set by the shard, so an EXPLAIN query knows which shard the measured query is running against
SetShardInfo(shardId int, shardLocal bool)

// Let QueryProcessor identify itself. What if it is a spy and we can't check that?
GetName() string
}

type NewShardData struct {
@@ -65,6 +73,7 @@ type ShardData struct {
durationIsSplit bool
shardDuration time.Duration
localServerId uint32
isRemoteQuery bool
}

func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, durationIsSplit bool, wal WAL) *ShardData {
@@ -79,6 +88,8 @@ func NewShard(id uint32, startTime, endTime time.Time, shardType ShardType, dura
shardType: shardType,
durationIsSplit: durationIsSplit,
shardDuration: endTime.Sub(startTime),
// by default, assume the shard is being queried locally; the protobuf handler sets this flag to true when the query comes from a remote server
isRemoteQuery: false,
}
}

@@ -149,6 +160,14 @@ func (self *ShardData) SetLocalStore(store LocalShardStore, localServerId uint32
return nil
}

func (self *ShardData) SetRemoteQuery(flag bool) {
self.isRemoteQuery = flag
}

func (self *ShardData) IsRemoteQuery() bool {
return self.isRemoteQuery
}

func (self *ShardData) IsLocal() bool {
return self.store != nil
}
@@ -202,14 +221,20 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *protoco
maxDeleteResults := 10000
processor = engine.NewPassthroughEngine(response, maxDeleteResults)
} else {
if self.ShouldAggregateLocally(querySpec) {
processor = engine.NewQueryEngine(querySpec.SelectQuery(), response)
} else {
if querySpec.IsExplainQuery() {
processor.SetShardInfo(int(self.Id()), !self.IsRemoteQuery())
}
} else {
maxPointsToBufferBeforeSending := 1000
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
if querySpec.IsExplainQuery() {
processor.SetShardInfo(int(self.Id()), !self.IsRemoteQuery())
}
}
}
err := self.localShard.Query(querySpec, processor)
processor.Close()
return err
}
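For reference, the expanded QueryProcessor contract can be exercised with a minimal, self-contained sketch. The Point/Series stand-ins and the explainProcessor type below are illustrative assumptions, not the PR's engine code; the real processors live in src/engine and work with the protobuf-generated types.

```go
package main

import "fmt"

// Stand-ins for the protobuf-generated types the real interface uses.
type Point struct{ Values []string }

type Series struct {
	Name   *string
	Fields []string
	Points []*Point
}

// Mirror of the expanded QueryProcessor interface from src/cluster/shard.go.
type QueryProcessor interface {
	YieldPoint(seriesName *string, columnNames []string, point *Point) bool
	YieldSeries(seriesName *string, columnNames []string, seriesIncoming *Series) bool
	Close()
	SetShardInfo(shardId int, shardLocal bool)
	GetName() string
}

// explainProcessor is a hypothetical processor that only counts the work it
// sees, which is roughly what an EXPLAIN query needs to report per shard.
type explainProcessor struct {
	shardId    int
	shardLocal bool
	pointCount int
}

func (p *explainProcessor) YieldPoint(name *string, cols []string, pt *Point) bool {
	p.pointCount++
	return true // true means "keep going"; false stops the query (e.g. a limit was hit)
}

func (p *explainProcessor) YieldSeries(name *string, cols []string, s *Series) bool {
	p.pointCount += len(s.Points)
	return true
}

func (p *explainProcessor) SetShardInfo(shardId int, shardLocal bool) {
	p.shardId, p.shardLocal = shardId, shardLocal
}

func (p *explainProcessor) GetName() string { return "explainProcessor" }

func (p *explainProcessor) Close() {
	fmt.Printf("shard %d (local=%v) processed %d points\n", p.shardId, p.shardLocal, p.pointCount)
}

func main() {
	var proc QueryProcessor = &explainProcessor{}
	proc.SetShardInfo(1, true) // the shard passes !IsRemoteQuery() as shardLocal
	name := "cpu"
	proc.YieldSeries(&name, []string{"value"}, &Series{Name: &name, Points: []*Point{{}, {}}})
	proc.Close()
}
```

The shardLocal argument passed to SetShardInfo is exactly what the shard derives from !self.IsRemoteQuery() in the hunk above.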
15 changes: 11 additions & 4 deletions src/configuration/configuration.go
@@ -87,9 +87,10 @@ type LoggingConfig struct {
}

type LevelDbConfiguration struct {
MaxOpenFiles int `toml:"max-open-files"`
LruCacheSize size `toml:"lru-cache-size"`
MaxOpenShards int `toml:"max-open-shards"`
MaxOpenFiles int `toml:"max-open-files"`
LruCacheSize size `toml:"lru-cache-size"`
MaxOpenShards int `toml:"max-open-shards"`
PointBatchSize int `toml:"point-batch-size"`
}

type ShardingDefinition struct {
@@ -187,6 +188,7 @@ type Configuration struct {
LevelDbMaxOpenFiles int
LevelDbLruCacheSize int
LevelDbMaxOpenShards int
LevelDbPointBatchSize int
ShortTermShard *ShardConfiguration
LongTermShard *ShardConfiguration
ReplicationFactor int
@@ -286,11 +288,16 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
config.LevelDbMaxOpenFiles = 100
}

// if it wasn't set, set it to 100
// if it wasn't set, set it to 200 MB
if config.LevelDbLruCacheSize == 0 {
config.LevelDbLruCacheSize = 200 * ONE_MEGABYTE
}

// if it wasn't set, set it to 100
if config.LevelDbPointBatchSize == 0 {
config.LevelDbPointBatchSize = 100
}

return config, nil
}

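The zero-value checks in parseTomlConfiguration are the only defaulting mechanism: a point-batch-size missing from config.toml parses to 0 and is then bumped to 100. Below is a stand-alone sketch of that pattern; the struct and function names are illustrative assumptions, not the actual configuration package.

```go
package main

import "fmt"

const oneMegabyte = 1024 * 1024

// Illustrative mirror of the LevelDB settings; the real struct carries toml
// tags such as `toml:"point-batch-size"` and is filled in by the TOML parser.
type levelDbConfig struct {
	MaxOpenFiles   int
	LruCacheSize   int
	MaxOpenShards  int
	PointBatchSize int
}

// applyDefaults bumps every setting the user left at its zero value.
func applyDefaults(c *levelDbConfig) {
	if c.MaxOpenFiles == 0 {
		c.MaxOpenFiles = 100
	}
	if c.LruCacheSize == 0 {
		c.LruCacheSize = 200 * oneMegabyte // 200 MB
	}
	if c.PointBatchSize == 0 {
		c.PointBatchSize = 100 // matches the documented default in config.toml.sample
	}
}

func main() {
	cfg := levelDbConfig{} // as if the options were missing from config.toml
	applyDefaults(&cfg)
	fmt.Println(cfg.PointBatchSize) // prints 100
}
```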
60 changes: 38 additions & 22 deletions src/coordinator/coordinator.go
@@ -42,13 +42,15 @@ var (

// shorter constants for readability
var (
dropDatabase = protocol.Request_DROP_DATABASE
queryRequest = protocol.Request_QUERY
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
heartbeatResponse = protocol.Response_HEARTBEAT
replayReplication = protocol.Request_REPLICATION_REPLAY
sequenceNumber = protocol.Request_SEQUENCE_NUMBER
dropDatabase = protocol.Request_DROP_DATABASE
queryRequest = protocol.Request_QUERY
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
heartbeatResponse = protocol.Response_HEARTBEAT
explainQueryResponse = protocol.Response_EXPLAIN_QUERY

replayReplication = protocol.Request_REPLICATION_REPLAY
sequenceNumber = protocol.Request_SEQUENCE_NUMBER

write = protocol.Request_WRITE
)
@@ -249,15 +251,18 @@ func (self *CoordinatorImpl) getShardsAndProcessor(querySpec *parser.QuerySpec,
}

go func() {
for {
res := <-responseChan
if *res.Type == endStreamResponse || *res.Type == accessDeniedResponse {
for {
response := <-responseChan

if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
writer.Close()
seriesClosed <- true
return
}
if res.Series != nil && len(res.Series.Points) > 0 {
writer.Write(res.Series)
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
if response.Series != nil && len(response.Series.Points) > 0 {
writer.Write(response.Series)
}
}
}
}()
@@ -271,6 +276,7 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
responses := make([]chan *protocol.Response, 0)
for _, shard := range shards {
responseChan := make(chan *protocol.Response, self.config.QueryShardBufferSize)
// We query the shards for data and stream the results to the query processor
go shard.Query(querySpec, responseChan)
responses = append(responses, responseChan)
}
@@ -279,27 +285,37 @@
log.Debug("READING: shard: ", shards[i].String())
for {
response := <-responseChan
log.Debug("GOT RESPONSE: ", response.Type, response.Series)

//log.Debug("GOT RESPONSE: ", response.Type, response.Series)
log.Debug("GOT RESPONSE: ", response.Type)
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
break
}

// if we don't have a processor, yield the point to the writer
if processor == nil {
log.Debug("WRITING: ", len(response.Series.Points))
seriesWriter.Write(response.Series)
log.Debug("WRITING (done)")
// this happens if the shard took care of the query;
// otherwise the client will get points from the passthrough engine
if processor == nil {
// For an EXPLAIN query, we don't write the actual points (responses of type Query) to the client
if !(*response.Type == queryResponse && querySpec.IsExplainQuery()) {
seriesWriter.Write(response.Series)
}
continue
}

// if the data wasn't aggregated at the shard level, aggregate
// the data here
log.Debug("YIELDING: ", len(response.Series.Points))
if response.Series != nil {
for _, p := range response.Series.Points {
processor.YieldPoint(response.Series.Name, response.Series.Fields, p)
log.Debug("YIELDING: ", len(response.Series.Points))

// we need to forward the message type to the PassthroughEngine
// this is a bit dirty; TODO: refactor it...
if querySpec.IsExplainQuery() && processor.GetName() == "PassthroughEngine" {

}
}

processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series)
}
}
log.Debug("DONE: shard: ", shards[i].String())
}
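The reading loop in runQuerySpec drains each shard's response channel in turn until it sees an END_STREAM, suppresses plain query responses when the query is an EXPLAIN, and otherwise either writes the series straight to the client or hands them to the local processor. Here is a simplified, self-contained sketch of the suppression logic; the response type and constants below are stand-ins, not the protocol package.

```go
package main

import "fmt"

// Stand-ins for protocol.Response and its type constants.
type responseType int

const (
	queryResponse responseType = iota
	explainQueryResponse
	endStreamResponse
)

type response struct {
	Type   responseType
	Series string // placeholder for *protocol.Series
}

// drain reads one shard's channel until END_STREAM, deciding per response
// whether it should reach the client. For an EXPLAIN query, plain query
// responses are dropped and only the EXPLAIN_QUERY responses get written.
func drain(ch <-chan response, isExplain bool, write func(string)) {
	for r := range ch {
		if r.Type == endStreamResponse {
			return
		}
		if isExplain && r.Type == queryResponse {
			continue // suppress the actual data points for EXPLAIN
		}
		write(r.Series)
	}
}

func main() {
	ch := make(chan response, 3)
	ch <- response{Type: queryResponse, Series: "cpu data points"}
	ch <- response{Type: explainQueryResponse, Series: "explain statistics"}
	ch <- response{Type: endStreamResponse}
	drain(ch, true, func(s string) { fmt.Println("to client:", s) })
	// prints only: to client: explain statistics
}
```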
4 changes: 3 additions & 1 deletion src/coordinator/protobuf_request_handler.go
@@ -58,7 +58,7 @@ func (self *ProtobufRequestHandler) handleQuery(request *protocol.Request, conn
// the query should always parse correctly since it was parsed at the originating server.
queries, err := parser.ParseQuery(*request.Query)
if err != nil || len(queries) < 1 {
log.Error("Erorr parsing query: ", err)
log.Error("Error parsing query: ", err)
errorMsg := fmt.Sprintf("Cannot find user %s", *request.UserName)
response := &protocol.Response{Type: &endStreamResponse, ErrorMessage: &errorMsg, RequestId: request.Id}
self.WriteResponse(conn, response)
@@ -80,6 +80,8 @@
}

shard := self.clusterConfig.GetLocalShardById(*request.ShardId)
shard.SetRemoteQuery(true)

querySpec := parser.NewQuerySpec(user, *request.Database, query)

responseChan := make(chan *protocol.Response)
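Setting the remote flag before querying is what makes SetShardInfo (see src/cluster/shard.go above) report shardLocal = false when the shard is queried over protobuf. A tiny illustrative sketch of that flag flow, using a stand-in shard type rather than cluster.ShardData:

```go
package main

import "fmt"

// Stand-in for the remote-query flag carried by cluster.ShardData.
type shard struct{ isRemoteQuery bool }

func (s *shard) SetRemoteQuery(flag bool) { s.isRemoteQuery = flag }
func (s *shard) IsRemoteQuery() bool      { return s.isRemoteQuery }

func main() {
	s := &shard{}          // a shard defaults to being queried locally
	s.SetRemoteQuery(true) // the protobuf handler marks it remote before querying
	fmt.Println("shardLocal:", !s.IsRemoteQuery()) // prints false, as EXPLAIN would report
}
```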
2 changes: 1 addition & 1 deletion src/daemon/influxd.go
@@ -17,7 +17,7 @@ import (

const (
version = "dev"
gitSha = "HEAD"
gitSha = "58cdd6f2737d186bc586f0e60ddb2121a244968c"
)

func setupLogging(loggingLevel, logFile string) {
51 changes: 35 additions & 16 deletions src/datastore/leveldb_shard.go
@@ -20,14 +20,15 @@ import (
)

type LevelDbShard struct {
db *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
lastIdUsed uint64
columnIdMutex sync.Mutex
db *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
lastIdUsed uint64
columnIdMutex sync.Mutex
pointBatchSize int
}

func NewLevelDbShard(db *levigo.DB) (*LevelDbShard, error) {
func NewLevelDbShard(db *levigo.DB, pointBatchSize int) (*LevelDbShard, error) {
ro := levigo.NewReadOptions()
lastIdBytes, err2 := db.Get(ro, NEXT_ID_KEY)
if err2 != nil {
@@ -43,10 +44,11 @@ func NewLevelDbShard(db *levigo.DB) (*LevelDbShard, error) {
}

return &LevelDbShard{
db: db,
writeOptions: levigo.NewWriteOptions(),
readOptions: ro,
lastIdUsed: lastId,
db: db,
writeOptions: levigo.NewWriteOptions(),
readOptions: ro,
lastIdUsed: lastId,
pointBatchSize: pointBatchSize,
}, nil
}

@@ -179,6 +181,8 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
}
}()

seriesOutgoing := &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)}

// TODO: clean up, this is super gnarly
// optimize for the case where we're pulling back only a single column or aggregate
for {
@@ -267,18 +271,33 @@
}

shouldContinue := true
for _, alias := range aliases {
_alias := alias
if !processor.YieldPoint(&_alias, fieldNames, point) {
shouldContinue = false
}
}

seriesOutgoing.Points = append(seriesOutgoing.Points, point)

if len(seriesOutgoing.Points) >= self.pointBatchSize {
for _, alias := range aliases {
_alias := alias
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
shouldContinue = false
}
}
seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)}
}

if !shouldContinue {
break
}
}

// Yield remaining data
for _, alias := range aliases {
_alias := alias
log.Debug("Final Flush %s", _alias)
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
log.Debug("Cancelled...")
}
}

return nil
}

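The scan loop now buffers points into seriesOutgoing, yields a whole batch through YieldSeries once pointBatchSize points have accumulated, and flushes whatever remains after the iterator is exhausted. Below is a self-contained sketch of that batching shape, assuming a plain yield callback instead of the real QueryProcessor.

```go
package main

import "fmt"

type point struct{ value float64 }

type series struct {
	name   string
	points []*point
}

// scanInBatches mimics the shape of executeQueryForSeries: points are buffered
// until batchSize is reached, then yielded as one series; whatever is left over
// is flushed after the scan. yield returning false means the consumer wants to
// stop (for example, a LIMIT was satisfied).
func scanInBatches(name string, src []*point, batchSize int, yield func(*series) bool) {
	outgoing := &series{name: name, points: make([]*point, 0, batchSize)}
	for _, p := range src {
		outgoing.points = append(outgoing.points, p)
		if len(outgoing.points) < batchSize {
			continue
		}
		stop := !yield(outgoing)
		outgoing = &series{name: name, points: make([]*point, 0, batchSize)}
		if stop {
			break
		}
	}
	// Final flush of the partially filled (possibly empty) batch, as in the PR.
	yield(outgoing)
}

func main() {
	pts := make([]*point, 0, 7)
	for i := 0; i < 7; i++ {
		pts = append(pts, &point{value: float64(i)})
	}
	scanInBatches("cpu", pts, 3, func(s *series) bool {
		fmt.Println("yield batch of", len(s.points), "points")
		return true
	})
	// prints batches of 3, 3, and a final flush of 1
}
```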
4 changes: 3 additions & 1 deletion src/datastore/leveldb_shard_datastore.go
@@ -24,6 +24,7 @@ type LevelDbShardDatastore struct {
levelDbOptions *levigo.Options
writeBuffer *cluster.WriteBuffer
maxOpenShards int
pointBatchSize int
}

const (
@@ -87,6 +88,7 @@ func NewLevelDbShardDatastore(config *configuration.Configuration) (*LevelDbShar
levelDbOptions: opts,
maxOpenShards: config.LevelDbMaxOpenShards,
lastAccess: make(map[uint32]int64),
pointBatchSize: config.LevelDbPointBatchSize,
}, nil
}

@@ -127,7 +129,7 @@ func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalSha
return nil, err
}

db, err = NewLevelDbShard(ldb)
db, err = NewLevelDbShard(ldb, self.pointBatchSize)
if err != nil {
return nil, err
}