diff --git a/.gitignore b/.gitignore index 4006c9d7c05..993cda6f82a 100644 --- a/.gitignore +++ b/.gitignore @@ -54,3 +54,6 @@ Makefile # log file influxdb.log benchmark.log + +# config file +config.toml diff --git a/config.toml.sample b/config.toml.sample index 4ebf8ff5820..19a62fc2de3 100644 --- a/config.toml.sample +++ b/config.toml.sample @@ -83,6 +83,10 @@ lru-cache-size = "200m" # limit the max number of open files. max-open-files is per shard so this * that will be max. max-open-shards = 0 +# The default setting is 100. This option tells how many points will be fetched from LevelDb before +# they get flushed into backend. +point-batch-size = 100 + # These options specify how data is sharded across the cluster. There are two # shard configurations that have the same knobs: short term and long term. # Any series that begins with a capital letter like Exceptions will be written diff --git a/src/cluster/shard.go b/src/cluster/shard.go index 3d7beaf6a03..e8355ffad55 100644 --- a/src/cluster/shard.go +++ b/src/cluster/shard.go @@ -6,7 +6,6 @@ import ( "engine" "fmt" "parser" - "protocol" p "protocol" "sort" "strings" @@ -32,6 +31,7 @@ type QueryProcessor interface { // This method returns true if the query should continue. If the query should be stopped, // like maybe the limit was hit, it should return false YieldPoint(seriesName *string, columnNames []string, point *p.Point) bool + YieldSeries(seriesName *string, columnNames []string, seriesIncoming *p.Series) bool Close() } @@ -213,18 +213,24 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo if self.ShouldAggregateLocally(querySpec) { processor, err = engine.NewQueryEngine(querySpec.SelectQuery(), response) if err != nil { - response <- &p.Response{Type: &endStreamResponse, ErrorMessage: protocol.String(err.Error())} + response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())} log.Error("Error while creating engine: %s", err) return } + if querySpec.IsExplainQuery() { + processor.SetShardInfo(int(self.Id()), self.IsLocal) + } } else { maxPointsToBufferBeforeSending := 1000 processor = engine.NewPassthroughEngine(response, maxPointsToBufferBeforeSending) + if querySpec.IsExplainQuery() { + processor.SetShardInfo(int(self.Id()), self.IsLocal) + } } } shard, err := self.store.GetOrCreateShard(self.id) if err != nil { - response <- &p.Response{Type: &endStreamResponse, ErrorMessage: protocol.String(err.Error())} + response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())} log.Error("Error while getting shards: %s", err) return } @@ -232,7 +238,7 @@ func (self *ShardData) Query(querySpec *parser.QuerySpec, response chan *p.Respo err = shard.Query(querySpec, processor) processor.Close() if err != nil { - response <- &p.Response{Type: &endStreamResponse, ErrorMessage: protocol.String(err.Error())} + response <- &p.Response{Type: &endStreamResponse, ErrorMessage: p.String(err.Error())} } response <- &p.Response{Type: &endStreamResponse} return diff --git a/src/configuration/configuration.go b/src/configuration/configuration.go index 43d18622af2..4181bf4ba12 100644 --- a/src/configuration/configuration.go +++ b/src/configuration/configuration.go @@ -87,9 +87,10 @@ type LoggingConfig struct { } type LevelDbConfiguration struct { - MaxOpenFiles int `toml:"max-open-files"` - LruCacheSize size `toml:"lru-cache-size"` - MaxOpenShards int `toml:"max-open-shards"` + MaxOpenFiles int `toml:"max-open-files"` + LruCacheSize size `toml:"lru-cache-size"` + MaxOpenShards int `toml:"max-open-shards"` + PointBatchSize int `toml:"point-batch-size"` } type ShardingDefinition struct { @@ -187,6 +188,7 @@ type Configuration struct { LevelDbMaxOpenFiles int LevelDbLruCacheSize int LevelDbMaxOpenShards int + LevelDbPointBatchSize int ShortTermShard *ShardConfiguration LongTermShard *ShardConfiguration ReplicationFactor int @@ -286,11 +288,16 @@ func parseTomlConfiguration(filename string) (*Configuration, error) { config.LevelDbMaxOpenFiles = 100 } - // if it wasn't set, set it to 100 + // if it wasn't set, set it to 200 MB if config.LevelDbLruCacheSize == 0 { config.LevelDbLruCacheSize = 200 * ONE_MEGABYTE } + // if it wasn't set, set it to 100 + if config.LevelDbPointBatchSize == 0 { + config.LevelDbPointBatchSize = 100 + } + return config, nil } diff --git a/src/coordinator/coordinator.go b/src/coordinator/coordinator.go index 49c9bd558bc..9cf224fb262 100644 --- a/src/coordinator/coordinator.go +++ b/src/coordinator/coordinator.go @@ -286,7 +286,9 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri log.Debug("READING: shard: ", shards[i].String()) for { response := <-responseChan - log.Debug("GOT RESPONSE: ", response.Type, response.Series) + + //log.Debug("GOT RESPONSE: ", response.Type, response.Series) + log.Debug("GOT RESPONSE: ", response.Type) if *response.Type == endStreamResponse || *response.Type == accessDeniedResponse { if response.ErrorMessage != nil && err == nil { err = common.NewQueryError(common.InvalidArgument, *response.ErrorMessage) @@ -304,12 +306,10 @@ func (self *CoordinatorImpl) runQuerySpec(querySpec *parser.QuerySpec, seriesWri // if the data wasn't aggregated at the shard level, aggregate // the data here - log.Debug("YIELDING: ", len(response.Series.Points)) if response.Series != nil { - for _, p := range response.Series.Points { - processor.YieldPoint(response.Series.Name, response.Series.Fields, p) - } - } + log.Debug("YIELDING: ", len(response.Series.Points)) + processor.YieldSeries(response.Series.Name, response.Series.Fields, response.Series) + } } log.Debug("DONE: shard: ", shards[i].String()) } diff --git a/src/datastore/leveldb_shard.go b/src/datastore/leveldb_shard.go index 5fe558359bb..ee1135c3ec8 100644 --- a/src/datastore/leveldb_shard.go +++ b/src/datastore/leveldb_shard.go @@ -20,15 +20,16 @@ import ( ) type LevelDbShard struct { - db *levigo.DB - readOptions *levigo.ReadOptions - writeOptions *levigo.WriteOptions - lastIdUsed uint64 - columnIdMutex sync.Mutex - closed bool + db *levigo.DB + readOptions *levigo.ReadOptions + writeOptions *levigo.WriteOptions + lastIdUsed uint64 + columnIdMutex sync.Mutex + closed bool + pointBatchSize int } -func NewLevelDbShard(db *levigo.DB) (*LevelDbShard, error) { +func NewLevelDbShard(db *levigo.DB, pointBatchSize int) (*LevelDbShard, error) { ro := levigo.NewReadOptions() lastIdBytes, err2 := db.Get(ro, NEXT_ID_KEY) if err2 != nil { @@ -44,10 +45,11 @@ func NewLevelDbShard(db *levigo.DB) (*LevelDbShard, error) { } return &LevelDbShard{ - db: db, - writeOptions: levigo.NewWriteOptions(), - readOptions: ro, - lastIdUsed: lastId, + db: db, + writeOptions: levigo.NewWriteOptions(), + readOptions: ro, + lastIdUsed: lastId, + pointBatchSize: pointBatchSize, }, nil } @@ -184,6 +186,8 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser } }() + seriesOutgoing := &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} + // TODO: clean up, this is super gnarly // optimize for the case where we're pulling back only a single column or aggregate for { @@ -272,11 +276,17 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser } shouldContinue := true - for _, alias := range aliases { - _alias := alias - if !processor.YieldPoint(&_alias, fieldNames, point) { - shouldContinue = false + + seriesOutgoing.Points = append(seriesOutgoing.Points, point) + + if len(seriesOutgoing.Points) >= self.pointBatchSize { + for _, alias := range aliases { + _alias := alias + if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) { + shouldContinue = false + } } + seriesOutgoing = &protocol.Series{Name: protocol.String(seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, self.pointBatchSize)} } if !shouldContinue { @@ -284,6 +294,15 @@ func (self *LevelDbShard) executeQueryForSeries(querySpec *parser.QuerySpec, ser } } + //Yield remaining data + for _, alias := range aliases { + _alias := alias + log.Debug("Final Flush %s", _alias) + if !processor.YieldSeries(&_alias, fieldNames, seriesOutgoing) { + log.Debug("Cancelled...") + } + } + return nil } diff --git a/src/datastore/leveldb_shard_datastore.go b/src/datastore/leveldb_shard_datastore.go index 4d39e588648..b45992dd5a4 100644 --- a/src/datastore/leveldb_shard_datastore.go +++ b/src/datastore/leveldb_shard_datastore.go @@ -26,6 +26,7 @@ type LevelDbShardDatastore struct { levelDbOptions *levigo.Options writeBuffer *cluster.WriteBuffer maxOpenShards int + pointBatchSize int } const ( @@ -92,6 +93,7 @@ func NewLevelDbShardDatastore(config *configuration.Configuration) (*LevelDbShar lastAccess: make(map[uint32]int64), shardRefCounts: make(map[uint32]int), shardsToClose: make(map[uint32]bool), + pointBatchSize: config.LevelDbPointBatchSize, }, nil } @@ -123,7 +125,7 @@ func (self *LevelDbShardDatastore) GetOrCreateShard(id uint32) (cluster.LocalSha return nil, err } - db, err = NewLevelDbShard(ldb) + db, err = NewLevelDbShard(ldb, self.pointBatchSize) if err != nil { return nil, err } diff --git a/src/engine/aggregator.go b/src/engine/aggregator.go index d46c3749778..45a00bdbf93 100644 --- a/src/engine/aggregator.go +++ b/src/engine/aggregator.go @@ -1,6 +1,6 @@ package engine -import ( +import ( "common" "fmt" "math" @@ -15,6 +15,7 @@ type PointSlice []protocol.Point type Aggregator interface { AggregatePoint(series string, group interface{}, p *protocol.Point) error + AggregateSeries(series string, group interface{}, s *protocol.Series) error InitializeFieldsMetadata(series *protocol.Series) error GetValues(series string, group interface{}) [][]*protocol.FieldValue ColumnNames() []string @@ -76,6 +77,10 @@ func (self *CompositeAggregator) AggregatePoint(series string, group interface{} return self.right.AggregatePoint(series, group, p) } +func (self *CompositeAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + return self.right.AggregateSeries(series, group, s) +} + func (self *CompositeAggregator) ColumnNames() []string { return self.left.ColumnNames() } @@ -148,6 +153,14 @@ func (self *StandardDeviationAggregator) AggregatePoint(series string, group int return nil } +//TODO: to be optimized +func (self *StandardDeviationAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *StandardDeviationAggregator) ColumnNames() []string { if self.alias != "" { return []string{self.alias} @@ -267,6 +280,14 @@ func (self *DerivativeAggregator) AggregatePoint(series string, group interface{ return nil } +//TODO: to be optimized +func (self *DerivativeAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *DerivativeAggregator) ColumnNames() []string { if self.alias != "" { return []string{self.alias} @@ -363,6 +384,14 @@ func (self *HistogramAggregator) AggregatePoint(series string, group interface{} return nil } +//TODO: to be optimized +func (self *HistogramAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *HistogramAggregator) ColumnNames() []string { return self.columnNames } @@ -433,20 +462,30 @@ func NewHistogramAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue type CountAggregator struct { defaultValue *protocol.FieldValue - counts map[string]map[interface{}]int32 + counts map[string]map[interface{}]int64 alias string } func (self *CountAggregator) AggregatePoint(series string, group interface{}, p *protocol.Point) error { counts := self.counts[series] if counts == nil { - counts = make(map[interface{}]int32) + counts = make(map[interface{}]int64) self.counts[series] = counts } counts[group]++ return nil } +func (self *CountAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + counts := self.counts[series] + if counts == nil { + counts = make(map[interface{}]int64) + self.counts[series] = counts + } + counts[group] += int64(len(s.Points)) + return nil +} + func (self *CountAggregator) ColumnNames() []string { if self.alias != "" { return []string{self.alias} @@ -495,10 +534,10 @@ func NewCountAggregator(q *parser.SelectQuery, v *parser.Value, defaultValue *pa if err != nil { return nil, err } - return NewCompositeAggregator(&CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int32), v.Alias}, inner) + return NewCompositeAggregator(&CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int64), v.Alias}, inner) } - return &CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int32), v.Alias}, nil + return &CountAggregator{wrappedDefaultValue, make(map[string]map[interface{}]int64), v.Alias}, nil } // @@ -515,7 +554,7 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{} if timestamps == nil { timestamps = make(map[interface{}]int64) self.timestamps[series] = timestamps - } + } if self.duration != nil { timestamps[group] = *p.GetTimestampInMicroseconds() / *self.duration * *self.duration } else { @@ -524,6 +563,33 @@ func (self *TimestampAggregator) AggregatePoint(series string, group interface{} return nil } +func (self *TimestampAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + timestamps := self.timestamps[series] + if timestamps == nil { + timestamps = make(map[interface{}]int64) + self.timestamps[series] = timestamps + } + if len(s.Points) > 0 { + if self.duration != nil { + timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() / *self.duration * *self.duration + } else { + timestamps[group] = *(s.Points[len(s.Points) - 1]).GetTimestampInMicroseconds() + } + } + return nil +} +/* +//TODO: to be optimized +func (self *TimestampAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + //log.Error("Timestamp: ", len(s.Points)) + for _, p := range s.Points { + //log.Error("Point: ", p) + self.AggregatePoint(series, group, p) + } + return nil +} +*/ + func (self *TimestampAggregator) ColumnNames() []string { return []string{"count"} } @@ -548,9 +614,12 @@ func NewTimestampAggregator(query *parser.SelectQuery, _ *parser.Value) (Aggrega var durationPtr *int64 + //log.Error("Duration: ", duration) + if duration != nil { newDuration := int64(*duration / time.Microsecond) durationPtr = &newDuration + // log.Error("Woohoo! ", durationPtr) } return &TimestampAggregator{ @@ -605,6 +674,14 @@ func (self *MeanAggregator) AggregatePoint(series string, group interface{}, p * return nil } +//TODO: to be optimized +func (self *MeanAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *MeanAggregator) ColumnNames() []string { if self.alias != "" { return []string{self.alias} @@ -714,6 +791,14 @@ func (self *PercentileAggregator) AggregatePoint(series string, group interface{ return nil } +//TODO: to be optimized +func (self *PercentileAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *PercentileAggregator) ColumnNames() []string { if self.alias != "" { return []string{self.alias} @@ -816,6 +901,14 @@ func (self *ModeAggregator) AggregatePoint(series string, group interface{}, p * return nil } +//TODO: to be optimized +func (self *ModeAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *ModeAggregator) ColumnNames() []string { if self.alias != "" { return []string{self.alias} @@ -924,6 +1017,14 @@ func (self *DistinctAggregator) AggregatePoint(series string, group interface{}, return nil } +//TODO: to be optimized +func (self *DistinctAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *DistinctAggregator) ColumnNames() []string { if self.alias != "" { return []string{self.alias} @@ -1005,6 +1106,14 @@ func (self *CumulativeArithmeticAggregator) AggregatePoint(series string, group return nil } +//TODO: to be optimized +func (self *CumulativeArithmeticAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *CumulativeArithmeticAggregator) ColumnNames() []string { return []string{self.name} } @@ -1109,6 +1218,14 @@ func (self *FirstOrLastAggregator) AggregatePoint(series string, group interface return nil } +//TODO: to be optimized +func (self *FirstOrLastAggregator) AggregateSeries(series string, group interface{}, s *protocol.Series) error { + for _, p := range s.Points { + self.AggregatePoint(series, group, p) + } + return nil +} + func (self *FirstOrLastAggregator) ColumnNames() []string { return []string{self.name} } diff --git a/src/engine/engine.go b/src/engine/engine.go index f5d40f59789..60d4c6f68b2 100644 --- a/src/engine/engine.go +++ b/src/engine/engine.go @@ -32,7 +32,7 @@ type QueryEngine struct { } const ( - POINT_BATCH_SIZE = 100 + POINT_BATCH_SIZE = 64 ) var ( @@ -69,7 +69,7 @@ func NewQueryEngine(query *parser.SelectQuery, responseChan chan *protocol.Respo where: query.GetWhereCondition(), limiter: NewLimiter(limit), responseChan: responseChan, - seriesToPoints: make(map[string]*protocol.Series), + seriesToPoints: make(map[string]*protocol.Series), } yield := func(series *protocol.Series) error { @@ -110,6 +110,10 @@ func (self *QueryEngine) YieldPoint(seriesName *string, fieldNames []string, poi return shouldContinue } +func (self *QueryEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) (shouldContinue bool) { + return self.yieldSeriesData(seriesIncoming) +} + func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool { var err error if self.where != nil { @@ -132,9 +136,7 @@ func (self *QueryEngine) yieldSeriesData(series *protocol.Series) bool { self.limiter.calculateLimitAndSlicePoints(series) if len(series.Points) > 0 { - if err = self.yield(series); err != nil { - return false - } + err = self.yield(series) } } if err != nil { @@ -376,6 +378,8 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.SelectQuery, return nil } + seriesGroups := make(map[Group]*protocol.Series) + var mapper Mapper mapper, err = createValuesToInterface(self.groupBy, series.Fields) if err != nil { @@ -389,29 +393,36 @@ func (self *QueryEngine) executeCountQueryWithGroupBy(query *parser.SelectQuery, } currentRange := self.pointsRange[*series.Name] + if currentRange == nil { + currentRange = &PointRange{*series.Points[0].Timestamp, *series.Points[0].Timestamp} + self.pointsRange[*series.Name] = currentRange + } for _, point := range series.Points { + currentRange.UpdateRange(point) value := mapper(point) + seriesGroup := seriesGroups[value] + if seriesGroup == nil { + seriesGroup = &protocol.Series{Name: series.Name, Fields: series.Fields, Points: make([]*protocol.Point, 0)} + seriesGroups[value] = seriesGroup + } + seriesGroup.Points = append(seriesGroup.Points, point) + } + + for value, seriesGroup := range seriesGroups { for _, aggregator := range self.aggregators { - err := aggregator.AggregatePoint(*series.Name, value, point) + err := aggregator.AggregateSeries(*series.Name, value, seriesGroup) if err != nil { return err } } + self.timestampAggregator.AggregateSeries(*series.Name, value, seriesGroup) - self.timestampAggregator.AggregatePoint(*series.Name, value, point) - seriesGroups := self.groups[*series.Name] - if seriesGroups == nil { - seriesGroups = make(map[Group]bool) - self.groups[*series.Name] = seriesGroups - } - seriesGroups[value] = true - - if currentRange == nil { - currentRange = &PointRange{*point.Timestamp, *point.Timestamp} - self.pointsRange[*series.Name] = currentRange - } else { - currentRange.UpdateRange(point) + _groups := self.groups[*seriesGroup.Name] + if _groups == nil { + _groups = make(map[Group]bool) + self.groups[*seriesGroup.Name] = _groups } + _groups[value] = true } return nil diff --git a/src/engine/limiter.go b/src/engine/limiter.go index 11f0fe83d7c..f96d48cdf1b 100644 --- a/src/engine/limiter.go +++ b/src/engine/limiter.go @@ -1,7 +1,7 @@ package engine import ( - "protocol" + "protocol" ) type Limiter struct { @@ -18,10 +18,10 @@ func NewLimiter(limit int) *Limiter { } } -func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { +func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { if self.shouldLimit { - // if the limit is 0, stop returning any points - limit := self.limitForSeries(*series.Name) + // if the limit is 0, stop returning any points + limit := self.limitForSeries(*series.Name) defer func() { self.limits[*series.Name] = limit }() if limit == 0 { series.Points = nil @@ -33,7 +33,7 @@ func (self *Limiter) calculateLimitAndSlicePoints(series *protocol.Series) { series.Points = series.Points[0:sliceTo] limit = 0 } - } + } } func (self *Limiter) hitLimit(seriesName string) bool { diff --git a/src/engine/list_series_engine.go b/src/engine/list_series_engine.go index 09449d25890..99b36afb6b6 100644 --- a/src/engine/list_series_engine.go +++ b/src/engine/list_series_engine.go @@ -42,6 +42,18 @@ func (self *ListSeriesEngine) YieldPoint(seriesName *string, columnNames []strin return true } +func (self *ListSeriesEngine) YieldSeries(seriesName *string, columnNames []string, seriesIncoming *protocol.Series) bool { + if len(self.response.MultiSeries) > MAX_SERIES_IN_RESPONSE { + self.responseChan <- self.response + self.response = &protocol.Response{ + Type: &queryResponse, + MultiSeries: make([]*protocol.Series, 0), + } + } + self.response.MultiSeries = append(self.response.MultiSeries, &protocol.Series{Name: seriesName}) + return true +} + func (self *ListSeriesEngine) Close() { if len(self.response.MultiSeries) > 0 { self.responseChan <- self.response diff --git a/src/engine/passthrough_engine.go b/src/engine/passthrough_engine.go index 85252beaa18..d45b9ef57c7 100644 --- a/src/engine/passthrough_engine.go +++ b/src/engine/passthrough_engine.go @@ -3,6 +3,7 @@ package engine // This engine buffers points and passes them through without modification. Works for queries // that can't be aggregated locally or queries that don't require it like deletes and drops. import ( + log "code.google.com/p/log4go" "protocol" ) @@ -55,6 +56,54 @@ func (self *PassthroughEngine) YieldPoint(seriesName *string, columnNames []stri return !self.limiter.hitLimit(*seriesName) } +func (self *PassthroughEngine) YieldSeries(seriesName *string, fieldNames []string, seriesIncoming *protocol.Series) bool { + log.Debug("PassthroughEngine YieldSeries %d", len(seriesIncoming.Points)) +/* + seriesCopy := &protocol.Series{Name: protocol.String(*seriesName), Fields: fieldNames, Points: make([]*protocol.Point, 0, POINT_BATCH_SIZE)} + for _, point := range seriesIncoming.Points { + seriesCopy.Points = append(seriesCopy.Points, point) + } +*/ + //log.Debug("PT Copied %d %d", len(seriesIncoming.Points), POINT_BATCH_SIZE) + self.limiter.calculateLimitAndSlicePoints(seriesIncoming) + if len(seriesIncoming.Points) == 0 { + log.Error("Not sent == 0") + return false + } + + //log.Debug("PassthroughEngine", seriesCopy) + /* + self.response = &protocol.Response{ + Type: &queryResponse, + Series: seriesIncoming, + } + self.responseChan <- self.response + */ + if self.response == nil { + self.response = &protocol.Response{ + Type: &queryResponse, + Series: seriesIncoming, + } + } else if *self.response.Series.Name != *seriesName { + self.responseChan <- self.response + self.response = &protocol.Response{ + Type: &queryResponse, + Series: seriesIncoming, + } + } else if len(self.response.Series.Points) > self.maxPointsInResponse { + self.responseChan <- self.response + self.response = &protocol.Response{ + Type: &queryResponse, + Series: seriesIncoming, + } + } else { + self.response.Series.Points = append(self.response.Series.Points, seriesIncoming.Points...) + } + return !self.limiter.hitLimit(*seriesName) + //return true +} + + func (self *PassthroughEngine) Close() { if self.response != nil && self.response.Series != nil && self.response.Series.Name != nil { self.responseChan <- self.response