From d82cd75930c57ab34d46df1d74a169f98368c6b8 Mon Sep 17 00:00:00 2001 From: Alexander Matyushentsev Date: Fri, 23 Feb 2018 11:40:26 -0800 Subject: [PATCH] Synchronize repo processing in app manager --- application/manager.go | 9 +++++- application/manager_test.go | 62 ++++++++++++++++++++++++++++++++----- util/git/client.go | 2 +- util/keylock.go | 48 ++++++++++++++++++++++++++++ 4 files changed, 112 insertions(+), 9 deletions(-) create mode 100644 util/keylock.go diff --git a/application/manager.go b/application/manager.go index 2305e45f582b4..4595c74f7c415 100644 --- a/application/manager.go +++ b/application/manager.go @@ -9,11 +9,14 @@ import ( "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" "github.com/argoproj/argo-cd/server/repository" + "github.com/argoproj/argo-cd/util" "github.com/argoproj/argo-cd/util/git" "os" "path" + "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -23,6 +26,7 @@ type Manager struct { repoService repository.RepositoryServiceServer statusRefreshTimeout time.Duration appComparator AppComparator + repoLock *util.KeyLock } // NeedRefreshAppStatus answers if application status needs to be refreshed. Returns true if application never been compared, has changed or comparison result has expired. @@ -54,7 +58,9 @@ func (m *Manager) tryRefreshAppStatus(app *v1alpha1.Application) (*v1alpha1.Appl return nil, err } - appRepoPath := path.Join(os.TempDir(), app.Name) + appRepoPath := path.Join(os.TempDir(), strings.Replace(repo.Repo, "/", "_", -1)) + m.repoLock.Lock(appRepoPath) + defer m.repoLock.Unlock(appRepoPath) err = m.gitClient.CloneOrFetch(repo.Repo, repo.Username, repo.Password, appRepoPath) if err != nil { @@ -85,5 +91,6 @@ func NewAppManager( repoService: repoService, statusRefreshTimeout: statusRefreshTimeout, appComparator: appComparator, + repoLock: util.NewKeyLock(), } } diff --git a/application/manager_test.go b/application/manager_test.go index c3a99e3b373f6..bedec7b3b285e 100644 --- a/application/manager_test.go +++ b/application/manager_test.go @@ -7,6 +7,8 @@ import ( "context" + "sync" + "github.com/argoproj/argo-cd/application" appMocks "github.com/argoproj/argo-cd/application/mocks" "github.com/argoproj/argo-cd/pkg/apis/application/v1alpha1" @@ -18,6 +20,14 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +type appComparatorStub struct { + compareAppState func(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) +} + +func (stub *appComparatorStub) CompareAppState(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) { + return stub.compareAppState(appRepoPath, app) +} + func TestManager(t *testing.T) { refreshTimeout := time.Second * 10 @@ -103,12 +113,50 @@ func TestManager(t *testing.T) { return repoPath == receivedRepoPath }), appSource.TargetRevision).Return(nil) - appComparatorMock.On("CompareAppState", mock.MatchedBy(func(receivedRepoPath string) bool { - return repoPath == receivedRepoPath - }), &app).Return(&v1alpha1.ComparisonResult{ - Status: v1alpha1.ComparisonStatusEqual, - }, nil) - updatedAppStatus := manager.RefreshAppStatus(&app) - assert.Equal(t, updatedAppStatus.ComparisonResult.Status, v1alpha1.ComparisonStatusEqual) + t.Run("TestCheckoutRepoAndCompareStart", func(t *testing.T) { + appComparatorMock.On("CompareAppState", mock.MatchedBy(func(receivedRepoPath string) bool { + return repoPath == receivedRepoPath + }), &app).Return(&v1alpha1.ComparisonResult{ + Status: v1alpha1.ComparisonStatusEqual, + }, nil) + + updatedAppStatus := manager.RefreshAppStatus(&app) + assert.Equal(t, updatedAppStatus.ComparisonResult.Status, v1alpha1.ComparisonStatusEqual) + }) + + t.Run("TestDoesNotProcessSameRepoSimultaneously", func(t *testing.T) { + cnt := 3 + processingCnt := 0 + completeProcessing := make(chan bool) + + comparatorStub := appComparatorStub{ + compareAppState: func(appRepoPath string, app *v1alpha1.Application) (*v1alpha1.ComparisonResult, error) { + processingCnt++ + assert.Equal(t, 1, processingCnt) + <-completeProcessing + processingCnt-- + return &v1alpha1.ComparisonResult{ + Status: v1alpha1.ComparisonStatusEqual, + }, nil + }, + } + manager := application.NewAppManager(&gitClientMock, &repoServiceMock, &comparatorStub, refreshTimeout) + var wg sync.WaitGroup + + wg.Add(cnt) + for i := 0; i < cnt; i++ { + go func() { + defer wg.Done() + manager.RefreshAppStatus(&app) + }() + } + + for i := 1; i <= cnt; i++ { + time.Sleep(10 * time.Millisecond) + completeProcessing <- true + } + + wg.Wait() + }) }) } diff --git a/util/git/client.go b/util/git/client.go index b805424723e2a..9f4c45d8f3e25 100644 --- a/util/git/client.go +++ b/util/git/client.go @@ -26,7 +26,7 @@ func (m *NativeGitClient) CloneOrFetch(repo string, username string, password st needClone = true } else { cmd := exec.Command("git", "status") - cmd.Dir = repo + cmd.Dir = repoPath _, err = cmd.Output() needClone = err != nil } diff --git a/util/keylock.go b/util/keylock.go new file mode 100644 index 0000000000000..3290b358c87d7 --- /dev/null +++ b/util/keylock.go @@ -0,0 +1,48 @@ +package util + +import "sync" + +// Allows to lock by string key +type KeyLock struct { + giantLock sync.RWMutex + locks map[string]*sync.Mutex +} + +// NewKeyLock creates new instance of KeyLock +func NewKeyLock() *KeyLock { + return &KeyLock{ + giantLock: sync.RWMutex{}, + locks: map[string]*sync.Mutex{}, + } +} + +func (keyLock *KeyLock) getLock(key string) *sync.Mutex { + keyLock.giantLock.RLock() + if lock, ok := keyLock.locks[key]; ok { + keyLock.giantLock.RUnlock() + return lock + } + + keyLock.giantLock.RUnlock() + keyLock.giantLock.Lock() + + if lock, ok := keyLock.locks[key]; ok { + keyLock.giantLock.Unlock() + return lock + } + + lock := &sync.Mutex{} + keyLock.locks[key] = lock + keyLock.giantLock.Unlock() + return lock +} + +// Lock blocks goroutine using key specific mutex +func (keyLock *KeyLock) Lock(key string) { + keyLock.getLock(key).Lock() +} + +// Unlock releases key specific mutex +func (keyLock *KeyLock) Unlock(key string) { + keyLock.getLock(key).Unlock() +}