Skip to content

Commit

Permalink
Synchronize repo processing in app manager
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Matyushentsev authored and Alexander Matyushentsev committed Feb 23, 2018
1 parent dd47c74 commit d82cd75
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 9 deletions.
9 changes: 8 additions & 1 deletion application/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (

"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
"github.com/argoproj/argo-cd/server/repository"
"github.com/argoproj/argo-cd/util"
"github.com/argoproj/argo-cd/util/git"

"os"
"path"

"strings"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand All @@ -23,6 +26,7 @@ type Manager struct {
repoService repository.RepositoryServiceServer
statusRefreshTimeout time.Duration
appComparator AppComparator
repoLock *util.KeyLock
}

// NeedRefreshAppStatus answers if application status needs to be refreshed. Returns true if application never been compared, has changed or comparison result has expired.
Expand Down Expand Up @@ -54,7 +58,9 @@ func (m *Manager) tryRefreshAppStatus(app *v1alpha1.Application) (*v1alpha1.Appl
return nil, err
}

appRepoPath := path.Join(os.TempDir(), app.Name)
appRepoPath := path.Join(os.TempDir(), strings.Replace(repo.Repo, "/", "_", -1))
m.repoLock.Lock(appRepoPath)
defer m.repoLock.Unlock(appRepoPath)

err = m.gitClient.CloneOrFetch(repo.Repo, repo.Username, repo.Password, appRepoPath)
if err != nil {
Expand Down Expand Up @@ -85,5 +91,6 @@ func NewAppManager(
repoService: repoService,
statusRefreshTimeout: statusRefreshTimeout,
appComparator: appComparator,
repoLock: util.NewKeyLock(),
}
}
62 changes: 55 additions & 7 deletions application/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (

"context"

"sync"

"github.com/argoproj/argo-cd/application"
appMocks "github.com/argoproj/argo-cd/application/mocks"
"github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1"
Expand All @@ -18,6 +20,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type appComparatorStub struct {
compareAppState func(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error)
}

func (stub *appComparatorStub) CompareAppState(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) {
return stub.compareAppState(appRepoPath, app)
}

func TestManager(t *testing.T) {

refreshTimeout := time.Second * 10
Expand Down Expand Up @@ -103,12 +113,50 @@ func TestManager(t *testing.T) {
return repoPath == receivedRepoPath
}), appSource.TargetRevision).Return(nil)

appComparatorMock.On("CompareAppState", mock.MatchedBy(func(receivedRepoPath string) bool {
return repoPath == receivedRepoPath
}), &app).Return(&v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusEqual,
}, nil)
updatedAppStatus := manager.RefreshAppStatus(&app)
assert.Equal(t, updatedAppStatus.ComparisonResult.Status, v1alpha1.ComparisonStatusEqual)
t.Run("TestCheckoutRepoAndCompareStart", func(t *testing.T) {
appComparatorMock.On("CompareAppState", mock.MatchedBy(func(receivedRepoPath string) bool {
return repoPath == receivedRepoPath
}), &app).Return(&v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusEqual,
}, nil)

updatedAppStatus := manager.RefreshAppStatus(&app)
assert.Equal(t, updatedAppStatus.ComparisonResult.Status, v1alpha1.ComparisonStatusEqual)
})

t.Run("TestDoesNotProcessSameRepoSimultaneously", func(t *testing.T) {
cnt := 3
processingCnt := 0
completeProcessing := make(chan bool)

comparatorStub := appComparatorStub{
compareAppState: func(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) {
processingCnt++
assert.Equal(t, 1, processingCnt)
<-completeProcessing
processingCnt--
return &v1alpha1.ComparisonResult{
Status: v1alpha1.ComparisonStatusEqual,
}, nil
},
}
manager := application.NewAppManager(&gitClientMock, &repoServiceMock, &comparatorStub, refreshTimeout)
var wg sync.WaitGroup

wg.Add(cnt)
for i := 0; i < cnt; i++ {
go func() {
defer wg.Done()
manager.RefreshAppStatus(&app)
}()
}

for i := 1; i <= cnt; i++ {
time.Sleep(10 * time.Millisecond)
completeProcessing <- true
}

wg.Wait()
})
})
}
2 changes: 1 addition & 1 deletion util/git/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (m *NativeGitClient) CloneOrFetch(repo string, username string, password st
needClone = true
} else {
cmd := exec.Command("git", "status")
cmd.Dir = repo
cmd.Dir = repoPath
_, err = cmd.Output()
needClone = err != nil
}
Expand Down
48 changes: 48 additions & 0 deletions util/keylock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package util

import "sync"

// Allows to lock by string key
type KeyLock struct {
giantLock sync.RWMutex
locks map[string]*sync.Mutex
}

// NewKeyLock creates new instance of KeyLock
func NewKeyLock() *KeyLock {
return &KeyLock{
giantLock: sync.RWMutex{},
locks: map[string]*sync.Mutex{},
}
}

func (keyLock *KeyLock) getLock(key string) *sync.Mutex {
keyLock.giantLock.RLock()
if lock, ok := keyLock.locks[key]; ok {
keyLock.giantLock.RUnlock()
return lock
}

keyLock.giantLock.RUnlock()
keyLock.giantLock.Lock()

if lock, ok := keyLock.locks[key]; ok {
keyLock.giantLock.Unlock()
return lock
}

lock := &sync.Mutex{}
keyLock.locks[key] = lock
keyLock.giantLock.Unlock()
return lock
}

// Lock blocks goroutine using key specific mutex
func (keyLock *KeyLock) Lock(key string) {
keyLock.getLock(key).Lock()
}

// Unlock releases key specific mutex
func (keyLock *KeyLock) Unlock(key string) {
keyLock.getLock(key).Unlock()
}

0 comments on commit d82cd75

Please sign in to comment.