Skip to content

Commit ef8c111

Browse files
authored
Merge pull request #2231 from hashicorp/b-close-ch
Locking appropriately before closing the channel to indicate migration
2 parents 6f8852e + c254fbf commit ef8c111

File tree

1 file changed

+33
-7
lines changed

1 file changed

+33
-7
lines changed

client/client.go

+33-7
Original file line numberDiff line numberDiff line change
@@ -150,14 +150,40 @@ type Client struct {
150150
vaultClient vaultclient.VaultClient
151151

152152
// migratingAllocs is the set of allocs whose data migration is in flight
153-
migratingAllocs map[string]chan struct{}
153+
migratingAllocs map[string]*migrateAllocCtrl
154154
migratingAllocsLock sync.Mutex
155155

156156
// garbageCollector is used to garbage collect terminal allocations present
157157
// in the node automatically
158158
garbageCollector *AllocGarbageCollector
159159
}
160160

161+
// migrateAllocCtrl indicates whether migration is complete
162+
type migrateAllocCtrl struct {
163+
ch chan struct{}
164+
closed bool
165+
chLock sync.Mutex
166+
}
167+
168+
func newMigrateAllocCtrl() *migrateAllocCtrl {
169+
return &migrateAllocCtrl{
170+
ch: make(chan struct{}),
171+
}
172+
}
173+
174+
func (m *migrateAllocCtrl) closeCh() {
175+
m.chLock.Lock()
176+
defer m.chLock.Unlock()
177+
178+
if m.closed {
179+
return
180+
}
181+
182+
// If channel is not closed then close it
183+
m.closed = true
184+
close(m.ch)
185+
}
186+
161187
var (
162188
// noServersErr is returned by the RPC method when the client has no
163189
// configured servers. This is used to trigger Consul discovery if
@@ -188,7 +214,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
188214
blockedAllocations: make(map[string]*structs.Allocation),
189215
allocUpdates: make(chan *structs.Allocation, 64),
190216
shutdownCh: make(chan struct{}),
191-
migratingAllocs: make(map[string]chan struct{}),
217+
migratingAllocs: make(map[string]*migrateAllocCtrl),
192218
servers: newServerList(),
193219
triggerDiscoveryCh: make(chan struct{}),
194220
serversDiscoveredCh: make(chan struct{}),
@@ -1420,7 +1446,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
14201446
// Stopping the migration if the allocation doesn't need any
14211447
// migration
14221448
if !update.updated.ShouldMigrate() {
1423-
close(ch)
1449+
ch.closeCh()
14241450
}
14251451
}
14261452
}
@@ -1455,7 +1481,7 @@ func (c *Client) runAllocs(update *allocUpdates) {
14551481
// prevents a race between a finishing blockForRemoteAlloc and
14561482
// another invocation of runAllocs
14571483
if _, ok := c.getAllocRunners()[add.PreviousAllocation]; !ok {
1458-
c.migratingAllocs[add.ID] = make(chan struct{})
1484+
c.migratingAllocs[add.ID] = newMigrateAllocCtrl()
14591485
go c.blockForRemoteAlloc(add)
14601486
}
14611487
}
@@ -1533,7 +1559,7 @@ ADDALLOC:
15331559

15341560
// waitForAllocTerminal waits for an allocation with the given alloc id to
15351561
// transition to terminal state and blocks the caller until then.
1536-
func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*structs.Allocation, error) {
1562+
func (c *Client) waitForAllocTerminal(allocID string, stopCh *migrateAllocCtrl) (*structs.Allocation, error) {
15371563
req := structs.AllocSpecificRequest{
15381564
AllocID: allocID,
15391565
QueryOptions: structs.QueryOptions{
@@ -1551,7 +1577,7 @@ func (c *Client) waitForAllocTerminal(allocID string, stopCh chan struct{}) (*st
15511577
select {
15521578
case <-time.After(retry):
15531579
continue
1554-
case <-stopCh:
1580+
case <-stopCh.ch:
15551581
return nil, fmt.Errorf("giving up waiting on alloc %v since migration is not needed", allocID)
15561582
case <-c.shutdownCh:
15571583
return nil, fmt.Errorf("aborting because client is shutting down")
@@ -1665,7 +1691,7 @@ func (c *Client) unarchiveAllocDir(resp io.ReadCloser, allocID string, pathToAll
16651691
for {
16661692
// See if the alloc still needs migration
16671693
select {
1668-
case <-stopMigrating:
1694+
case <-stopMigrating.ch:
16691695
os.RemoveAll(pathToAllocDir)
16701696
c.logger.Printf("[INFO] client: stopping migration of allocdir for alloc: %v", allocID)
16711697
return nil

0 commit comments

Comments
 (0)