Fix snapshot 500 error #2408

Merged 3 commits on Apr 23, 2015
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
### Bugfixes
- [#2374](https://github.com/influxdb/influxdb/issues/2374): Two different panics during SELECT percentile
- [#2404](https://github.com/influxdb/influxdb/pull/2404): Mean and percentile function fixes
- [#2408](https://github.com/influxdb/influxdb/pull/2408): Fix snapshot 500 error

## v0.9.0-rc27 [04-23-2015]

5 changes: 2 additions & 3 deletions internal_test.go
@@ -271,9 +271,8 @@ func Test_seriesIDs_reject(t *testing.T) {
// Test shard group selection.
func TestShardGroup_Contains(t *testing.T) {
// Make a shard group 1 hour in duration
g := newShardGroup()
g.StartTime, _ = time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
g.EndTime = g.StartTime.Add(time.Hour)
tm, _ := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
g := newShardGroup(tm, time.Hour)

if !g.Contains(g.StartTime.Add(-time.Minute), g.EndTime) {
t.Fatal("shard group not selected when min before start time")
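
For context on the constructor change above, here is a minimal, self-contained sketch of the behaviour the updated test exercises. The types and helpers are simplified stand-ins, not the real influxdb ones; only the truncate-then-add construction and the inclusive overlap check mirror the shard.go changes later in this diff.

```go
package main

import (
	"fmt"
	"time"
)

// shardGroup is a toy stand-in for the package's ShardGroup, carrying only
// the fields the test relies on.
type shardGroup struct {
	StartTime time.Time
	EndTime   time.Time
}

// newShardGroup mirrors the refactored constructor: the start time is
// truncated to the group duration and the end time follows one duration later.
func newShardGroup(t time.Time, d time.Duration) *shardGroup {
	start := t.Truncate(d).UTC()
	return &shardGroup{StartTime: start, EndTime: start.Add(d).UTC()}
}

// contains reports whether the group overlaps the [min, max] interval,
// following the same inclusive-overlap logic as ShardGroup.Contains.
func (g *shardGroup) contains(min, max time.Time) bool {
	between := func(t, lo, hi time.Time) bool { return !t.Before(lo) && !t.After(hi) }
	return between(g.StartTime, min, max) ||
		between(g.EndTime, min, max) ||
		(g.StartTime.Before(min) && g.EndTime.After(max))
}

func main() {
	tm, _ := time.Parse(time.RFC3339, "2000-01-01T00:00:00Z")
	g := newShardGroup(tm, time.Hour)
	// A query window whose minimum is a minute before the start time still overlaps.
	fmt.Println(g.contains(g.StartTime.Add(-time.Minute), g.EndTime)) // true
}
```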
75 changes: 19 additions & 56 deletions server.go
@@ -1128,7 +1128,7 @@ func (s *Server) CreateShardGroupIfNotExists(database, policy string, timestamp
return err
}

func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err error) {
func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) error {
var c createShardGroupIfNotExistsCommand
mustUnmarshalJSON(m.Data, &c)

@@ -1149,11 +1149,6 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err
return nil
}

// If no shards match then create a new one.
g := newShardGroup()
g.StartTime = c.Timestamp.Truncate(rp.ShardGroupDuration).UTC()
g.EndTime = g.StartTime.Add(rp.ShardGroupDuration).UTC()

// Sort nodes so they're consistently assigned to the shards.
nodes := make([]*DataNode, 0, len(s.dataNodes))
for _, n := range s.dataNodes {
@@ -1174,44 +1169,14 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err
// replicated the correct number of times.
shardN := len(nodes) / replicaN

// Create a shard based on the node count and replication factor.
g.Shards = make([]*Shard, shardN)
for i := range g.Shards {
g.Shards[i] = newShard()
}

// Persist to metastore if a shard was created.
if err = s.meta.mustUpdate(m.Index, func(tx *metatx) error {
// Generate an ID for the group.
g.ID = tx.nextShardGroupID()

// Generate an ID for each shard.
for _, sh := range g.Shards {
sh.ID = tx.nextShardID()
sh.stats = NewStats(fmt.Sprintf("shard %d", sh.ID))
sh.stats.Inc("create")
}

// Assign data nodes to shards via round robin.
// Start from a repeatably "random" place in the node list.
nodeIndex := int(m.Index % uint64(len(nodes)))
for _, sh := range g.Shards {
for i := 0; i < replicaN; i++ {
node := nodes[nodeIndex%len(nodes)]
sh.DataNodeIDs = append(sh.DataNodeIDs, node.ID)
nodeIndex++
}
}
s.stats.Add("shardsCreated", int64(len(g.Shards)))

// Retention policy has a new shard group, so update the policy.
rp.shardGroups = append(rp.shardGroups, g)
g := newShardGroup(c.Timestamp, rp.ShardGroupDuration)

return tx.saveDatabase(db)
}); err != nil {
g.close()
return
// Create and initialize shards based on the node count and replication factor.
if err := g.initialize(m.Index, shardN, replicaN, db, rp, nodes, s.meta); err != nil {
g.close(s.id)
return err
}
s.stats.Add("shardsCreated", int64(shardN))

// Open shards assigned to this server.
for _, sh := range g.Shards {
@@ -1231,7 +1196,7 @@ func (s *Server) applyCreateShardGroupIfNotExists(m *messaging.Message) (err err
s.shards[sh.ID] = sh
}

return
return nil
}

// DeleteShardGroup deletes the shard group identified by shardID.
@@ -1265,19 +1230,11 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
return nil
}

for _, shard := range g.Shards {
// Ignore shards not on this server.
if !shard.HasDataNodeID(s.id) {
continue
}

path := shard.store.Path()
shard.close()
if err := os.Remove(path); err != nil {
// Log, but keep going. This can happen if shards were deleted, but the server exited
// before it acknowledged the delete command.
log.Printf("error deleting shard %s, group ID %d, policy %s: %s", path, g.ID, rp.Name, err.Error())
}
// close the shard group
if err := g.close(s.id); err != nil {
// Log, but keep going. This can happen if shards were deleted, but the server exited
// before it acknowledged the delete command.
log.Printf("error deleting shard: policy %s, group ID %d, %s", rp.Name, g.ID, err)
}

// Remove from metastore.
@@ -1286,6 +1243,12 @@ func (s *Server) applyDeleteShardGroup(m *messaging.Message) (err error) {
s.stats.Add("shardsDeleted", int64(len(g.Shards)))
return tx.saveDatabase(db)
})

// remove from lookups.
for _, sh := range g.Shards {

Contributor Author: This was causing snapshotting to break.

Contributor: This "lookup" map has caused me problems before. We should think about seeing if there is a way to solve the problems it is solving in another way... later, of course.

delete(s.shards, sh.ID)
}

return
}

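
The inline comments above point at the actual bug: when a shard group was deleted, its shards were closed but left behind in the server's in-memory shards lookup, so a later snapshot request could still find a closed shard and fail. The fix deletes the entries from the lookup as part of applyDeleteShardGroup. A rough sketch of that failure mode, using toy types only (the real snapshot path is not part of this diff):

```go
package main

import (
	"errors"
	"fmt"
)

// shard is a toy stand-in: once closed, reads fail, much like a shard whose
// underlying store has been closed and its files removed.
type shard struct {
	id     uint64
	closed bool
}

func (s *shard) read() error {
	if s.closed {
		return errors.New("shard is closed")
	}
	return nil
}

func main() {
	// In-memory lookup, analogous to the server's shards map keyed by shard ID.
	shards := map[uint64]*shard{1: {id: 1}}

	// Deleting a shard group closes its shards...
	shards[1].closed = true
	// ...and, with this fix, also removes them from the lookup so a later
	// consumer (such as a snapshot request) never sees the stale entry.
	delete(shards, 1)

	if sh, ok := shards[1]; ok {
		fmt.Println("stale entry found:", sh.read()) // pre-fix behaviour: an error surfaces
	} else {
		fmt.Println("no stale entry; snapshot skips the deleted shard")
	}
}
```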
88 changes: 70 additions & 18 deletions shard.go
@@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"time"

@@ -20,33 +21,79 @@ type ShardGroup struct {
}

// newShardGroup returns a new initialized ShardGroup instance.
func newShardGroup() *ShardGroup { return &ShardGroup{} }
func newShardGroup(t time.Time, d time.Duration) *ShardGroup {
sg := ShardGroup{}
sg.StartTime = t.Truncate(d).UTC()
sg.EndTime = sg.StartTime.Add(d).UTC()

// close closes all shards.
func (g *ShardGroup) close() {
for _, sh := range g.Shards {
_ = sh.close()
return &sg
}

func (sg *ShardGroup) initialize(index uint64, shardN, replicaN int, db *database, rp *RetentionPolicy, nodes []*DataNode, meta *metastore) error {
sg.Shards = make([]*Shard, shardN)

// Persist to metastore if a shard was created.
return meta.mustUpdate(index, func(tx *metatx) error {
// Generate an ID for the group.
sg.ID = tx.nextShardGroupID()

// Generate an ID for each shard.
for i := range sg.Shards {
sg.Shards[i] = newShard(tx.nextShardID())
}

// Assign data nodes to shards via round robin.
// Start from a repeatably "random" place in the node list.
nodeIndex := int(index % uint64(len(nodes)))
for _, sh := range sg.Shards {
for i := 0; i < replicaN; i++ {
node := nodes[nodeIndex%len(nodes)]
sh.DataNodeIDs = append(sh.DataNodeIDs, node.ID)
nodeIndex++
}
}

// Retention policy has a new shard group, so update the policy.
rp.shardGroups = append(rp.shardGroups, sg)

return tx.saveDatabase(db)
})
}

func (sg *ShardGroup) close(id uint64) error {
for _, shard := range sg.Shards {
// Ignore shards not on this server.
if !shard.HasDataNodeID(id) {
continue
}

path := shard.store.Path()
shard.close()
if err := os.Remove(path); err != nil {
return fmt.Errorf("shard id %d, path %s : %s", shard.ID, path, err)
}
}
return nil
}

// ShardBySeriesID returns the shard that a series is assigned to in the group.
func (g *ShardGroup) ShardBySeriesID(seriesID uint64) *Shard {
return g.Shards[int(seriesID)%len(g.Shards)]
func (sg *ShardGroup) ShardBySeriesID(seriesID uint64) *Shard {

Contributor: Is g to sg considered more idiomatic?

Contributor Author: I like the letter to typically be the first letter of the variable I'm referencing. If the variable is made of two words, I usually use the abbreviation. Idiomatic might be a strong word for this, and wasn't really the reason I did it. The real reason is that sometimes I have shards, and like to use s for the shard variable name, and I don't want that to get confused for the ShardGroup which also starts with an s.

Contributor: +1

return sg.Shards[int(seriesID)%len(sg.Shards)]
}

// Duration returns the duration between the shard group's start and end time.
func (g *ShardGroup) Duration() time.Duration { return g.EndTime.Sub(g.StartTime) }
func (sg *ShardGroup) Duration() time.Duration { return sg.EndTime.Sub(sg.StartTime) }

// Contains return whether the shard group contains data for the time between min and max
func (g *ShardGroup) Contains(min, max time.Time) bool {
return timeBetweenInclusive(g.StartTime, min, max) ||
timeBetweenInclusive(g.EndTime, min, max) ||
(g.StartTime.Before(min) && g.EndTime.After(max))
func (sg *ShardGroup) Contains(min, max time.Time) bool {
return timeBetweenInclusive(sg.StartTime, min, max) ||
timeBetweenInclusive(sg.EndTime, min, max) ||
(sg.StartTime.Before(min) && sg.EndTime.After(max))
}

// dropSeries will delete all data with the seriesID
func (g *ShardGroup) dropSeries(seriesIDs ...uint64) error {
for _, s := range g.Shards {
func (sg *ShardGroup) dropSeries(seriesIDs ...uint64) error {
for _, s := range sg.Shards {
err := s.dropSeries(seriesIDs...)
if err != nil {
return err
@@ -79,7 +126,14 @@ type Shard struct {
}

// newShard returns a new initialized Shard instance.
func newShard() *Shard { return &Shard{} }
func newShard(id uint64) *Shard {
s := &Shard{}
s.ID = id
s.stats = NewStats(fmt.Sprintf("shard %d", id))
s.stats.Inc("initialize")

Contributor: Is there much point to this stat?

Contributor: Are you trying to track shard state via the stats? Definitely a valid thing to do, but I guess might think about other ways to do it, long-term.

Contributor Author: This stat is a complete hack for debugging right now. If this does indeed fix the problem, we should remove it. It did already help, as it showed me that a shard had been created, opened, and closed, and then a snapshot read was attempted against it. So, if this fixes the bug, then it has outlived its purpose (which it served well for).


return s
}

// open initializes and opens the shard's store.
func (s *Shard) open(path string, conn MessagingConn) error {
@@ -180,9 +234,7 @@ func (s *Shard) close() error {
if s.store != nil {
_ = s.store.Close()
}
if s.stats != nil {
s.stats.Inc("close")
}
s.stats.Inc("close")
return nil
}

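
One detail preserved by the move of shard-group construction into ShardGroup.initialize is the replica placement: shards are assigned data nodes round-robin, starting from an offset derived from the broker message index so every server computes the same layout. Below is a minimal sketch of just that assignment with hypothetical node IDs and counts; the real code operates on *DataNode values inside a metastore transaction.

```go
package main

import "fmt"

// assignReplicas mirrors the round-robin assignment in ShardGroup.initialize:
// starting from a repeatable offset derived from the message index, each shard
// takes replicaN consecutive nodes from the sorted node list.
func assignReplicas(index uint64, shardN, replicaN int, nodeIDs []uint64) [][]uint64 {
	out := make([][]uint64, shardN)
	nodeIndex := int(index % uint64(len(nodeIDs)))
	for i := range out {
		for r := 0; r < replicaN; r++ {
			out[i] = append(out[i], nodeIDs[nodeIndex%len(nodeIDs)])
			nodeIndex++
		}
	}
	return out
}

func main() {
	// Hypothetical example: 4 data nodes and replication factor 2 give 2 shards
	// per group (shardN = len(nodes) / replicaN), seeded with message index 7.
	nodes := []uint64{1, 2, 3, 4}
	fmt.Println(assignReplicas(7, 2, 2, nodes)) // [[4 1] [2 3]]
}
```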