diff --git a/CHANGELOG.md b/CHANGELOG.md index 45b28d1f294..ce67c3c346c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bugfixes - [#2374](https://github.com/influxdb/influxdb/issues/2374): Two different panics during SELECT percentile - [#2404](https://github.com/influxdb/influxdb/pull/2404): Mean and percentile function fixes +- [#2408](https://github.com/influxdb/influxdb/pull/2408): Fix snapshot 500 error ## v0.9.0-rc27 [04-23-2015] diff --git a/internal_test.go b/internal_test.go index 9fe27a5a4c1..1f837299270 100644 --- a/internal_test.go +++ b/internal_test.go @@ -271,9 +271,8 @@ func Test_seriesIDs_reject(t *testing.T) { // Test shard group selection. func TestShardGroup_Contains(t *testing.T) { // Make a shard group 1 hour in duration - g := newShardGroup() - g.StartTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") - g.EndTime = g.StartTime.Add(time.Hour) + tm, _ := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z") + g := newShardGroup(tm, time.Hour) if !g.Contains(g.StartTime.Add(-time.Minute), g.EndTime) { t.Fatal("shard group not selected when min before start time") diff --git a/server.go b/server.go index 076e4cb147d..0654468c442 100644 --- a/server.go +++ b/server.go @@ -1128,7 +1128,7 @@ func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp return err } -func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) { +func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) error { var c createShardGroupIfNotExistsCommand mustUnmarshalJSON(m.Data, &c) @@ -1149,11 +1149,6 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err return nil } - // If no shards match then create a new one. - g := newShardGroup() - g.StartTime = c.Timestamp.Truncate(rp.ShardGroupDuration).UTC() - g.EndTime = g.StartTime.Add(rp.ShardGroupDuration).UTC() - // Sort nodes so they're consistently assigned to the shards. 
nodes := make([]*DataNode, 0, len(s.dataNodes)) for _, n := range s.dataNodes { @@ -1174,44 +1169,14 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err // replicated the correct number of times. shardN := len(nodes) / replicaN - // Create a shard based on the node count and replication factor. - g.Shards = make([]*Shard, shardN) - for i := range g.Shards { - g.Shards[i] = newShard() - } - - // Persist to metastore if a shard was created. - if err = s.meta.mustUpdate(m.Index, func(tx *metatx) error { - // Generate an ID for the group. - g.ID = tx.nextShardGroupID() - - // Generate an ID for each shard. - for _, sh := range g.Shards { - sh.ID = tx.nextShardID() - sh.stats = NewStats(fmt.Sprintf("shard %d", sh.ID)) - sh.stats.Inc("create") - } - - // Assign data nodes to shards via round robin. - // Start from a repeatably "random" place in the node list. - nodeIndex := int(m.Index % uint64(len(nodes))) - for _, sh := range g.Shards { - for i := 0; i < replicaN; i++ { - node := nodes[nodeIndex%len(nodes)] - sh.DataNodeIDs = append(sh.DataNodeIDs, node.ID) - nodeIndex++ - } - } - s.stats.Add("shardsCreated", int64(len(g.Shards))) - - // Retention policy has a new shard group, so update the policy. - rp.shardGroups = append(rp.shardGroups, g) + g := newShardGroup(c.Timestamp, rp.ShardGroupDuration) - return tx.saveDatabase(db) - }); err != nil { - g.close() - return + // Create and initialize shards based on the node count and replication factor. + if err := g.initialize(m.Index, shardN, replicaN, db, rp, nodes, s.meta); err != nil { + g.close(s.id) + return err } + s.stats.Add("shardsCreated", int64(shardN)) // Open shards assigned to this server. for _, sh := range g.Shards { @@ -1231,7 +1196,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err s.shards[sh.ID] = sh } - return + return nil } // DeleteShardGroup deletes the shard group identified by shardID. 
@@ -1265,19 +1230,11 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) { return nil } - for _, shard := range g.Shards { - // Ignore shards not on this server. - if !shard.HasDataNodeID(s.id) { - continue - } - - path := shard.store.Path() - shard.close() - if err := os.Remove(path); err != nil { - // Log, but keep going. This can happen if shards were deleted, but the server exited - // before it acknowledged the delete command. - log.Printf("error deleting shard %s, group ID %d, policy %s: %s", path, g.ID, rp.Name, err.Error()) - } + // close the shard group + if err := g.close(s.id); err != nil { + // Log, but keep going. This can happen if shards were deleted, but the server exited + // before it acknowledged the delete command. + log.Printf("error deleting shard: policy %s, group ID %d, %s", rp.Name, g.ID, err) } // Remove from metastore. @@ -1286,6 +1243,12 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) { s.stats.Add("shardsDeleted", int64(len(g.Shards))) return tx.saveDatabase(db) }) + + // remove from lookups. + for _, sh := range g.Shards { + delete(s.shards, sh.ID) + } + return } diff --git a/shard.go b/shard.go index e5749ba61b4..536db95bb17 100644 --- a/shard.go +++ b/shard.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "os" "sync" "time" @@ -20,33 +21,79 @@ type ShardGroup struct { } // newShardGroup returns a new initialized ShardGroup instance. -func newShardGroup() *ShardGroup { return &ShardGroup{} } +func newShardGroup(t time.Time, d time.Duration) *ShardGroup { + sg := ShardGroup{} + sg.StartTime = t.Truncate(d).UTC() + sg.EndTime = sg.StartTime.Add(d).UTC() -// close closes all shards. 
-func (g *ShardGroup) close() { - for _, sh := range g.Shards { - _ = sh.close() + return &sg +} + +func (sg *ShardGroup) initialize(index uint64, shardN, replicaN int, db *database, rp *RetentionPolicy, nodes []*DataNode, meta *metastore) error { + sg.Shards = make([]*Shard, shardN) + + // Persist to metastore if a shard was created. + return meta.mustUpdate(index, func(tx *metatx) error { + // Generate an ID for the group. + sg.ID = tx.nextShardGroupID() + + // Generate an ID for each shard. + for i := range sg.Shards { + sg.Shards[i] = newShard(tx.nextShardID()) + } + + // Assign data nodes to shards via round robin. + // Start from a repeatably "random" place in the node list. + nodeIndex := int(index % uint64(len(nodes))) + for _, sh := range sg.Shards { + for i := 0; i < replicaN; i++ { + node := nodes[nodeIndex%len(nodes)] + sh.DataNodeIDs = append(sh.DataNodeIDs, node.ID) + nodeIndex++ + } + } + + // Retention policy has a new shard group, so update the policy. + rp.shardGroups = append(rp.shardGroups, sg) + + return tx.saveDatabase(db) + }) +} + +func (sg *ShardGroup) close(id uint64) error { + for _, shard := range sg.Shards { + // Ignore shards not on this server. + if !shard.HasDataNodeID(id) { + continue + } + + path := shard.store.Path() + shard.close() + if err := os.Remove(path); err != nil { + return fmt.Errorf("shard id %d, path %s : %s", shard.ID, path, err) + } } + return nil } // ShardBySeriesID returns the shard that a series is assigned to in the group. -func (g *ShardGroup) ShardBySeriesID(seriesID uint64) *Shard { - return g.Shards[int(seriesID)%len(g.Shards)] +func (sg *ShardGroup) ShardBySeriesID(seriesID uint64) *Shard { + return sg.Shards[int(seriesID)%len(sg.Shards)] } // Duration returns the duration between the shard group's start and end time. 
-func (g *ShardGroup) Duration() time.Duration { return g.EndTime.Sub(g.StartTime) } +func (sg *ShardGroup) Duration() time.Duration { return sg.EndTime.Sub(sg.StartTime) } // Contains return whether the shard group contains data for the time between min and max -func (g *ShardGroup) Contains(min, max time.Time) bool { - return timeBetweenInclusive(g.StartTime, min, max) || - timeBetweenInclusive(g.EndTime, min, max) || - (g.StartTime.Before(min) && g.EndTime.After(max)) +func (sg *ShardGroup) Contains(min, max time.Time) bool { + return timeBetweenInclusive(sg.StartTime, min, max) || + timeBetweenInclusive(sg.EndTime, min, max) || + (sg.StartTime.Before(min) && sg.EndTime.After(max)) } // dropSeries will delete all data with the seriesID -func (g *ShardGroup) dropSeries(seriesIDs ...uint64) error { - for _, s := range g.Shards { +func (sg *ShardGroup) dropSeries(seriesIDs ...uint64) error { + for _, s := range sg.Shards { err := s.dropSeries(seriesIDs...) if err != nil { return err @@ -79,7 +126,14 @@ type Shard struct { } // newShard returns a new initialized Shard instance. -func newShard() *Shard { return &Shard{} } +func newShard(id uint64) *Shard { + s := &Shard{} + s.ID = id + s.stats = NewStats(fmt.Sprintf("shard %d", id)) + s.stats.Inc("initialize") + + return s +} // open initializes and opens the shard's store. func (s *Shard) open(path string, conn MessagingConn) error { @@ -180,9 +234,7 @@ func (s *Shard) close() error { if s.store != nil { _ = s.store.Close() } - if s.stats != nil { - s.stats.Inc("close") - } + s.stats.Inc("close") return nil }