partial fix for issue #294: processing some queries in batches
elcct authored and jvshahid committed Mar 12, 2014
1 parent e798c98 commit dbe76df
Showing 12 changed files with 290 additions and 60 deletions.
3 changes: 3 additions & 0 deletions .gitignore
@@ -54,3 +54,6 @@ Makefile
# log file
influxdb.log
benchmark.log

# config file
config.toml
4 changes: 4 additions & 0 deletions config.toml.sample
@@ -83,6 +83,10 @@ lru-cache-size = "200m"
# limit the max number of open shards. max-open-files is per shard, so the total number of
# open files is capped at max-open-shards * max-open-files.
max-open-shards = 0

# The default setting is 100. This option controls how many points are fetched from LevelDB
# in one batch before they are flushed to the query processor.
point-batch-size = 100

# These options specify how data is sharded across the cluster. There are two
# shard configurations that have the same knobs: short term and long term.
# Any series that begins with a capital letter like Exceptions will be written
14 changes: 10 additions & 4 deletions src/cluster/shard.go
@@ -6,7 +6,6 @@ import (
"engine"
"fmt"
"parser"
"protocol"
p "protocol"
"sort"
"strings"
@@ -32,6 +31,7 @@ type QueryProcessor interface {
// This method returns true if the query should continue. If the query should be stopped,
// for example because a limit was hit, it should return false
YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool
YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool
Close()
}

@@ -213,26 +213,32 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo
if self.ShouldAggregateLocally(querySpec) {
processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response)
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: protocol.String(err.Error())}
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
log.Error("Error while creating engine: %s", err)
return
}
if querySpec.IsExplainQuery() {
processor.SetShardInfo(int(self.Id()), self.IsLocal)
}
} else {
maxPointsToBufferBeforeSending := 1000
processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending)
if querySpec.IsExplainQuery() {
processor.SetShardInfo(int(self.Id()), self.IsLocal)
}
}
}
shard, err := self.store.GetOrCreateShard(self.id)
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: protocol.String(err.Error())}
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
log.Error("Error while getting shards: %s", err)
return
}
defer self.store.ReturnShard(self.id)
err = shard.Query(querySpec, processor)
processor.Close()
if err != nil {
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: protocol.String(err.Error())}
response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())}
}
response <- &p.Response{Type: &endStreamResponse}
return
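On the batched read path a shard now calls YieldSeries once per buffered batch instead of YieldPoint once per point. Below is a minimal sketch of a QueryProcessor implementing the new method; CountingProcessor and its limit field are hypothetical and not part of this commit, the GOPATH-style "protocol" import mirrors the repository layout, and interface methods elided from the hunk above (such as SetShardInfo) are omitted, so this illustrates only the yield methods.

package example

import (
	p "protocol"
)

// CountingProcessor is a hypothetical QueryProcessor that cancels a
// query once it has seen `limit` points.
type CountingProcessor struct {
	limit int
	seen  int
}

// YieldPoint consumes a single point; returning false stops the query.
func (c *CountingProcessor) YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool {
	c.seen++
	return c.seen < c.limit
}

// YieldSeries consumes a whole batch, so the continue/stop check runs
// once per batch rather than once per point.
func (c *CountingProcessor) YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool {
	c.seen += len(seriesIncoming.Points)
	return c.seen < c.limit
}

func (c *CountingProcessor) Close() {}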
15 changes: 11 additions & 4 deletions src/configuration/configuration.go
@@ -87,9 +87,10 @@ type LoggingConfig struct {
}

type LevelDbConfiguration struct {
MaxOpenFiles int `toml:"max-open-files"`
LruCacheSize size `toml:"lru-cache-size"`
MaxOpenShards int `toml:"max-open-shards"`
MaxOpenFiles int `toml:"max-open-files"`
LruCacheSize size `toml:"lru-cache-size"`
MaxOpenShards int `toml:"max-open-shards"`
PointBatchSize int `toml:"point-batch-size"`
}

type ShardingDefinition struct {
@@ -187,6 +188,7 @@ type Configuration struct {
LevelDbMaxOpenFiles int
LevelDbLruCacheSize int
LevelDbMaxOpenShards int
LevelDbPointBatchSize int
ShortTermShard *ShardConfiguration
LongTermShard *ShardConfiguration
ReplicationFactor int
@@ -286,11 +288,16 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
config.LevelDbMaxOpenFiles = 100
}

// if it wasn't set, set it to 100
// if it wasn't set, set it to 200 MB
if config.LevelDbLruCacheSize == 0 {
config.LevelDbLruCacheSize = 200 * ONE_MEGABYTE
}

// if it wasn't set, set it to 100
if config.LevelDbPointBatchSize == 0 {
config.LevelDbPointBatchSize = 100
}

return config, nil
}

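The parser relies on Go zero values for defaulting: a point-batch-size key missing from the TOML file leaves the int field at 0, which is then replaced with 100. A self-contained sketch of that pattern, with the struct pared down to the one relevant field (names reproduced for illustration only):

package main

import "fmt"

// levelDbConfig mirrors only the field of LevelDbConfiguration
// relevant to this commit.
type levelDbConfig struct {
	PointBatchSize int `toml:"point-batch-size"`
}

func main() {
	cfg := levelDbConfig{} // key absent from the TOML file: zero value
	if cfg.PointBatchSize == 0 {
		cfg.PointBatchSize = 100 // the documented default
	}
	fmt.Println("point-batch-size:", cfg.PointBatchSize) // prints 100
}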
12 changes: 6 additions & 6 deletions src/coordinator/coordinator.go
@@ -286,7 +286,9 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri
log.Debug("READING: shard: ", shards[i].String())
for {
response := <-responseChan
log.Debug("GOT RESPONSE: ", response.Type, response.Series)

//log.Debug("GOT RESPONSE: ", response.Type, response.Series)
log.Debug("GOT RESPONSE: ", response.Type)
if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse {
if response.ErrorMessage != nil && err == nil {
err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage)
@@ -304,12 +306,10 @@

// if the data wasn't aggregated at the shard level, aggregate
// the data here
log.Debug("YIELDING: ", len(response.Series.Points))
if response.Series != nil {
for _, p := range response.Series.Points {
processor.YieldPoint(response.Series.Name, response.Series.Fields, p)
}
}
log.Debug("YIELDING: ", len(response.Series.Points))
processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series)
}
}
log.Debug("DONE: shard: ", shards[i].String())
}
49 changes: 34 additions & 15 deletions src/datastore/leveldb_shard.go
@@ -20,15 +20,16 @@ import (
)

type LevelDbShard struct {
db *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
lastIdUsed uint64
columnIdMutex sync.Mutex
closed bool
db *levigo.DB
readOptions *levigo.ReadOptions
writeOptions *levigo.WriteOptions
lastIdUsed uint64
columnIdMutex sync.Mutex
closed bool
pointBatchSize int
}

func NewLevelDbShard(db *levigo.DB) (*LevelDbShard, error) {
func NewLevelDbShard(db *levigo.DB, pointBatchSize int) (*LevelDbShard, error) {
ro := levigo.NewReadOptions()
lastIdBytes, err2 := db.Get(ro, NEXT_ID_KEY)
if err2 != nil {
@@ -44,10 +45,11 @@ func NewLevelDbShard(db *levigo.DB) (*LevelDbShard, error) {
}

return &LevelDbShard{
db: db,
writeOptions: levigo.NewWriteOptions(),
readOptions: ro,
lastIdUsed: lastId,
db: db,
writeOptions: levigo.NewWriteOptions(),
readOptions: ro,
lastIdUsed: lastId,
pointBatchSize: pointBatchSize,
}, nil
}

@@ -184,6 +186,8 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser
}
}()

seriesOutgoing := &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)}

// TODO: clean up, this is super gnarly
// optimize for the case where we're pulling back only a single column or aggregate
for {
@@ -272,18 +276,33 @@
}

shouldContinue := true
for _, alias := range aliases {
_alias := alias
if !processor.YieldPoint(&_alias, fieldNames, point) {
shouldContinue = false

seriesOutgoing.Points = append(seriesOutgoing.Points, point)

if len(seriesOutgoing.Points) >= self.pointBatchSize {
for _, alias := range aliases {
_alias := alias
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
shouldContinue = false
}
}
seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)}
}

if !shouldContinue {
break
}
}

// Yield any points remaining after the last full batch
for _, alias := range aliases {
_alias := alias
log.Debug("Final Flush %s", _alias)
if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) {
log.Debug("Cancelled...")
}
}

return nil
}

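The core of this file's change is the accumulate-and-flush loop in executeQueryForSeries: points are buffered into seriesOutgoing, yielded once pointBatchSize points have accumulated, and whatever remains is flushed after the scan ends. A self-contained sketch of the same pattern, with a hypothetical yield callback standing in for processor.YieldSeries:

package main

import "fmt"

// flushBatches buffers items until batchSize is reached, yields the
// full batch, then flushes the remainder at the end, matching the
// shape of the loop above. yield returns false to cancel early,
// mirroring YieldSeries.
func flushBatches(points []int, batchSize int, yield func([]int) bool) {
	batch := make([]int, 0, batchSize)
	for _, pt := range points {
		batch = append(batch, pt)
		if len(batch) >= batchSize {
			if !yield(batch) {
				return // the processor asked us to stop
			}
			batch = make([]int, 0, batchSize)
		}
	}
	// final flush of any buffered remainder
	yield(batch)
}

func main() {
	flushBatches([]int{1, 2, 3, 4, 5}, 2, func(b []int) bool {
		fmt.Println("batch:", b)
		return true
	})
	// batch: [1 2]
	// batch: [3 4]
	// batch: [5]
}

Like the commit's final flush, the last yield fires even if the buffer is empty, which keeps the cancellation bookkeeping in one place at the cost of one extra call.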
4 changes: 3 additions & 1 deletion src/datastore/leveldb_shard_datastore.go
Expand Up @@ -26,6 +26,7 @@ type LevelDbShardDatastore struct {
levelDbOptions *levigo.Options
writeBuffer *cluster.WriteBuffer
maxOpenShards int
pointBatchSize int
}

const (
@@ -92,6 +93,7 @@ func NewLevelDbShardDatastore(config *configuration.Configuration) (*LevelDbShar
lastAccess: make(map[uint32]int64),
shardRefCounts: make(map[uint32]int),
shardsToClose: make(map[uint32]bool),
pointBatchSize: config.LevelDbPointBatchSize,
}, nil
}

@@ -123,7 +125,7 @@ func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalSha
return nil, err
}

db, err = NewLevelDbShard(ldb)
db, err = NewLevelDbShard(ldb, self.pointBatchSize)
if err != nil {
return nil, err
}