Skip to content

Commit

Permalink
Flesh out inactive purge
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Oct 23, 2015
1 parent d61480e commit 574a108
Showing 1 changed file with 48 additions and 2 deletions.
50 changes: 48 additions & 2 deletions services/hh/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ func (s *Service) Open() error {
for _, file := range files {
nodeID, err := strconv.ParseUint(file.Name(), 10, 64)
if err != nil {
return err
// Not a number? Skip it.
continue
}

n := NewNodeProcessor(nodeID, s.pathforNode(nodeID), s.shardWriter, s.metastore)
Expand Down Expand Up @@ -170,8 +171,53 @@ func (s *Service) WriteShard(shardID, ownerID uint64, points []models.Point) err
// deleteInactiveQueues will cause the service to remove queues for inactive nodes.
func (s *Service) purgeInactiveProcessors() {
defer s.wg.Done()
ticker := time.NewTicker(time.Hour)
ticker := time.NewTicker(time.Duration(s.cfg.PurgeInterval))
defer ticker.Stop()

for {
select {
case <-s.closing:
return
case <-ticker.C:
func() {
s.mu.Lock()
defer s.mu.Unlock()

for k, v := range s.processors {
lm, err := v.LastModified()
if err != nil {
s.Logger.Println("failed to determine LastModified for processor %d: %s", k, err.Error())
continue
}

ni, err := s.metastore.Node(k)
if err != nil {
s.Logger.Println("failed to determine if node %d is active: %s", k, err.Error())
continue
}
if ni != nil {
// Node is active.
continue
}

if !lm.Before(time.Now().Add(-time.Duration(s.cfg.MaxAge))) {
// Node processor contains too-young data.
continue
}

if err := v.Close(); err != nil {
s.Logger.Println("failed to close node processor %d: %s", k, err.Error())
continue
}
if err := v.Purge(); err != nil {
s.Logger.Println("failed to close node processor %d: %s", k, err.Error())
continue
}
delete(s.processors, k)
}
}()
}
}
}

// pathforNode returns the directory for HH data, for the given node.
Expand Down

0 comments on commit 574a108

Please sign in to comment.