From 6b19bc2da9a25db9405c5804877f14e199b08c2e Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 5 Dec 2016 18:42:13 -0800 Subject: [PATCH 01/19] Changes to how ECS AWS API is used to minimize API calls Due to AWS API rate limits, we need to minimize API calls as much as possible. Our stated objectives: * for all displayed tasks and services to have up-to-date metadata * for all tasks to map to services if able My approach here: * Tasks only contain immutable fields (that we care about). We cache tasks indefinitely while they remain in use (see the eviction rule below). We only DescribeTasks the first time we see a new task. * We attempt to match tasks to services with what info we have. Any "referenced" service, i.e. a service with at least one matching task, needs to be updated to refresh changing data. * In the event that a task doesn't match any of the (updated) services, i.e. a new service entirely needs to be found, we do a full list and detail of all services (we don't re-detail ones we just refreshed). * To avoid unbounded memory usage, we evict tasks and services from the cache after 1 minute without use. This should be long enough for things like temporary failures to be glossed over. This gives us exactly one call per task, and one call per referenced service per report, which is unavoidable to maintain fresh data. Expensive "describe all" service queries are limited to when newly-referenced services appear, which should be rare. We could make a few very minor improvements here, such as trying to refresh unreferenced but known services before doing a list query, or getting details one by one when "describing all" and stopping when all matches have been found, but I believe these would produce very minor, if any, gains in number of calls while having an unjustifiable effect on latency since we wouldn't be able to issue requests with as much concurrency. Speaking of which, this change has a minor performance impact. Even though we're now making fewer calls, we can't run them with as much concurrency. 
Old code: concurrently: describe tasks (1 call) sequentially: list services (1 call) describe services (N calls concurrently) Assuming full concurrency, total latency: 2 end-to-end calls New code (worst case): sequentially: describe tasks (1 call) describe services (N calls concurrently) list services (1 call) describe services (N calls concurrently) Assuming full concurrency, total latency: 4 end-to-end calls In practical terms, I don't expect this to matter. --- probe/awsecs/client.go | 351 ++++++++++++++++++++++++++++++--------- probe/awsecs/reporter.go | 11 +- 2 files changed, 281 insertions(+), 81 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 2fb3071f5c..a387939f54 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -2,6 +2,7 @@ package awsecs import ( "sync" + "time" log "github.com/Sirupsen/logrus" "github.com/aws/aws-sdk-go/aws" @@ -14,11 +15,49 @@ import ( type ecsClient struct { client *ecs.ECS cluster string + taskCache map[string]ecsTask + serviceCache map[string]ecsService +} + +// Since we're caching tasks heavily, we ensure no mistakes by casting into a structure +// that only contains immutable attributes of the resource. +type ecsTask struct { + taskARN string + createdAt time.Time + taskDefinitionARN string + + // These started fields are immutable once set, and guarenteed to be set once the task is running, + // which we know it is because otherwise we wouldn't be looking at it. + startedAt time.Time + startedBy string // tag or deployment id + + // Metadata about this cache copy + fetchedAt time.Time + lastUsedAt time.Time +} + +// Services are highly mutable and so we can only cache them on a best-effort basis. +// We have to refresh referenced (ie. has an associated task) services each report +// but we avoid re-listing services unless we can't find a service for a task. 
+type ecsService struct { + serviceName string + createdAt time.Time + + // The following values may be stale in a cached copy + deploymentIDs []string + desiredCount int64 + pendingCount int64 + runningCount int64 + taskDefinitionARN string + + // Metadata about this cache copy + fetchedAt time.Time + lastUsedAt time.Time } type ecsInfo struct { - tasks map[string]*ecs.Task - services map[string]*ecs.Service + tasks map[string]ecsTask + services map[string]ecsService taskServiceMap map[string]string } @@ -33,67 +72,116 @@ func newClient(cluster string) (*ecsClient, error) { return &ecsClient{ client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), cluster: cluster, + taskCache: map[string]ecsTask{}, + serviceCache: map[string]ecsService{}, }, nil } -// returns a map from deployment ids to service names -func (c ecsClient) getDeploymentMap(services map[string]*ecs.Service) map[string]string { - results := map[string]string{} - for serviceName, service := range services { - for _, deployment := range service.Deployments { - results[*deployment.Id] = serviceName - } +func newECSTask(task *ecs.Task) ecsTask { + now = time.Now() + return ecsTask{ + taskARN: *task.TaskARN, + createdAt: *task.CreatedAt, + taskDefinitionARN: *task.TaskDefinitionArn, + startedAt: *task.StartedAt, + startedBy: *task.StartedBy, + fetchedAt: now, + lastUsedAt: now, } - return results } -// cannot fail as it will attempt to deliver partial results, though that may end up being no results -func (c ecsClient) getServices() map[string]*ecs.Service { - results := map[string]*ecs.Service{} - lock := sync.Mutex{} // lock mediates access to results - - group := sync.WaitGroup{} - - err := c.client.ListServicesPages( - &ecs.ListServicesInput{Cluster: &c.cluster}, - func(page *ecs.ListServicesOutput, lastPage bool) bool { - // describe each page of 10 (the max for one describe command) concurrently - group.Add(1) - serviceArns := page.ServiceArns - go func() { - defer group.Done() - - resp, err 
:= c.client.DescribeServices(&ecs.DescribeServicesInput{ - Cluster: &c.cluster, - Services: serviceArns, - }) - if err != nil { - log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) - return - } +func newECSService(service *ecs.Service) ecsService { + now = time.Now() + deploymentIDs = make([]string, 0, len(service.Deployments)) + for _, deployment := range service.Deployments { + deploymentIDs = append(deploymentIDs, *deployment.ID) + } + return ecsService{ + serviceName: *service.ServiceName, + createdAt: *service.CreatedAt, + deploymentIDs: deploymentIDs, + desiredCount: *service.DesiredCount, + pendingCount: *service.PendingCount, + runningCount: *service.RunningCount, + taskDefinitionARN: *service.TaskDefinitionARN, + fetchedAt: now, + lastUsedAt: now, + } +} - for _, failure := range resp.Failures { - log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, *failure.Reason) +// Returns a channel from which service ARNs can be read. +// Cannot fail as it will attempt to deliver partial results, though that may end up being no results. +func (c ecsClient) listServices() <-chan string { + results := make(chan string) + go func() { + err := c.client.ListServicesPages( + &ecs.ListServicesInput{Cluster: &c.cluster}, + func(page *ecs.ListServicesOutput, lastPage bool) bool { + for _, arn := range page.ServiceArns { + results <- arn } + } + ) + if err != nil { + log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) + } + close(results) + }() + return results +} - lock.Lock() - for _, service := range resp.Services { - results[*service.ServiceName] = service - } - lock.Unlock() - }() - return true - }, - ) - group.Wait() +// Returns (input, done) channels. Service ARNs given to input are batched and details are fetched, +// with full ecsService objects being put into the cache. Closes done when finished. 
+func (c ecsClient) describeServices() (chan<- string, <-chan bool) { + input := make(chan string) - if err != nil { - log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) - } - return results + go func() { + const MAX_SERVICES = 10 // How many services we can put in one Describe command + group := sync.WaitGroup{} + lock := sync.Mutex{} // mediates access to the service cache when writing results + + page := make([]string, 0, MAX_SERVICES) + for arn := range input { + page = append(page, arn) + if len(page) == MAX_SERVICES { + group.Add(1) + + go func(arns []string) { + defer group.Done() + + resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ + Cluster: &c.cluster, + Services: arns, + }) + if err != nil { + log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) + return + } + + for _, failure := range resp.Failures { + log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) + } + + mutex.Lock() + for _, service := range resp.Services { + c.serviceCache[*service.ServiceName] = newECSService(service) + } + mutex.Unlock() + }(page) + + page = make([]string, 0, MAX_SERVICES) + } + } + + group.Wait() + close(done) + }() + + return input, done } -func (c ecsClient) getTasks(taskArns []string) (map[string]*ecs.Task, error) { +// get details on given tasks, updating cache with the results +func (c ecsClient) getTasks(taskArns []string) { taskPtrs := make([]*string, len(taskArns)) for i := range taskArns { taskPtrs[i] = &taskArns[i] @@ -106,45 +194,160 @@ func (c ecsClient) getTasks(taskArns []string) (map[string]*ecs.Task, error) { Tasks: taskPtrs, }) if err != nil { - return nil, err + log.Warnf("Failed to describe ECS tasks, ECS service report may be incomplete: %v", err) + return } for _, failure := range resp.Failures { log.Warnf("Failed to describe ECS task %s, ECS service report may be incomplete: %s", *failure.Arn, 
*failure.Reason) } - results := make(map[string]*ecs.Task, len(resp.Tasks)) for _, task := range resp.Tasks { - results[*task.TaskArn] = task + c.taskCache[*task.TaskArn] = newECSTask(task) } - return results, nil } -// returns a map from task ARNs to service names -func (c ecsClient) getInfo(taskArns []string) (ecsInfo, error) { - servicesChan := make(chan map[string]*ecs.Service) - go func() { - servicesChan <- c.getServices() - }() +// Evict entries from the caches which have not been used within the eviction interval. +func (c ecsClient) evictOldCacheItems() { + const EVICT_TIME = time.Minute + now = time.Now() - // do these two fetches in parallel - tasks, err := c.getTasks(taskArns) - services := <-servicesChan + for arn, task := range c.taskCache { + if now - task.lastUsedAt > EVICT_TIME { + delete(c.taskCache, arn) + } + } - if err != nil { - return ecsInfo{}, err + for name, service := range c.serviceCache { + if now - service.lastUsedAt > EVICT_TIME { + delete(c.serviceCache, name) + } + } +} + +// Try to match a list of task ARNs to service names using cached info. +// Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values +// don't appear to point to a service. +func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []string) { + const SERVICE_PREFIX = "aws-svc" // TODO confirm this + + deploymentMap := map[string]string{} + for serviceName, service := range c.serviceCache { + for _, deployment := range service.DeploymentIDs { + deploymentMap[deployment] = serviceName + } + } + + results := map[string]string{} + unmatched := []string{} + for _, taskARN := range taskARNs { + task, ok := c.taskCache[taskARN] + if ! ok { + // this can happen if we have a failure while describing tasks, just pretend the task doesn't exist + continue + } + if ! 
strings.HasPrefix(task.startedBy, SERVICE_PREFIX) { + // task was not started by a service + continue + } + if service, ok := deploymentMap[task.startedBy]; ok { + results[taskARN] = service.serviceName + } else { + unmatched = append(unmatched, taskARN) + } + } + + return results, unmatched +} + +// Returns a ecsInfo struct containing data needed for a report. +func (c ecsClient) getInfo(taskARNs []string) ecsInfo { + + // We do a weird order of operations here to minimize unneeded cache refreshes. + // First, we ensure we have all the tasks we need, and fetch the ones we don't. + // We also mark the tasks as being used here to prevent eviction. + tasksToFetch := []string{} + now = Time.Now() + for _, taskARN := range taskARNs { + if task, ok := c.taskCache[taskARN]; ok { + task.lastUsedAt = now + } else { + tasksToFetch = append(tasksToFetch, taskARN) + } + } + // This might not fully succeed, but we only try once and ignore any further missing tasks. + c.getTasks(tasksToFetch) + + // We're going to do this matching process potentially several times, but that's ok - it's quite cheap. + // First, we want to see how far we get with existing data, and identify the set of services + // we'll need to refresh regardless. + taskServiceMap, unmatched := c.matchTasksServices(taskARNs) + + // In order to ensure service details are fresh, we need to refresh any referenced services + toDescribe, done := describeServices() + servicesRefreshed := map[string]bool{} + for taskARN, serviceName := range taskServiceMap { + if servicesRefreshed[serviceName] { + continue + } + toDescribe <- serviceName + servicesRefreshed[serviceName] = true + } + close(toDescribe) + <-done + + // In refreshing, we may have picked up any new deployment ids. + // If we still have tasks unmatched, we try again. 
+ if len(unmatched) > 0 { + taskServiceMap, unmatched = c.matchTasksServices(taskARNs) } - deploymentMap := c.getDeploymentMap(services) + // If we still have tasks unmatched, we'll have to try harder. Get a list of all services and, + // if not already refreshed, fetch them. + if len(unmatched) > 0 { + serviceNamesChan := listServices() + toDescribe, done := describeServices() + go func() { + for serviceName := range serviceNamesChan { + if ! servicesRefreshed[serviceName] { + toDescribe <- serviceName + servicesRefreshed[serviceName] = true + } + close(toDescribe) + } + }() + <-done - taskServiceMap := map[string]string{} - for taskArn, task := range tasks { - // Note not all tasks map to a deployment, or we could otherwise mismatch due to races. - // It's safe to just ignore all these cases and consider them "non-service" tasks. - if serviceName, ok := deploymentMap[*task.StartedBy]; ok { - taskServiceMap[taskArn] = serviceName + taskServiceMap, unmatched = c.matchTasksServices(taskARNs) + // If we still have unmatched at this point, we don't care - this may be due to partial failures, + // race conditions, and other weirdness. + } + + // The maps to return are the referenced subsets of the full caches + tasks := map[string]ecsTask{} + for _, taskARN := range taskARNs { + // It's possible that tasks could still be missing from the cache if describe tasks failed. + // We'll just pretend they don't exist. + if task, ok := c.taskCache[taskARN]; ok { + tasks[taskARN] = task } } - return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap}, nil + services := map[string]ecsService{} + for taskARN, serviceName := range taskServiceMap { + if _, ok := taskServiceMap[serviceName]; ok { + // Already present. This is expected since multiple tasks can map to the same service. 
+ continue + } + if service, ok := c.serviceCache[serviceName]; ok { + services[serviceName] = service + } else { + log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN) + delete(taskServiceMap, taskARN) + } + } + + c.evictOldCacheItems() + + return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap} } diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index 9a6e2321cb..af1b762234 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -99,18 +99,15 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) { taskArns = append(taskArns, taskArn) } - ecsInfo, err := client.getInfo(taskArns) - if err != nil { - return rpt, err - } + ecsInfo := client.getInfo(taskArns) // Create all the services first for serviceName, service := range ecsInfo.services { serviceID := report.MakeECSServiceNodeID(serviceName) rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{ Cluster: cluster, - ServiceDesiredCount: fmt.Sprintf("%d", *service.DesiredCount), - ServiceRunningCount: fmt.Sprintf("%d", *service.RunningCount), + ServiceDesiredCount: fmt.Sprintf("%d", service.desiredCount), + ServiceRunningCount: fmt.Sprintf("%d", service.runningCount), })) } log.Debugf("Created %v ECS service nodes", len(ecsInfo.services)) @@ -127,7 +124,7 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) { node := report.MakeNodeWith(taskID, map[string]string{ TaskFamily: info.family, Cluster: cluster, - CreatedAt: task.CreatedAt.Format(time.RFC3339Nano), + CreatedAt: task.createdAt.Format(time.RFC3339Nano), }) rpt.ECSTask = rpt.ECSTask.AddNode(node) From 9d1e46f81b92c9a11eab67ecfd3bec3f78251dd2 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 7 Dec 2016 01:18:27 -0800 Subject: [PATCH 02/19] ECS reporter: Use persistent client objects across reports Not only does this allow us to 
re-use connections, but vitally it allows us to make use of the new task and service caching within the client object. --- probe/awsecs/reporter.go | 18 ++++++++++++++---- prog/probe.go | 2 +- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index af1b762234..dda0d31f2b 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -78,10 +78,17 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo { // Reporter implements Tagger, Reporter type Reporter struct { + clients map[string]ecsClient +} + +func New() Reporter { + return Reporter{ + clients: map[string]ecsClient{}, + } } // Tag needed for Tagger -func (Reporter) Tag(rpt report.Report) (report.Report, error) { +func (r Reporter) Tag(rpt report.Report) (report.Report, error) { rpt = rpt.Copy() clusterMap := getLabelInfo(rpt) @@ -89,9 +96,12 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) { for cluster, taskMap := range clusterMap { log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap)) - client, err := newClient(cluster) - if err != nil { - return rpt, err + client, ok := r.clients[cluster] + if !ok { + client, err := newClient(cluster) + if err != nil { + return rpt, err + } } taskArns := make([]string, 0, len(taskMap)) diff --git a/prog/probe.go b/prog/probe.go index f4030ea219..cd8ca164ad 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -206,7 +206,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } if flags.ecsEnabled { - reporter := awsecs.Reporter{} + reporter := awsecs.New() p.AddReporter(reporter) p.AddTagger(reporter) } From 357136721d74af184f3fcc3b4771eea3c9fad090 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 7 Dec 2016 02:24:55 -0800 Subject: [PATCH 03/19] Fix compile errors and go fmt --- probe/awsecs/client.go | 120 ++++++++++++++++++++------------------- probe/awsecs/reporter.go | 7 ++- 2 files changed, 67 insertions(+), 60 
deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index a387939f54..cb0811230f 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -1,6 +1,7 @@ package awsecs import ( + "strings" "sync" "time" @@ -13,17 +14,17 @@ import ( // a wrapper around an AWS client that makes all the needed calls and just exposes the final results type ecsClient struct { - client *ecs.ECS - cluster string - taskCache map[string]ecsTask + client *ecs.ECS + cluster string + taskCache map[string]ecsTask serviceCache map[string]ecsService } // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure // that only contains immutable attributes of the resource. type ecsTask struct { - taskARN string - createdAt time.Time + taskARN string + createdAt time.Time taskDefinitionARN string // These started fields are immutable once set, and guarenteed to be set once the task is running, @@ -32,7 +33,7 @@ type ecsTask struct { startedBy string // tag or deployment id // Metadata about this cache copy - fetchedAt time.Time + fetchedAt time.Time lastUsedAt time.Time } @@ -41,17 +42,16 @@ type ecsTask struct { // but we avoid re-listing services unless we can't find a service for a task. 
type ecsService struct { serviceName string - createdAt time.Time // The following values may be stale in a cached copy - deploymentIDs []string - desiredCount int64 - pendingCount int64 - runningCount int64 + deploymentIDs []string + desiredCount int64 + pendingCount int64 + runningCount int64 taskDefinitionARN string // Metadata about this cache copy - fetchedAt time.Time + fetchedAt time.Time lastUsedAt time.Time } @@ -70,42 +70,41 @@ func newClient(cluster string) (*ecsClient, error) { } return &ecsClient{ - client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), - cluster: cluster, - taskCache: map[string]ecsTask{}, + client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), + cluster: cluster, + taskCache: map[string]ecsTask{}, serviceCache: map[string]ecsService{}, }, nil } func newECSTask(task *ecs.Task) ecsTask { - now = time.Now() + now := time.Now() return ecsTask{ - taskARN: *task.TaskARN, - createdAt: *task.CreatedAt, + taskARN: *task.TaskArn, + createdAt: *task.CreatedAt, taskDefinitionARN: *task.TaskDefinitionArn, - startedAt: *task.StartedAt, - startedBy: *task.StartedBy, - fetchedAt: now, - lastUsedAt: now, + startedAt: *task.StartedAt, + startedBy: *task.StartedBy, + fetchedAt: now, + lastUsedAt: now, } } func newECSService(service *ecs.Service) ecsService { - now = time.Now() - deploymentIDs = make([]string, 0, len(service.Deployments)) + now := time.Now() + deploymentIDs := make([]string, 0, len(service.Deployments)) for _, deployment := range service.Deployments { - deploymentIDs = append(deploymentIDs, *deployment.ID) + deploymentIDs = append(deploymentIDs, *deployment.Id) } return ecsService{ - serviceName: *service.ServiceName, - createdAt: *service.CreatedAt, - deploymentIDs: deploymentIDs, - desiredCount: *service.DesiredCount, - pendingCount: *service.PendingCount, - runningCount: *service.RunningCount, - taskDefinitionARN: *service.TaskDefinitionARN, - fetchedAt: now, - lastUsedAt: now, + serviceName: *service.ServiceName, 
+ deploymentIDs: deploymentIDs, + desiredCount: *service.DesiredCount, + pendingCount: *service.PendingCount, + runningCount: *service.RunningCount, + taskDefinitionARN: *service.TaskDefinition, + fetchedAt: now, + lastUsedAt: now, } } @@ -118,9 +117,10 @@ func (c ecsClient) listServices() <-chan string { &ecs.ListServicesInput{Cluster: &c.cluster}, func(page *ecs.ListServicesOutput, lastPage bool) bool { for _, arn := range page.ServiceArns { - results <- arn + results <- *arn } - } + return true + }, ) if err != nil { log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) @@ -134,6 +134,7 @@ func (c ecsClient) listServices() <-chan string { // with full ecsService objects being put into the cache. Closes done when finished. func (c ecsClient) describeServices() (chan<- string, <-chan bool) { input := make(chan string) + done := make(chan bool) go func() { const MAX_SERVICES = 10 // How many services we can put in one Describe command @@ -149,9 +150,14 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { go func(arns []string) { defer group.Done() + arnPtrs := make([]*string, 0, len(arns)) + for i := range arns { + arnPtrs = append(arnPtrs, &arns[i]) + } + resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ Cluster: &c.cluster, - Services: arns, + Services: arnPtrs, }) if err != nil { log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) @@ -162,11 +168,11 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) } - mutex.Lock() + lock.Lock() for _, service := range resp.Services { c.serviceCache[*service.ServiceName] = newECSService(service) } - mutex.Unlock() + lock.Unlock() }(page) page = make([]string, 0, MAX_SERVICES) @@ -181,10 +187,10 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { } // get 
details on given tasks, updating cache with the results -func (c ecsClient) getTasks(taskArns []string) { - taskPtrs := make([]*string, len(taskArns)) - for i := range taskArns { - taskPtrs[i] = &taskArns[i] +func (c ecsClient) getTasks(taskARNs []string) { + taskPtrs := make([]*string, len(taskARNs)) + for i := range taskARNs { + taskPtrs[i] = &taskARNs[i] } // You'd think there's a limit on how many tasks can be described here, @@ -210,16 +216,16 @@ func (c ecsClient) getTasks(taskArns []string) { // Evict entries from the caches which have not been used within the eviction interval. func (c ecsClient) evictOldCacheItems() { const EVICT_TIME = time.Minute - now = time.Now() + now := time.Now() for arn, task := range c.taskCache { - if now - task.lastUsedAt > EVICT_TIME { + if now.Sub(task.lastUsedAt) > EVICT_TIME { delete(c.taskCache, arn) } } for name, service := range c.serviceCache { - if now - service.lastUsedAt > EVICT_TIME { + if now.Sub(service.lastUsedAt) > EVICT_TIME { delete(c.serviceCache, name) } } @@ -233,7 +239,7 @@ func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []s deploymentMap := map[string]string{} for serviceName, service := range c.serviceCache { - for _, deployment := range service.DeploymentIDs { + for _, deployment := range service.deploymentIDs { deploymentMap[deployment] = serviceName } } @@ -242,16 +248,16 @@ func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []s unmatched := []string{} for _, taskARN := range taskARNs { task, ok := c.taskCache[taskARN] - if ! ok { + if !ok { // this can happen if we have a failure while describing tasks, just pretend the task doesn't exist continue } - if ! 
strings.HasPrefix(task.startedBy, SERVICE_PREFIX) { + if !strings.HasPrefix(task.startedBy, SERVICE_PREFIX) { // task was not started by a service continue } - if service, ok := deploymentMap[task.startedBy]; ok { - results[taskARN] = service.serviceName + if serviceName, ok := deploymentMap[task.startedBy]; ok { + results[taskARN] = serviceName } else { unmatched = append(unmatched, taskARN) } @@ -267,7 +273,7 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { // First, we ensure we have all the tasks we need, and fetch the ones we don't. // We also mark the tasks as being used here to prevent eviction. tasksToFetch := []string{} - now = Time.Now() + now := time.Now() for _, taskARN := range taskARNs { if task, ok := c.taskCache[taskARN]; ok { task.lastUsedAt = now @@ -284,9 +290,9 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { taskServiceMap, unmatched := c.matchTasksServices(taskARNs) // In order to ensure service details are fresh, we need to refresh any referenced services - toDescribe, done := describeServices() + toDescribe, done := c.describeServices() servicesRefreshed := map[string]bool{} - for taskARN, serviceName := range taskServiceMap { + for _, serviceName := range taskServiceMap { if servicesRefreshed[serviceName] { continue } @@ -305,11 +311,11 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { // If we still have tasks unmatched, we'll have to try harder. Get a list of all services and, // if not already refreshed, fetch them. if len(unmatched) > 0 { - serviceNamesChan := listServices() - toDescribe, done := describeServices() + serviceNamesChan := c.listServices() + toDescribe, done := c.describeServices() go func() { for serviceName := range serviceNamesChan { - if ! 
servicesRefreshed[serviceName] { + if !servicesRefreshed[serviceName] { toDescribe <- serviceName servicesRefreshed[serviceName] = true } diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index dda0d31f2b..e46f7e01a8 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -78,12 +78,12 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo { // Reporter implements Tagger, Reporter type Reporter struct { - clients map[string]ecsClient + clients map[string]*ecsClient } func New() Reporter { return Reporter{ - clients: map[string]ecsClient{}, + clients: map[string]*ecsClient{}, } } @@ -98,7 +98,8 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { client, ok := r.clients[cluster] if !ok { - client, err := newClient(cluster) + var err error // can't use := on the next line without shadowing outer client var + client, err = newClient(cluster) if err != nil { return rpt, err } From 4234888bf4c4cde71a2d2eb6101b585cfbf7d358 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 7 Dec 2016 02:33:22 -0800 Subject: [PATCH 04/19] ecs: Linter fixes --- probe/awsecs/client.go | 20 ++++++++++---------- probe/awsecs/reporter.go | 1 + 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index cb0811230f..2fd526448c 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -27,7 +27,7 @@ type ecsTask struct { createdAt time.Time taskDefinitionARN string - // These started fields are immutable once set, and guarenteed to be set once the task is running, + // These started fields are immutable once set, and guaranteed to be set once the task is running, // which we know it is because otherwise we wouldn't be looking at it. 
startedAt time.Time startedBy string // tag or deployment id @@ -137,14 +137,14 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { done := make(chan bool) go func() { - const MAX_SERVICES = 10 // How many services we can put in one Describe command + const maxServices = 10 // How many services we can put in one Describe command group := sync.WaitGroup{} lock := sync.Mutex{} // mediates access to the service cache when writing results - page := make([]string, 0, MAX_SERVICES) + page := make([]string, 0, maxServices) for arn := range input { page = append(page, arn) - if len(page) == MAX_SERVICES { + if len(page) == maxServices { group.Add(1) go func(arns []string) { @@ -175,7 +175,7 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { lock.Unlock() }(page) - page = make([]string, 0, MAX_SERVICES) + page = make([]string, 0, maxServices) } } @@ -215,17 +215,17 @@ func (c ecsClient) getTasks(taskARNs []string) { // Evict entries from the caches which have not been used within the eviction interval. func (c ecsClient) evictOldCacheItems() { - const EVICT_TIME = time.Minute + const evictTime = time.Minute now := time.Now() for arn, task := range c.taskCache { - if now.Sub(task.lastUsedAt) > EVICT_TIME { + if now.Sub(task.lastUsedAt) > evictTime { delete(c.taskCache, arn) } } for name, service := range c.serviceCache { - if now.Sub(service.lastUsedAt) > EVICT_TIME { + if now.Sub(service.lastUsedAt) > evictTime { delete(c.serviceCache, name) } } @@ -235,7 +235,7 @@ func (c ecsClient) evictOldCacheItems() { // Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values // don't appear to point to a service. 
func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []string) { - const SERVICE_PREFIX = "aws-svc" // TODO confirm this + const servicePrefix = "aws-svc" // TODO confirm this deploymentMap := map[string]string{} for serviceName, service := range c.serviceCache { @@ -252,7 +252,7 @@ func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []s // this can happen if we have a failure while describing tasks, just pretend the task doesn't exist continue } - if !strings.HasPrefix(task.startedBy, SERVICE_PREFIX) { + if !strings.HasPrefix(task.startedBy, servicePrefix) { // task was not started by a service continue } diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index e46f7e01a8..e8d992616f 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -81,6 +81,7 @@ type Reporter struct { clients map[string]*ecsClient } +// New creates a new Reporter func New() Reporter { return Reporter{ clients: map[string]*ecsClient{}, From 1d638307923a612d67628db80fed0fc5a37e0381 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Fri, 9 Dec 2016 18:23:55 -0800 Subject: [PATCH 05/19] awsecs reporter: Add lots of debug logging and fix bugs - describeServices wasn't describing the partial page left over at the end, which would cause incorrect results - the shim between listServices and describeServices was closing the channel every iteration, which would cause panic for write to closed channel - client was not being saved when created, so it gets recreated each time - we were describeTasks'ing even if we had no tasks to describe --- probe/awsecs/client.go | 98 ++++++++++++++++++++++++++-------------- probe/awsecs/reporter.go | 3 ++ 2 files changed, 68 insertions(+), 33 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 2fd526448c..9dfe58a1c2 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -111,12 +111,15 @@ func newECSService(service *ecs.Service) ecsService { // Returns 
a channel from which service ARNs can be read. // Cannot fail as it will attempt to deliver partial results, though that may end up being no results. func (c ecsClient) listServices() <-chan string { + log.Debugf("Listing ECS services") results := make(chan string) go func() { + count := 0 err := c.client.ListServicesPages( &ecs.ListServicesInput{Cluster: &c.cluster}, func(page *ecs.ListServicesOutput, lastPage bool) bool { for _, arn := range page.ServiceArns { + count++ results <- *arn } return true @@ -125,6 +128,7 @@ func (c ecsClient) listServices() <-chan string { if err != nil { log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) } + log.Debugf("Listed %d services", count) close(results) }() return results @@ -136,49 +140,62 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { input := make(chan string) done := make(chan bool) + log.Debugf("Describing ECS services") + go func() { const maxServices = 10 // How many services we can put in one Describe command group := sync.WaitGroup{} lock := sync.Mutex{} // mediates access to the service cache when writing results + describePage := func(arns []string) { + defer group.Done() + + arnPtrs := make([]*string, 0, len(arns)) + for i := range arns { + arnPtrs = append(arnPtrs, &arns[i]) + } + + resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ + Cluster: &c.cluster, + Services: arnPtrs, + }) + if err != nil { + log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) + return + } + + for _, failure := range resp.Failures { + log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) + } + + lock.Lock() + for _, service := range resp.Services { + c.serviceCache[*service.ServiceName] = newECSService(service) + } + lock.Unlock() + } + + count := 0 // this is just for logging + calls := 0 // this is just for logging page := make([]string, 0, maxServices) 
for arn := range input { page = append(page, arn) if len(page) == maxServices { group.Add(1) - - go func(arns []string) { - defer group.Done() - - arnPtrs := make([]*string, 0, len(arns)) - for i := range arns { - arnPtrs = append(arnPtrs, &arns[i]) - } - - resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ - Cluster: &c.cluster, - Services: arnPtrs, - }) - if err != nil { - log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) - return - } - - for _, failure := range resp.Failures { - log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) - } - - lock.Lock() - for _, service := range resp.Services { - c.serviceCache[*service.ServiceName] = newECSService(service) - } - lock.Unlock() - }(page) - + go describePage(page) + count += len(page) + calls++ page = make([]string, 0, maxServices) } } + if len(page) > 0 { + group.Add(1) + go describePage(page) + count += len(page) + calls++ + } + log.Debugf("Described %d services in %d calls", count, calls) group.Wait() close(done) }() @@ -188,6 +205,8 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { // get details on given tasks, updating cache with the results func (c ecsClient) getTasks(taskARNs []string) { + log.Debugf("Describing %d ECS tasks", len(taskARNs)) + taskPtrs := make([]*string, len(taskARNs)) for i := range taskARNs { taskPtrs[i] = &taskARNs[i] @@ -218,24 +237,30 @@ func (c ecsClient) evictOldCacheItems() { const evictTime = time.Minute now := time.Now() + count := 0 for arn, task := range c.taskCache { if now.Sub(task.lastUsedAt) > evictTime { delete(c.taskCache, arn) + count++ } } + log.Debugf("Evicted %d old tasks", count) + count = 0 for name, service := range c.serviceCache { if now.Sub(service.lastUsedAt) > evictTime { delete(c.serviceCache, name) + count++ } } + log.Debugf("Evicted %d old services", count) } // Try to match a list of task ARNs to service names 
using cached info. // Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values // don't appear to point to a service. func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []string) { - const servicePrefix = "aws-svc" // TODO confirm this + const servicePrefix = "ecs-svc" deploymentMap := map[string]string{} for serviceName, service := range c.serviceCache { @@ -243,6 +268,7 @@ func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []s deploymentMap[deployment] = serviceName } } + log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), len(c.serviceCache)) results := map[string]string{} unmatched := []string{} @@ -263,11 +289,13 @@ func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []s } } + log.Debugf("Matched %d from %d tasks, %d unmatched", len(results), len(taskARNs), len(unmatched)) return results, unmatched } // Returns a ecsInfo struct containing data needed for a report. func (c ecsClient) getInfo(taskARNs []string) ecsInfo { + log.Debugf("Getting ECS info on %d tasks", len(taskARNs)) // We do a weird order of operations here to minimize unneeded cache refreshes. // First, we ensure we have all the tasks we need, and fetch the ones we don't. @@ -281,8 +309,10 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { tasksToFetch = append(tasksToFetch, taskARN) } } - // This might not fully succeed, but we only try once and ignore any further missing tasks. - c.getTasks(tasksToFetch) + if len(tasksToFetch) > 0 { + // This might not fully succeed, but we only try once and ignore any further missing tasks. + c.getTasks(tasksToFetch) + } // We're going to do this matching process potentially several times, but that's ok - it's quite cheap. 
// First, we want to see how far we get with existing data, and identify the set of services @@ -290,6 +320,7 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { taskServiceMap, unmatched := c.matchTasksServices(taskARNs) // In order to ensure service details are fresh, we need to refresh any referenced services + log.Debugf("Refreshing ECS services") toDescribe, done := c.describeServices() servicesRefreshed := map[string]bool{} for _, serviceName := range taskServiceMap { @@ -310,6 +341,7 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { // If we still have tasks unmatched, we'll have to try harder. Get a list of all services and, // if not already refreshed, fetch them. + log.Debugf("After refreshing services, %d tasks unmatched", len(unmatched)) if len(unmatched) > 0 { serviceNamesChan := c.listServices() toDescribe, done := c.describeServices() @@ -319,8 +351,8 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { toDescribe <- serviceName servicesRefreshed[serviceName] = true } - close(toDescribe) } + close(toDescribe) }() <-done diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index e8d992616f..1f1263355e 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -99,11 +99,13 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { client, ok := r.clients[cluster] if !ok { + log.Debugf("Creating new ECS client") var err error // can't use := on the next line without shadowing outer client var client, err = newClient(cluster) if err != nil { return rpt, err } + r.clients[cluster] = client } taskArns := make([]string, 0, len(taskMap)) @@ -112,6 +114,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { } ecsInfo := client.getInfo(taskArns) + log.Debugf("Got info from ECS: %d tasks, %d services", len(ecsInfo.tasks), len(ecsInfo.services)) // Create all the services first for serviceName, service := range ecsInfo.services { From 7ebb76d0a3082c36a2f2e4f95c3a3a20f3269594 Mon Sep 17 
00:00:00 2001 From: Mike Lang Date: Mon, 12 Dec 2016 13:49:26 -0800 Subject: [PATCH 06/19] ecs reporter: Move some code around to break up large function --- probe/awsecs/client.go | 105 ++++++++++++++++++++++++----------------- 1 file changed, 62 insertions(+), 43 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 9dfe58a1c2..54fb515317 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -16,8 +16,8 @@ import ( type ecsClient struct { client *ecs.ECS cluster string - taskCache map[string]ecsTask - serviceCache map[string]ecsService + taskCache map[string]ecsTask // keys are task ARNs + serviceCache map[string]ecsService // keys are service names } // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure @@ -293,13 +293,7 @@ func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []s return results, unmatched } -// Returns a ecsInfo struct containing data needed for a report. -func (c ecsClient) getInfo(taskARNs []string) ecsInfo { - log.Debugf("Getting ECS info on %d tasks", len(taskARNs)) - - // We do a weird order of operations here to minimize unneeded cache refreshes. - // First, we ensure we have all the tasks we need, and fetch the ones we don't. - // We also mark the tasks as being used here to prevent eviction. +func (c ecsClient) ensureTasks(taskARNs []string) { tasksToFetch := []string{} now := time.Now() for _, taskARN := range taskARNs { @@ -313,14 +307,9 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { // This might not fully succeed, but we only try once and ignore any further missing tasks. c.getTasks(tasksToFetch) } +} - // We're going to do this matching process potentially several times, but that's ok - it's quite cheap. - // First, we want to see how far we get with existing data, and identify the set of services - // we'll need to refresh regardless. 
- taskServiceMap, unmatched := c.matchTasksServices(taskARNs) - - // In order to ensure service details are fresh, we need to refresh any referenced services - log.Debugf("Refreshing ECS services") +func (c ecsClient) refreshServices(taskServiceMap map[string]string) map[string]bool { toDescribe, done := c.describeServices() servicesRefreshed := map[string]bool{} for _, serviceName := range taskServiceMap { @@ -332,35 +321,25 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { } close(toDescribe) <-done + return servicesRefreshed +} - // In refreshing, we may have picked up any new deployment ids. - // If we still have tasks unmatched, we try again. - if len(unmatched) > 0 { - taskServiceMap, unmatched = c.matchTasksServices(taskARNs) - } - - // If we still have tasks unmatched, we'll have to try harder. Get a list of all services and, - // if not already refreshed, fetch them. - log.Debugf("After refreshing services, %d tasks unmatched", len(unmatched)) - if len(unmatched) > 0 { - serviceNamesChan := c.listServices() - toDescribe, done := c.describeServices() - go func() { - for serviceName := range serviceNamesChan { - if !servicesRefreshed[serviceName] { - toDescribe <- serviceName - servicesRefreshed[serviceName] = true - } +func (c ecsClient) describeAllServices(servicesRefreshed map[string]bool) { + serviceNamesChan := c.listServices() + toDescribe, done := c.describeServices() + go func() { + for serviceName := range serviceNamesChan { + if !servicesRefreshed[serviceName] { + toDescribe <- serviceName + servicesRefreshed[serviceName] = true } - close(toDescribe) - }() - <-done - - taskServiceMap, unmatched = c.matchTasksServices(taskARNs) - // If we still have unmatched at this point, we don't care - this may be due to partial failures, - // race conditions, and other weirdness. 
- } + } + close(toDescribe) + }() + <-done +} +func (c ecsClient) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) ecsInfo { // The maps to return are the referenced subsets of the full caches tasks := map[string]ecsTask{} for _, taskARN := range taskARNs { @@ -385,7 +364,47 @@ func (c ecsClient) getInfo(taskARNs []string) ecsInfo { } } + return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap} +} + +// Returns a ecsInfo struct containing data needed for a report. +func (c ecsClient) getInfo(taskARNs []string) ecsInfo { + log.Debugf("Getting ECS info on %d tasks", len(taskARNs)) + + // We do a weird order of operations here to minimize unneeded cache refreshes. + // First, we ensure we have all the tasks we need, and fetch the ones we don't. + // We also mark the tasks as being used here to prevent eviction. + c.ensureTasks(taskARNs) + + // We're going to do this matching process potentially several times, but that's ok - it's quite cheap. + // First, we want to see how far we get with existing data, and identify the set of services + // we'll need to refresh regardless. + taskServiceMap, unmatched := c.matchTasksServices(taskARNs) + + // In order to ensure service details are fresh, we need to refresh any referenced services + log.Debugf("Refreshing ECS services") + servicesRefreshed := c.refreshServices(taskServiceMap) + + // In refreshing, we may have picked up any new deployment ids. + // If we still have tasks unmatched, we try again. + if len(unmatched) > 0 { + taskServiceMap, unmatched = c.matchTasksServices(taskARNs) + } + + // If we still have tasks unmatched, we'll have to try harder. Get a list of all services and, + // if not already refreshed, fetch them. 
+ log.Debugf("After refreshing services, %d tasks unmatched", len(unmatched)) + if len(unmatched) > 0 { + c.describeAllServices(servicesRefreshed) + + taskServiceMap, unmatched = c.matchTasksServices(taskARNs) + // If we still have unmatched at this point, we don't care - this may be due to partial failures, + // race conditions, and other weirdness. + } + + info := c.makeECSInfo(taskARNs, taskServiceMap) + c.evictOldCacheItems() - return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap} + return info } From 7d845f9130be5ef60610c9069765d8ec50745114 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 12 Dec 2016 13:54:33 -0800 Subject: [PATCH 07/19] ecs reporter: Review feedback, some trivial renames --- probe/awsecs/reporter.go | 10 +++++----- prog/probe.go | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index 1f1263355e..346d18609b 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -78,13 +78,13 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo { // Reporter implements Tagger, Reporter type Reporter struct { - clients map[string]*ecsClient + clientsByCluster map[string]*ecsClient } // New creates a new Reporter -func New() Reporter { +func Make() Reporter { return Reporter{ - clients: map[string]*ecsClient{}, + clientsByCluster: map[string]*ecsClient{}, } } @@ -97,7 +97,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { for cluster, taskMap := range clusterMap { log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap)) - client, ok := r.clients[cluster] + client, ok := r.clientsByCluster[cluster] if !ok { log.Debugf("Creating new ECS client") var err error // can't use := on the next line without shadowing outer client var @@ -105,7 +105,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { if err != nil { return rpt, err } - r.clients[cluster] = client + 
r.clientsByCluster[cluster] = client } taskArns := make([]string, 0, len(taskMap)) diff --git a/prog/probe.go b/prog/probe.go index cd8ca164ad..fd183a30bb 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -206,7 +206,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } if flags.ecsEnabled { - reporter := awsecs.New() + reporter := awsecs.Make() p.AddReporter(reporter) p.AddTagger(reporter) } From 6f2efca968bfc6f8527fe919bd6dddfd781c43ef Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 12 Dec 2016 14:13:38 -0800 Subject: [PATCH 08/19] more review feedback --- probe/awsecs/reporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index 346d18609b..caf63493f0 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -100,7 +100,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { client, ok := r.clientsByCluster[cluster] if !ok { log.Debugf("Creating new ECS client") - var err error // can't use := on the next line without shadowing outer client var + var err error client, err = newClient(cluster) if err != nil { return rpt, err From adb6f9d4a1f2717457ea39ce6f24b05e770d9792 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 12 Dec 2016 14:45:09 -0800 Subject: [PATCH 09/19] Appease linter --- probe/awsecs/reporter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index caf63493f0..a3199ba0be 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -81,7 +81,7 @@ type Reporter struct { clientsByCluster map[string]*ecsClient } -// New creates a new Reporter +// Make creates a new Reporter func Make() Reporter { return Reporter{ clientsByCluster: map[string]*ecsClient{}, From 0fb74d6781516a005eeebee75de9063bc1d24cda Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 14 Dec 2016 16:42:15 -0800 Subject: [PATCH 10/19] ecs client: more refactoring for nice code pulls 
the inner function of describeServices into its own top-level function, makes the lock part of the client object as a result --- probe/awsecs/client.go | 107 +++++++++++++++++++++-------------------- 1 file changed, 56 insertions(+), 51 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 54fb515317..cb1eeb9de4 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -16,8 +16,11 @@ import ( type ecsClient struct { client *ecs.ECS cluster string - taskCache map[string]ecsTask // keys are task ARNs - serviceCache map[string]ecsService // keys are service names + taskCache map[string]ecsTask // Keys are task ARNs. + serviceCache map[string]ecsService // Keys are service names. + // Governs write access to serviceCache. Note care must still be taken + // that no-one is also trying to read. + serviceCacheLock *sync.Mutex } // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure @@ -70,10 +73,11 @@ func newClient(cluster string) (*ecsClient, error) { } return &ecsClient{ - client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), - cluster: cluster, - taskCache: map[string]ecsTask{}, - serviceCache: map[string]ecsService{}, + client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), + cluster: cluster, + taskCache: map[string]ecsTask{}, + serviceCache: map[string]ecsService{}, + serviceCacheLock: &sync.Mutex{}, }, nil } @@ -92,9 +96,9 @@ func newECSTask(task *ecs.Task) ecsTask { func newECSService(service *ecs.Service) ecsService { now := time.Now() - deploymentIDs := make([]string, 0, len(service.Deployments)) - for _, deployment := range service.Deployments { - deploymentIDs = append(deploymentIDs, *deployment.Id) + deploymentIDs := make([]string, len(service.Deployments)) + for i, deployment := range service.Deployments { + deploymentIDs[i] = *deployment.Id } return ecsService{ serviceName: *service.ServiceName, @@ -136,62 +140,37 @@ func (c ecsClient) listServices() <-chan string { // 
Returns (input, done) channels. Service ARNs given to input are batched and details are fetched, // with full ecsService objects being put into the cache. Closes done when finished. -func (c ecsClient) describeServices() (chan<- string, <-chan bool) { +func (c ecsClient) describeServices() (chan<- string, <-chan struct{}) { input := make(chan string) - done := make(chan bool) + done := make(chan struct{}) log.Debugf("Describing ECS services") go func() { const maxServices = 10 // How many services we can put in one Describe command group := sync.WaitGroup{} - lock := sync.Mutex{} // mediates access to the service cache when writing results - describePage := func(arns []string) { - defer group.Done() - - arnPtrs := make([]*string, 0, len(arns)) - for i := range arns { - arnPtrs = append(arnPtrs, &arns[i]) - } - - resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ - Cluster: &c.cluster, - Services: arnPtrs, - }) - if err != nil { - log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) - return - } - - for _, failure := range resp.Failures { - log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) - } - - lock.Lock() - for _, service := range resp.Services { - c.serviceCache[*service.ServiceName] = newECSService(service) - } - lock.Unlock() - } + // count and calls is just for logging + count := 0 + calls := 0 - count := 0 // this is just for logging - calls := 0 // this is just for logging - page := make([]string, 0, maxServices) + batch := make([]string, 0, maxServices) for arn := range input { - page = append(page, arn) - if len(page) == maxServices { + batch = append(batch, arn) + if len(batch) == maxServices { group.Add(1) - go describePage(page) - count += len(page) + go func(arns []string) { + defer group.Done() + c.describeServicesBatch(arns) + }(batch) + count += len(batch) calls++ - page = make([]string, 0, maxServices) + batch = 
make([]string, 0, maxServices) } } - if len(page) > 0 { - group.Add(1) - go describePage(page) - count += len(page) + if len(batch) > 0 { + c.describeServicesBatch(batch) + count += len(batch) calls++ } @@ -203,6 +182,32 @@ func (c ecsClient) describeServices() (chan<- string, <-chan bool) { return input, done } +func (c ecsClient) describeServicesBatch(arns []string) { + arnPtrs := make([]*string, 0, len(arns)) + for i := range arns { + arnPtrs = append(arnPtrs, &arns[i]) + } + + resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ + Cluster: &c.cluster, + Services: arnPtrs, + }) + if err != nil { + log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) + return + } + + for _, failure := range resp.Failures { + log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, failure.Reason) + } + + c.serviceCacheLock.Lock() + for _, service := range resp.Services { + c.serviceCache[*service.ServiceName] = newECSService(service) + } + c.serviceCacheLock.Unlock() +} + // get details on given tasks, updating cache with the results func (c ecsClient) getTasks(taskARNs []string) { log.Debugf("Describing %d ECS tasks", len(taskARNs)) From 49d3e7bbd3759b968c311cf2990a0d4923fa02b6 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Fri, 16 Dec 2016 17:00:57 -0800 Subject: [PATCH 11/19] wip: --- probe/awsecs/reporter_test.go | 45 +++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 probe/awsecs/reporter_test.go diff --git a/probe/awsecs/reporter_test.go b/probe/awsecs/reporter_test.go new file mode 100644 index 0000000000..3a92b48ff6 --- /dev/null +++ b/probe/awsecs/reporter_test.go @@ -0,0 +1,45 @@ +package awsecs + +import ( + "reflect" + "testing" +) + +func TestGetLabelInfo(t *testing.T) { + r := Make() + rpt, err := r.Report() + if err != nil { + t.Fatal("Error making report", err) + } + labelInfo := r.getLabelInfo(rpt) + expected := 
map[string]map[string]*taskLabelInfo{} + if !reflect.DeepEqual(labelInfo, expected) { + t.Error("Empty report did not produce empty label info: %v != %v", labelInfo, expected) + } + + rpt.Containers = rpt.Containers.AddNode( + report.MakeNodeWith( + report.MakeContainerNodeID("test-container"), + map[string]string{ + docker.LabelPrefix + "com.amazonaws.ecs.task-arn": + "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0", + docker.LabelPrefix + "com.amazonaws.ecs.cluster": + "test-cluster", + docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family": + "test-family", + } + ) + ) + labelInfo = r.getLabelInfo(rpt) + expected = map[string]map[string]*taskLabelInfo{ + "test-cluster": map[string]*taskLabelInfo{ + "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0": &taskLabelInfo{ + containerIDs: []string{"test-container"}, + family: "test-family", + } + } + } + if !reflect.DeepEqual(labelInfo, expected) { + t.Error("Did not get expected label info: %v != %v", labelInfo, expected) + } +} From e220ae822f5cd6cf8df90cf6cda1142339cf74a6 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 12 Jan 2017 07:11:12 -0800 Subject: [PATCH 12/19] wip: --- probe/awsecs/client.go | 37 +++++++++------ probe/awsecs/reporter_test.go | 88 ++++++++++++++++++++++++++++------- 2 files changed, 93 insertions(+), 32 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index cb1eeb9de4..b9d3a10e36 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -12,8 +12,15 @@ import ( "github.com/aws/aws-sdk-go/service/ecs" ) -// a wrapper around an AWS client that makes all the needed calls and just exposes the final results -type ecsClient struct { +// A wrapper around an AWS client that makes all the needed calls and just exposes the final results. +// We create an interface so we can mock for testing +type ecsClient interface { + // Returns a ecsInfo struct containing data needed for a report. 
+ getInfo([]string) ecsInfo +} + +// actual implementation +type ecsClientImpl struct { client *ecs.ECS cluster string taskCache map[string]ecsTask // Keys are task ARNs. @@ -72,7 +79,7 @@ func newClient(cluster string) (*ecsClient, error) { return nil, err } - return &ecsClient{ + return &ecsClientImpl{ client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), cluster: cluster, taskCache: map[string]ecsTask{}, @@ -114,7 +121,7 @@ func newECSService(service *ecs.Service) ecsService { // Returns a channel from which service ARNs can be read. // Cannot fail as it will attempt to deliver partial results, though that may end up being no results. -func (c ecsClient) listServices() <-chan string { +func (c ecsClientImpl) listServices() <-chan string { log.Debugf("Listing ECS services") results := make(chan string) go func() { @@ -140,7 +147,7 @@ func (c ecsClient) listServices() <-chan string { // Returns (input, done) channels. Service ARNs given to input are batched and details are fetched, // with full ecsService objects being put into the cache. Closes done when finished. 
-func (c ecsClient) describeServices() (chan<- string, <-chan struct{}) { +func (c ecsClientImpl) describeServices() (chan<- string, <-chan struct{}) { input := make(chan string) done := make(chan struct{}) @@ -182,7 +189,7 @@ func (c ecsClient) describeServices() (chan<- string, <-chan struct{}) { return input, done } -func (c ecsClient) describeServicesBatch(arns []string) { +func (c ecsClientImpl) describeServicesBatch(arns []string) { arnPtrs := make([]*string, 0, len(arns)) for i := range arns { arnPtrs = append(arnPtrs, &arns[i]) @@ -209,7 +216,7 @@ func (c ecsClient) describeServicesBatch(arns []string) { } // get details on given tasks, updating cache with the results -func (c ecsClient) getTasks(taskARNs []string) { +func (c ecsClientImpl) getTasks(taskARNs []string) { log.Debugf("Describing %d ECS tasks", len(taskARNs)) taskPtrs := make([]*string, len(taskARNs)) @@ -238,7 +245,7 @@ func (c ecsClient) getTasks(taskARNs []string) { } // Evict entries from the caches which have not been used within the eviction interval. -func (c ecsClient) evictOldCacheItems() { +func (c ecsClientImpl) evictOldCacheItems() { const evictTime = time.Minute now := time.Now() @@ -264,7 +271,7 @@ func (c ecsClient) evictOldCacheItems() { // Try to match a list of task ARNs to service names using cached info. // Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values // don't appear to point to a service. 
-func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []string) { +func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, []string) { const servicePrefix = "ecs-svc" deploymentMap := map[string]string{} @@ -298,7 +305,7 @@ func (c ecsClient) matchTasksServices(taskARNs []string) (map[string]string, []s return results, unmatched } -func (c ecsClient) ensureTasks(taskARNs []string) { +func (c ecsClientImpl) ensureTasks(taskARNs []string) { tasksToFetch := []string{} now := time.Now() for _, taskARN := range taskARNs { @@ -314,7 +321,7 @@ func (c ecsClient) ensureTasks(taskARNs []string) { } } -func (c ecsClient) refreshServices(taskServiceMap map[string]string) map[string]bool { +func (c ecsClientImpl) refreshServices(taskServiceMap map[string]string) map[string]bool { toDescribe, done := c.describeServices() servicesRefreshed := map[string]bool{} for _, serviceName := range taskServiceMap { @@ -329,7 +336,7 @@ func (c ecsClient) refreshServices(taskServiceMap map[string]string) map[string] return servicesRefreshed } -func (c ecsClient) describeAllServices(servicesRefreshed map[string]bool) { +func (c ecsClientImpl) describeAllServices(servicesRefreshed map[string]bool) { serviceNamesChan := c.listServices() toDescribe, done := c.describeServices() go func() { @@ -344,7 +351,7 @@ func (c ecsClient) describeAllServices(servicesRefreshed map[string]bool) { <-done } -func (c ecsClient) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) ecsInfo { +func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) ecsInfo { // The maps to return are the referenced subsets of the full caches tasks := map[string]ecsTask{} for _, taskARN := range taskARNs { @@ -372,8 +379,8 @@ func (c ecsClient) makeECSInfo(taskARNs []string, taskServiceMap map[string]stri return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap} } -// Returns a ecsInfo struct containing data needed 
for a report. -func (c ecsClient) getInfo(taskARNs []string) ecsInfo { +// Implements ecsClient.getInfo +func (c ecsClientImpl) getInfo(taskARNs []string) ecsInfo { log.Debugf("Getting ECS info on %d tasks", len(taskARNs)) // We do a weird order of operations here to minimize unneeded cache refreshes. diff --git a/probe/awsecs/reporter_test.go b/probe/awsecs/reporter_test.go index 3a92b48ff6..ddcc55e408 100644 --- a/probe/awsecs/reporter_test.go +++ b/probe/awsecs/reporter_test.go @@ -5,6 +5,28 @@ import ( "testing" ) +const ( + testCluster = "test-cluster" + testFamily = "test-family" + testTaskARN = "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0" + testContainer = "test-container" + testContainerData = map[string]string{ + docker.LabelPrefix + "com.amazonaws.ecs.task-arn": + testTaskARN, + docker.LabelPrefix + "com.amazonaws.ecs.cluster": + testCluster, + docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family": + testFamily, + } +) + +func getTestContainerNode() report.Node { + return report.MakeNodeWith( + report.MakeContainerNodeID("test-container"), + testContainerData + ) +} + func TestGetLabelInfo(t *testing.T) { r := Make() rpt, err := r.Report() @@ -17,25 +39,13 @@ func TestGetLabelInfo(t *testing.T) { t.Error("Empty report did not produce empty label info: %v != %v", labelInfo, expected) } - rpt.Containers = rpt.Containers.AddNode( - report.MakeNodeWith( - report.MakeContainerNodeID("test-container"), - map[string]string{ - docker.LabelPrefix + "com.amazonaws.ecs.task-arn": - "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0", - docker.LabelPrefix + "com.amazonaws.ecs.cluster": - "test-cluster", - docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family": - "test-family", - } - ) - ) + rpt.Containers = rpt.Containers.AddNode(getTestContainerNode()) labelInfo = r.getLabelInfo(rpt) expected = map[string]map[string]*taskLabelInfo{ - "test-cluster": map[string]*taskLabelInfo{ - 
"arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0": &taskLabelInfo{ - containerIDs: []string{"test-container"}, - family: "test-family", + testCluster: map[string]*taskLabelInfo{ + testTaskARN: &taskLabelInfo{ + containerIDs: []string{testContainer}, + family: testFamily, } } } @@ -43,3 +53,47 @@ func TestGetLabelInfo(t *testing.T) { t.Error("Did not get expected label info: %v != %v", labelInfo, expected) } } + +// Implements ecsClient +type mockEcsClient { + t *testing.T + expectedARNs []string + info ecsInfo +} + +func newMockEcsClient(t *testing.T, expectedARNs []string, info ecsInfo) *ecsClient { + return &mockEcsClient{ + t, + expectedARNs, + info, + } +} + +func (c mockEcsClient) getInfo(taskARNs []string) ecsInfo { + if !reflect.DeepEqual(taskARNs, c.expectedARNs) { + c.t.Fatal("getInfo called with wrong ARNs: %v != %v", taskARNs, c.expectedARNs) + } + return c.info +} + +func TestTagReport(t *testing.T) { + r := Make() + + r.clientsByCluster[testCluster] = newMockEcsClient( + t, + []string{}, + ecsInfo{ + // TODO fill in values below + tasks: map[string]ecsTask{}, + services: map[string]ecsService{}, + taskServiceMap: map[string]string{}, + }, + ) + + rpt, err := r.Report() + if err != nil { + t.Fatal("Error making report") + } + rpt = r.Tag(rpt) + // TODO check it matches +} From 513977081d446c7eeb06dabbf6f60643a0cff1b2 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Wed, 11 Jan 2017 18:19:24 -0800 Subject: [PATCH 13/19] aws ecs probe: Use a size and time bound LRU gcache for caching instead of our own hand-rolled size-unbound cache --- probe/awsecs/client.go | 93 ++++++++++++---------------------------- probe/awsecs/reporter.go | 2 +- 2 files changed, 28 insertions(+), 67 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index b9d3a10e36..6eeccee608 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -10,6 +10,7 @@ import ( "github.com/aws/aws-sdk-go/aws/ec2metadata" 
"github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/ecs" + "github.com/bluele/gcache" ) // A wrapper around an AWS client that makes all the needed calls and just exposes the final results. @@ -23,11 +24,8 @@ type ecsClient interface { type ecsClientImpl struct { client *ecs.ECS cluster string - taskCache map[string]ecsTask // Keys are task ARNs. - serviceCache map[string]ecsService // Keys are service names. - // Governs write access to serviceCache. Note care must still be taken - // that no-one is also trying to read. - serviceCacheLock *sync.Mutex + taskCache gcache.Cache // Keys are task ARNs. + serviceCache gcache.Cache // Keys are service names. } // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure @@ -41,10 +39,6 @@ type ecsTask struct { // which we know it is because otherwise we wouldn't be looking at it. startedAt time.Time startedBy string // tag or deployment id - - // Metadata about this cache copy - fetchedAt time.Time - lastUsedAt time.Time } // Services are highly mutable and so we can only cache them on a best-effort basis. @@ -52,17 +46,12 @@ type ecsTask struct { // but we avoid re-listing services unless we can't find a service for a task. 
type ecsService struct { serviceName string - // The following values may be stale in a cached copy deploymentIDs []string desiredCount int64 pendingCount int64 runningCount int64 taskDefinitionARN string - - // Metadata about this cache copy - fetchedAt time.Time - lastUsedAt time.Time } type ecsInfo struct { @@ -71,7 +60,7 @@ type ecsInfo struct { taskServiceMap map[string]string } -func newClient(cluster string) (*ecsClient, error) { +func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (*ecsClient, error) { sess := session.New() region, err := ec2metadata.New(sess).Region() @@ -80,29 +69,24 @@ func newClient(cluster string) (*ecsClient, error) { } return &ecsClientImpl{ - client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), - cluster: cluster, - taskCache: map[string]ecsTask{}, - serviceCache: map[string]ecsService{}, - serviceCacheLock: &sync.Mutex{}, + client: ecs.New(sess, &aws.Config{Region: aws.String(region)}), + cluster: cluster, + taskCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(), + serviceCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(), }, nil } func newECSTask(task *ecs.Task) ecsTask { - now := time.Now() return ecsTask{ taskARN: *task.TaskArn, createdAt: *task.CreatedAt, taskDefinitionARN: *task.TaskDefinitionArn, startedAt: *task.StartedAt, startedBy: *task.StartedBy, - fetchedAt: now, - lastUsedAt: now, } } func newECSService(service *ecs.Service) ecsService { - now := time.Now() deploymentIDs := make([]string, len(service.Deployments)) for i, deployment := range service.Deployments { deploymentIDs[i] = *deployment.Id @@ -114,8 +98,6 @@ func newECSService(service *ecs.Service) ecsService { pendingCount: *service.PendingCount, runningCount: *service.RunningCount, taskDefinitionARN: *service.TaskDefinition, - fetchedAt: now, - lastUsedAt: now, } } @@ -208,11 +190,9 @@ func (c ecsClientImpl) describeServicesBatch(arns []string) { log.Warnf("Failed to describe ECS service %s, ECS 
service report may be incomplete: %s", *failure.Arn, failure.Reason) } - c.serviceCacheLock.Lock() for _, service := range resp.Services { - c.serviceCache[*service.ServiceName] = newECSService(service) + c.serviceCache.Set(*service.ServiceName, newECSService(service)) } - c.serviceCacheLock.Unlock() } // get details on given tasks, updating cache with the results @@ -240,34 +220,10 @@ func (c ecsClientImpl) getTasks(taskARNs []string) { } for _, task := range resp.Tasks { - c.taskCache[*task.TaskArn] = newECSTask(task) + c.taskCache.Set(*task.TaskArn, newECSTask(task)) } } -// Evict entries from the caches which have not been used within the eviction interval. -func (c ecsClientImpl) evictOldCacheItems() { - const evictTime = time.Minute - now := time.Now() - - count := 0 - for arn, task := range c.taskCache { - if now.Sub(task.lastUsedAt) > evictTime { - delete(c.taskCache, arn) - count++ - } - } - log.Debugf("Evicted %d old tasks", count) - - count = 0 - for name, service := range c.serviceCache { - if now.Sub(service.lastUsedAt) > evictTime { - delete(c.serviceCache, name) - count++ - } - } - log.Debugf("Evicted %d old services", count) -} - // Try to match a list of task ARNs to service names using cached info. // Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values // don't appear to point to a service. 
@@ -275,21 +231,29 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, const servicePrefix = "ecs-svc" deploymentMap := map[string]string{} - for serviceName, service := range c.serviceCache { + for _, serviceNameRaw := range c.serviceCache.Keys() { + serviceRaw, err := c.serviceCache.Get(serviceNameRaw) + if err != nil { + // This is rare, but possible if service was evicted after the loop began + continue + } + serviceName := serviceNameRaw.(string) + service := serviceRaw.(ecsService) for _, deployment := range service.deploymentIDs { deploymentMap[deployment] = serviceName } } - log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), len(c.serviceCache)) + log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), c.serviceCache.Len()) results := map[string]string{} unmatched := []string{} for _, taskARN := range taskARNs { - task, ok := c.taskCache[taskARN] - if !ok { + taskRaw, err := c.taskCache.Get(taskARN) + if err != nil { // this can happen if we have a failure while describing tasks, just pretend the task doesn't exist continue } + task := taskRaw.(ecsTask) if !strings.HasPrefix(task.startedBy, servicePrefix) { // task was not started by a service continue @@ -307,11 +271,8 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, func (c ecsClientImpl) ensureTasks(taskARNs []string) { tasksToFetch := []string{} - now := time.Now() for _, taskARN := range taskARNs { - if task, ok := c.taskCache[taskARN]; ok { - task.lastUsedAt = now - } else { + if _, err := c.taskCache.Get(taskARN); err != nil { tasksToFetch = append(tasksToFetch, taskARN) } } @@ -357,7 +318,8 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string] for _, taskARN := range taskARNs { // It's possible that tasks could still be missing from the cache if describe tasks failed. // We'll just pretend they don't exist. 
- if task, ok := c.taskCache[taskARN]; ok { + if taskRaw, err := c.taskCache.Get(taskARN); err == nil { + task := taskRaw.(ecsTask) tasks[taskARN] = task } } @@ -368,7 +330,8 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string] // Already present. This is expected since multiple tasks can map to the same service. continue } - if service, ok := c.serviceCache[serviceName]; ok { + if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil { + service := serviceRaw.(ecsService) services[serviceName] = service } else { log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN) @@ -416,7 +379,5 @@ func (c ecsClientImpl) getInfo(taskARNs []string) ecsInfo { info := c.makeECSInfo(taskARNs, taskServiceMap) - c.evictOldCacheItems() - return info } diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index a3199ba0be..982f5d7040 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -101,7 +101,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { if !ok { log.Debugf("Creating new ECS client") var err error - client, err = newClient(cluster) + client, err = newClient(cluster, 1e6, time.Hour) // TODO remove these temporary magic values if err != nil { return rpt, err } From 685af493bf0b2a71b64827c7786bb07db900d124 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Thu, 12 Jan 2017 11:37:23 -0800 Subject: [PATCH 14/19] ecs probe: Allow cache settings to be tweaked --- probe/awsecs/client.go | 2 +- probe/awsecs/reporter.go | 12 ++++++++---- prog/main.go | 6 +++++- prog/probe.go | 2 +- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 6eeccee608..b21444013c 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -60,7 +60,7 @@ type ecsInfo struct { taskServiceMap map[string]string } -func newClient(cluster 
string, cacheSize int, cacheExpiry time.Duration) (*ecsClient, error) { +func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (ecsClient, error) { sess := session.New() region, err := ec2metadata.New(sess).Region() diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index 982f5d7040..67d539989d 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -78,13 +78,17 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo { // Reporter implements Tagger, Reporter type Reporter struct { - clientsByCluster map[string]*ecsClient + clientsByCluster map[string]ecsClient + cacheSize int + cacheExpiry time.Duration } // Make creates a new Reporter -func Make() Reporter { +func Make(cacheSize int, cacheExpiry time.Duration) Reporter { return Reporter{ - clientsByCluster: map[string]*ecsClient{}, + clientsByCluster: map[string]ecsClient{}, + cacheSize: cacheSize, + cacheExpiry: cacheExpiry, } } @@ -101,7 +105,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { if !ok { log.Debugf("Creating new ECS client") var err error - client, err = newClient(cluster, 1e6, time.Hour) // TODO remove these temporary magic values + client, err = newClient(cluster, r.cacheSize, r.cacheExpiry) if err != nil { return rpt, err } diff --git a/prog/main.go b/prog/main.go index ac1ca13a79..ed4d65d28a 100644 --- a/prog/main.go +++ b/prog/main.go @@ -104,7 +104,9 @@ type probeFlags struct { kubernetesEnabled bool kubernetesConfig kubernetes.ClientConfig - ecsEnabled bool + ecsEnabled bool + ecsCacheSize int + ecsCacheExpiry time.Duration weaveEnabled bool weaveAddr string @@ -287,6 +289,8 @@ func main() { // AWS ECS flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node") + flag.IntVar(&flags.probe.ecsCacheSize, "probe.ecs.cache.size", 1024*1024, "Max size of cached info for each ECS cluster") + flag.DurationVar(&flags.probe.ecsCacheExpiry, 
"probe.ecs.cache.expiry", time.Hour, "How long to keep cached ECS info") // Weave flag.StringVar(&flags.probe.weaveAddr, "probe.weave.addr", "127.0.0.1:6784", "IP address & port of the Weave router") diff --git a/prog/probe.go b/prog/probe.go index fd183a30bb..946e93889c 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -206,7 +206,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } if flags.ecsEnabled { - reporter := awsecs.Make() + reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry) p.AddReporter(reporter) p.AddTagger(reporter) } From 5c19dc792ee7ec9a6565e7a719f4c559fd09a9ee Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Fri, 13 Jan 2017 16:15:37 -0800 Subject: [PATCH 15/19] ecs probe: add tests for reporter --- probe/awsecs/reporter_test.go | 142 ++++++++++++++++++++++++++++------ 1 file changed, 117 insertions(+), 25 deletions(-) diff --git a/probe/awsecs/reporter_test.go b/probe/awsecs/reporter_test.go index ddcc55e408..06331ded93 100644 --- a/probe/awsecs/reporter_test.go +++ b/probe/awsecs/reporter_test.go @@ -3,12 +3,22 @@ package awsecs import ( "reflect" "testing" + "time" + + "github.com/weaveworks/scope/probe/docker" + "github.com/weaveworks/scope/report" ) -const ( +var ( testCluster = "test-cluster" testFamily = "test-family" testTaskARN = "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0" + testTaskCreatedAt = time.Unix(1483228800, 0) + testTaskDefinitionARN = "arn:aws:ecs:us-east-1:123456789012:task-definition/deadbeef-dead-beef-dead-beefdeadbeef" + testTaskStartedAt = time.Unix(1483228805, 0) + testDeploymentID = "ecs-svc/1121123211234321" + testServiceName = "test-service" + testServiceCount = 1 testContainer = "test-container" testContainerData = map[string]string{ docker.LabelPrefix + "com.amazonaws.ecs.task-arn": @@ -22,46 +32,46 @@ const ( func getTestContainerNode() report.Node { return report.MakeNodeWith( - report.MakeContainerNodeID("test-container"), - testContainerData + 
report.MakeContainerNodeID(testContainer), + testContainerData, ) } func TestGetLabelInfo(t *testing.T) { - r := Make() + r := Make(1e6, time.Hour) rpt, err := r.Report() if err != nil { - t.Fatal("Error making report", err) + t.Fatalf("Error making report", err) } - labelInfo := r.getLabelInfo(rpt) + labelInfo := getLabelInfo(rpt) expected := map[string]map[string]*taskLabelInfo{} if !reflect.DeepEqual(labelInfo, expected) { - t.Error("Empty report did not produce empty label info: %v != %v", labelInfo, expected) + t.Errorf("Empty report did not produce empty label info: %v != %v", labelInfo, expected) } - rpt.Containers = rpt.Containers.AddNode(getTestContainerNode()) - labelInfo = r.getLabelInfo(rpt) + rpt.Container = rpt.Container.AddNode(getTestContainerNode()) + labelInfo = getLabelInfo(rpt) expected = map[string]map[string]*taskLabelInfo{ testCluster: map[string]*taskLabelInfo{ testTaskARN: &taskLabelInfo{ - containerIDs: []string{testContainer}, + containerIDs: []string{report.MakeContainerNodeID(testContainer)}, family: testFamily, - } - } + }, + }, } if !reflect.DeepEqual(labelInfo, expected) { - t.Error("Did not get expected label info: %v != %v", labelInfo, expected) + t.Errorf("Did not get expected label info: %v != %v", labelInfo, expected) } } // Implements ecsClient -type mockEcsClient { +type mockEcsClient struct { t *testing.T expectedARNs []string info ecsInfo } -func newMockEcsClient(t *testing.T, expectedARNs []string, info ecsInfo) *ecsClient { +func newMockEcsClient(t *testing.T, expectedARNs []string, info ecsInfo) ecsClient { return &mockEcsClient{ t, expectedARNs, @@ -71,29 +81,111 @@ func newMockEcsClient(t *testing.T, expectedARNs []string, info ecsInfo) *ecsCli func (c mockEcsClient) getInfo(taskARNs []string) ecsInfo { if !reflect.DeepEqual(taskARNs, c.expectedARNs) { - c.t.Fatal("getInfo called with wrong ARNs: %v != %v", taskARNs, c.expectedARNs) + c.t.Fatalf("getInfo called with wrong ARNs: %v != %v", taskARNs, c.expectedARNs) } 
return c.info } func TestTagReport(t *testing.T) { - r := Make() + r := Make(1e6, time.Hour) r.clientsByCluster[testCluster] = newMockEcsClient( t, - []string{}, + []string{testTaskARN}, ecsInfo{ - // TODO fill in values below - tasks: map[string]ecsTask{}, - services: map[string]ecsService{}, - taskServiceMap: map[string]string{}, + tasks: map[string]ecsTask{ + testTaskARN: ecsTask{ + taskARN: testTaskARN, + createdAt: testTaskCreatedAt, + taskDefinitionARN: testTaskDefinitionARN, + startedAt: testTaskStartedAt, + startedBy: testDeploymentID, + }, + }, + services: map[string]ecsService{ + testServiceName: ecsService{ + serviceName: testServiceName, + deploymentIDs: []string{testDeploymentID}, + desiredCount: 1, + pendingCount: 0, + runningCount: 1, + taskDefinitionARN: testTaskDefinitionARN, + }, + }, + taskServiceMap: map[string]string{ + testTaskARN: testServiceName, + }, }, ) rpt, err := r.Report() if err != nil { - t.Fatal("Error making report") + t.Fatalf("Error making report") + } + rpt.Container = rpt.Container.AddNode(getTestContainerNode()) + rpt, err = r.Tag(rpt) + if err != nil { + t.Fatalf("Failed to tag: %v", err) + } + + // Check task node is present and contains expected values + task, ok := rpt.ECSTask.Nodes[report.MakeECSTaskNodeID(testTaskARN)] + if !ok { + t.Fatalf("Result report did not contain task %v: %v", testTaskARN, rpt.ECSTask.Nodes) + } + taskExpected := map[string]string{ + TaskFamily: testFamily, + Cluster: testCluster, + CreatedAt: testTaskCreatedAt.Format(time.RFC3339Nano), + } + for key, expectedValue := range taskExpected { + value, ok := task.Latest.Lookup(key) + if !ok { + t.Errorf("Result task did not contain expected key %v: %v", key, task.Latest) + continue + } + if value != expectedValue { + t.Errorf("Result task did not contain expected value for key %v: %v != %v", key, value, expectedValue) + } + } + + // Check service node is present and contains expected values + service, ok := 
rpt.ECSService.Nodes[report.MakeECSServiceNodeID(testServiceName)] + if !ok { + t.Fatalf("Result report did not contain service %v: %v", testServiceName, rpt.ECSService.Nodes) + } + serviceExpected := map[string]string{ + Cluster: testCluster, + ServiceDesiredCount: "1", + ServiceRunningCount: "1", + } + for key, expectedValue := range serviceExpected { + value, ok := service.Latest.Lookup(key) + if !ok { + t.Errorf("Result service did not contain expected key %v: %v", key, service.Latest) + continue + } + if value != expectedValue { + t.Errorf("Result service did not contain expected value for key %v: %v != %v", key, value, expectedValue) + } + } + + // Check container node is present and contains expected parents + container, ok := rpt.Container.Nodes[report.MakeContainerNodeID(testContainer)] + if !ok { + t.Fatalf("Result report did not contain container %v: %v", testContainer, rpt.Container.Nodes) + } + containerParentsExpected := map[string]string{ + report.ECSTask: report.MakeECSTaskNodeID(testTaskARN), + report.ECSService: report.MakeECSServiceNodeID(testServiceName), + } + for key, expectedValue := range containerParentsExpected { + values, ok := container.Parents.Lookup(key) + if !ok { + t.Errorf("Result container did not have any parents for key %v: %v", key, container.Parents) + } + if !values.Contains(expectedValue) { + t.Errorf("Result container did not contain expected value %v as a parent for key %v: %v", expectedValue, key, values) + } } - rpt = r.Tag(rpt) - // TODO check it matches } From 2b7662a3c6a7cb1b77908e9d169b3f44c651e4d3 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 17 Jan 2017 03:02:47 -0800 Subject: [PATCH 16/19] Make reporter tests a separate package to appease linter This requires making All The Things public. Yuck.
--- probe/awsecs/client.go | 105 +++++++++++++++--------------- probe/awsecs/reporter.go | 52 +++++++-------- probe/awsecs/reporter_test.go | 116 +++++++++++++++++----------------- 3 files changed, 138 insertions(+), 135 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index b21444013c..e27f5b748f 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -14,10 +14,10 @@ import ( ) // A wrapper around an AWS client that makes all the needed calls and just exposes the final results. -// We create an interface so we can mock for testing -type ecsClient interface { - // Returns a ecsInfo struct containing data needed for a report. - getInfo([]string) ecsInfo +// We create an interface so we can mock for testing. +type EcsClient interface { + // Returns a EcsInfo struct containing data needed for a report. + GetInfo([]string) EcsInfo } // actual implementation @@ -30,37 +30,40 @@ type ecsClientImpl struct { // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure // that only contains immutable attributes of the resource. -type ecsTask struct { - taskARN string - createdAt time.Time - taskDefinitionARN string +// Exported for test. +type EcsTask struct { + TaskARN string + CreatedAt time.Time + TaskDefinitionARN string // These started fields are immutable once set, and guaranteed to be set once the task is running, // which we know it is because otherwise we wouldn't be looking at it. - startedAt time.Time - startedBy string // tag or deployment id + StartedAt time.Time + StartedBy string // tag or deployment id } // Services are highly mutable and so we can only cache them on a best-effort basis. // We have to refresh referenced (ie. has an associated task) services each report // but we avoid re-listing services unless we can't find a service for a task. -type ecsService struct { - serviceName string +// Exported for test. 
+type EcsService struct { + ServiceName string // The following values may be stale in a cached copy - deploymentIDs []string - desiredCount int64 - pendingCount int64 - runningCount int64 - taskDefinitionARN string + DeploymentIDs []string + DesiredCount int64 + PendingCount int64 + RunningCount int64 + TaskDefinitionARN string } -type ecsInfo struct { - tasks map[string]ecsTask - services map[string]ecsService - taskServiceMap map[string]string +// Exported for test +type EcsInfo struct { + Tasks map[string]EcsTask + Services map[string]EcsService + TaskServiceMap map[string]string } -func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (ecsClient, error) { +func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (EcsClient, error) { sess := session.New() region, err := ec2metadata.New(sess).Region() @@ -76,28 +79,28 @@ func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (ecsCli }, nil } -func newECSTask(task *ecs.Task) ecsTask { - return ecsTask{ - taskARN: *task.TaskArn, - createdAt: *task.CreatedAt, - taskDefinitionARN: *task.TaskDefinitionArn, - startedAt: *task.StartedAt, - startedBy: *task.StartedBy, +func newECSTask(task *ecs.Task) EcsTask { + return EcsTask{ + TaskARN: *task.TaskArn, + CreatedAt: *task.CreatedAt, + TaskDefinitionARN: *task.TaskDefinitionArn, + StartedAt: *task.StartedAt, + StartedBy: *task.StartedBy, } } -func newECSService(service *ecs.Service) ecsService { +func newECSService(service *ecs.Service) EcsService { deploymentIDs := make([]string, len(service.Deployments)) for i, deployment := range service.Deployments { deploymentIDs[i] = *deployment.Id } - return ecsService{ - serviceName: *service.ServiceName, - deploymentIDs: deploymentIDs, - desiredCount: *service.DesiredCount, - pendingCount: *service.PendingCount, - runningCount: *service.RunningCount, - taskDefinitionARN: *service.TaskDefinition, + return EcsService{ + ServiceName: *service.ServiceName, + DeploymentIDs: 
deploymentIDs, + DesiredCount: *service.DesiredCount, + PendingCount: *service.PendingCount, + RunningCount: *service.RunningCount, + TaskDefinitionARN: *service.TaskDefinition, } } @@ -128,7 +131,7 @@ func (c ecsClientImpl) listServices() <-chan string { } // Returns (input, done) channels. Service ARNs given to input are batched and details are fetched, -// with full ecsService objects being put into the cache. Closes done when finished. +// with full EcsService objects being put into the cache. Closes done when finished. func (c ecsClientImpl) describeServices() (chan<- string, <-chan struct{}) { input := make(chan string) done := make(chan struct{}) @@ -238,8 +241,8 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, continue } serviceName := serviceNameRaw.(string) - service := serviceRaw.(ecsService) - for _, deployment := range service.deploymentIDs { + service := serviceRaw.(EcsService) + for _, deployment := range service.DeploymentIDs { deploymentMap[deployment] = serviceName } } @@ -253,12 +256,12 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, // this can happen if we have a failure while describing tasks, just pretend the task doesn't exist continue } - task := taskRaw.(ecsTask) - if !strings.HasPrefix(task.startedBy, servicePrefix) { + task := taskRaw.(EcsTask) + if !strings.HasPrefix(task.StartedBy, servicePrefix) { // task was not started by a service continue } - if serviceName, ok := deploymentMap[task.startedBy]; ok { + if serviceName, ok := deploymentMap[task.StartedBy]; ok { results[taskARN] = serviceName } else { unmatched = append(unmatched, taskARN) @@ -312,26 +315,26 @@ func (c ecsClientImpl) describeAllServices(servicesRefreshed map[string]bool) { <-done } -func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) ecsInfo { +func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) EcsInfo { // The maps to return 
are the referenced subsets of the full caches - tasks := map[string]ecsTask{} + tasks := map[string]EcsTask{} for _, taskARN := range taskARNs { // It's possible that tasks could still be missing from the cache if describe tasks failed. // We'll just pretend they don't exist. if taskRaw, err := c.taskCache.Get(taskARN); err == nil { - task := taskRaw.(ecsTask) + task := taskRaw.(EcsTask) tasks[taskARN] = task } } - services := map[string]ecsService{} + services := map[string]EcsService{} for taskARN, serviceName := range taskServiceMap { if _, ok := taskServiceMap[serviceName]; ok { // Already present. This is expected since multiple tasks can map to the same service. continue } if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil { - service := serviceRaw.(ecsService) + service := serviceRaw.(EcsService) services[serviceName] = service } else { log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN) @@ -339,11 +342,11 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string] } } - return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap} + return EcsInfo{Services: services, Tasks: tasks, TaskServiceMap: taskServiceMap} } -// Implements ecsClient.getInfo -func (c ecsClientImpl) getInfo(taskARNs []string) ecsInfo { +// Implements EcsClient.GetInfo +func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo { log.Debugf("Getting ECS info on %d tasks", len(taskARNs)) // We do a weird order of operations here to minimize unneeded cache refreshes. diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index 67d539989d..344c0ba5aa 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -32,14 +32,16 @@ var ( } ) -type taskLabelInfo struct { - containerIDs []string - family string +// Used in return value of GetLabelInfo. Exported for test. 
+type TaskLabelInfo struct { + ContainerIDs []string + Family string } -// return map from cluster to map of task arns to task infos -func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo { - results := map[string]map[string]*taskLabelInfo{} +// Return map from cluster to map of task arns to task infos. +// Exported for test. +func GetLabelInfo(rpt report.Report) map[string]map[string]*TaskLabelInfo { + results := map[string]map[string]*TaskLabelInfo{} log.Debug("scanning for ECS containers") for nodeID, node := range rpt.Container.Nodes { @@ -60,17 +62,17 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo { taskMap, ok := results[cluster] if !ok { - taskMap = map[string]*taskLabelInfo{} + taskMap = map[string]*TaskLabelInfo{} results[cluster] = taskMap } task, ok := taskMap[taskArn] if !ok { - task = &taskLabelInfo{containerIDs: []string{}, family: family} + task = &TaskLabelInfo{ContainerIDs: []string{}, Family: family} taskMap[taskArn] = task } - task.containerIDs = append(task.containerIDs, nodeID) + task.ContainerIDs = append(task.ContainerIDs, nodeID) } log.Debug("Got ECS container info: %v", results) return results @@ -78,7 +80,7 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo { // Reporter implements Tagger, Reporter type Reporter struct { - clientsByCluster map[string]ecsClient + ClientsByCluster map[string]EcsClient // Exported for test cacheSize int cacheExpiry time.Duration } @@ -86,7 +88,7 @@ type Reporter struct { // Make creates a new Reporter func Make(cacheSize int, cacheExpiry time.Duration) Reporter { return Reporter{ - clientsByCluster: map[string]ecsClient{}, + ClientsByCluster: map[string]EcsClient{}, cacheSize: cacheSize, cacheExpiry: cacheExpiry, } @@ -96,12 +98,12 @@ func Make(cacheSize int, cacheExpiry time.Duration) Reporter { func (r Reporter) Tag(rpt report.Report) (report.Report, error) { rpt = rpt.Copy() - clusterMap := getLabelInfo(rpt) + clusterMap := 
GetLabelInfo(rpt) for cluster, taskMap := range clusterMap { log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap)) - client, ok := r.clientsByCluster[cluster] + client, ok := r.ClientsByCluster[cluster] if !ok { log.Debugf("Creating new ECS client") var err error @@ -109,7 +111,7 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { if err != nil { return rpt, err } - r.clientsByCluster[cluster] = client + r.ClientsByCluster[cluster] = client } taskArns := make([]string, 0, len(taskMap)) @@ -117,22 +119,22 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { taskArns = append(taskArns, taskArn) } - ecsInfo := client.getInfo(taskArns) - log.Debugf("Got info from ECS: %d tasks, %d services", len(ecsInfo.tasks), len(ecsInfo.services)) + ecsInfo := client.GetInfo(taskArns) + log.Debugf("Got info from ECS: %d tasks, %d services", len(ecsInfo.Tasks), len(ecsInfo.Services)) // Create all the services first - for serviceName, service := range ecsInfo.services { + for serviceName, service := range ecsInfo.Services { serviceID := report.MakeECSServiceNodeID(serviceName) rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{ Cluster: cluster, - ServiceDesiredCount: fmt.Sprintf("%d", service.desiredCount), - ServiceRunningCount: fmt.Sprintf("%d", service.runningCount), + ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount), + ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount), })) } - log.Debugf("Created %v ECS service nodes", len(ecsInfo.services)) + log.Debugf("Created %v ECS service nodes", len(ecsInfo.Services)) for taskArn, info := range taskMap { - task, ok := ecsInfo.tasks[taskArn] + task, ok := ecsInfo.Tasks[taskArn] if !ok { // can happen due to partial failures, just skip it continue @@ -141,20 +143,20 @@ func (r Reporter) Tag(rpt report.Report) (report.Report, error) { // new task node taskID := report.MakeECSTaskNodeID(taskArn) node := 
report.MakeNodeWith(taskID, map[string]string{ - TaskFamily: info.family, + TaskFamily: info.Family, Cluster: cluster, - CreatedAt: task.createdAt.Format(time.RFC3339Nano), + CreatedAt: task.CreatedAt.Format(time.RFC3339Nano), }) rpt.ECSTask = rpt.ECSTask.AddNode(node) // parents sets to merge into all matching container nodes parentsSets := report.MakeSets() parentsSets = parentsSets.Add(report.ECSTask, report.MakeStringSet(taskID)) - if serviceName, ok := ecsInfo.taskServiceMap[taskArn]; ok { + if serviceName, ok := ecsInfo.TaskServiceMap[taskArn]; ok { serviceID := report.MakeECSServiceNodeID(serviceName) parentsSets = parentsSets.Add(report.ECSService, report.MakeStringSet(serviceID)) } - for _, containerID := range info.containerIDs { + for _, containerID := range info.ContainerIDs { if containerNode, ok := rpt.Container.Nodes[containerID]; ok { rpt.Container.Nodes[containerID] = containerNode.WithParents(parentsSets) } else { diff --git a/probe/awsecs/reporter_test.go b/probe/awsecs/reporter_test.go index 06331ded93..117b5364d0 100644 --- a/probe/awsecs/reporter_test.go +++ b/probe/awsecs/reporter_test.go @@ -1,32 +1,30 @@ -package awsecs +package awsecs_test import ( "reflect" "testing" "time" + "github.com/weaveworks/scope/probe/awsecs" "github.com/weaveworks/scope/probe/docker" "github.com/weaveworks/scope/report" ) var ( - testCluster = "test-cluster" - testFamily = "test-family" - testTaskARN = "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0" - testTaskCreatedAt = time.Unix(1483228800, 0) + testCluster = "test-cluster" + testFamily = "test-family" + testTaskARN = "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0" + testTaskCreatedAt = time.Unix(1483228800, 0) testTaskDefinitionARN = "arn:aws:ecs:us-east-1:123456789012:task-definition/deadbeef-dead-beef-dead-beefdeadbeef" - testTaskStartedAt = time.Unix(1483228805, 0) - testDeploymentID = "ecs-svc/1121123211234321" - testServiceName = 
"test-service" - testServiceCount = 1 - testContainer = "test-container" - testContainerData = map[string]string{ - docker.LabelPrefix + "com.amazonaws.ecs.task-arn": - testTaskARN, - docker.LabelPrefix + "com.amazonaws.ecs.cluster": - testCluster, - docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family": - testFamily, + testTaskStartedAt = time.Unix(1483228805, 0) + testDeploymentID = "ecs-svc/1121123211234321" + testServiceName = "test-service" + testServiceCount = 1 + testContainer = "test-container" + testContainerData = map[string]string{ + docker.LabelPrefix + "com.amazonaws.ecs.task-arn": testTaskARN, + docker.LabelPrefix + "com.amazonaws.ecs.cluster": testCluster, + docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family": testFamily, } ) @@ -38,24 +36,24 @@ func getTestContainerNode() report.Node { } func TestGetLabelInfo(t *testing.T) { - r := Make(1e6, time.Hour) + r := awsecs.Make(1e6, time.Hour) rpt, err := r.Report() if err != nil { - t.Fatalf("Error making report", err) + t.Fatalf("Error making report: %v", err) } - labelInfo := getLabelInfo(rpt) - expected := map[string]map[string]*taskLabelInfo{} + labelInfo := awsecs.GetLabelInfo(rpt) + expected := map[string]map[string]*awsecs.TaskLabelInfo{} if !reflect.DeepEqual(labelInfo, expected) { t.Errorf("Empty report did not produce empty label info: %v != %v", labelInfo, expected) } rpt.Container = rpt.Container.AddNode(getTestContainerNode()) - labelInfo = getLabelInfo(rpt) - expected = map[string]map[string]*taskLabelInfo{ - testCluster: map[string]*taskLabelInfo{ - testTaskARN: &taskLabelInfo{ - containerIDs: []string{report.MakeContainerNodeID(testContainer)}, - family: testFamily, + labelInfo = awsecs.GetLabelInfo(rpt) + expected = map[string]map[string]*awsecs.TaskLabelInfo{ + testCluster: { + testTaskARN: { + ContainerIDs: []string{report.MakeContainerNodeID(testContainer)}, + Family: testFamily, }, }, } @@ -64,14 +62,14 @@ func TestGetLabelInfo(t *testing.T) { } } -// Implements 
ecsClient +// Implements EcsClient type mockEcsClient struct { - t *testing.T + t *testing.T expectedARNs []string - info ecsInfo + info awsecs.EcsInfo } -func newMockEcsClient(t *testing.T, expectedARNs []string, info ecsInfo) ecsClient { +func newMockEcsClient(t *testing.T, expectedARNs []string, info awsecs.EcsInfo) awsecs.EcsClient { return &mockEcsClient{ t, expectedARNs, @@ -79,40 +77,40 @@ func newMockEcsClient(t *testing.T, expectedARNs []string, info ecsInfo) ecsClie } } -func (c mockEcsClient) getInfo(taskARNs []string) ecsInfo { +func (c mockEcsClient) GetInfo(taskARNs []string) awsecs.EcsInfo { if !reflect.DeepEqual(taskARNs, c.expectedARNs) { - c.t.Fatalf("getInfo called with wrong ARNs: %v != %v", taskARNs, c.expectedARNs) + c.t.Fatalf("GetInfo called with wrong ARNs: %v != %v", taskARNs, c.expectedARNs) } return c.info } func TestTagReport(t *testing.T) { - r := Make(1e6, time.Hour) + r := awsecs.Make(1e6, time.Hour) - r.clientsByCluster[testCluster] = newMockEcsClient( + r.ClientsByCluster[testCluster] = newMockEcsClient( t, []string{testTaskARN}, - ecsInfo{ - tasks: map[string]ecsTask{ - testTaskARN: ecsTask{ - taskARN: testTaskARN, - createdAt: testTaskCreatedAt, - taskDefinitionARN: testTaskDefinitionARN, - startedAt: testTaskStartedAt, - startedBy: testDeploymentID, + awsecs.EcsInfo{ + Tasks: map[string]awsecs.EcsTask{ + testTaskARN: { + TaskARN: testTaskARN, + CreatedAt: testTaskCreatedAt, + TaskDefinitionARN: testTaskDefinitionARN, + StartedAt: testTaskStartedAt, + StartedBy: testDeploymentID, }, }, - services: map[string]ecsService{ - testServiceName: ecsService{ - serviceName: testServiceName, - deploymentIDs: []string{testDeploymentID}, - desiredCount: 1, - pendingCount: 0, - runningCount: 1, - taskDefinitionARN: testTaskDefinitionARN, + Services: map[string]awsecs.EcsService{ + testServiceName: { + ServiceName: testServiceName, + DeploymentIDs: []string{testDeploymentID}, + DesiredCount: 1, + PendingCount: 0, + RunningCount: 1, + 
TaskDefinitionARN: testTaskDefinitionARN, }, }, - taskServiceMap: map[string]string{ + TaskServiceMap: map[string]string{ testTaskARN: testServiceName, }, }, @@ -134,9 +132,9 @@ func TestTagReport(t *testing.T) { t.Fatalf("Result report did not contain task %v: %v", testTaskARN, rpt.ECSTask.Nodes) } taskExpected := map[string]string{ - TaskFamily: testFamily, - Cluster: testCluster, - CreatedAt: testTaskCreatedAt.Format(time.RFC3339Nano), + awsecs.TaskFamily: testFamily, + awsecs.Cluster: testCluster, + awsecs.CreatedAt: testTaskCreatedAt.Format(time.RFC3339Nano), } for key, expectedValue := range taskExpected { value, ok := task.Latest.Lookup(key) @@ -155,9 +153,9 @@ func TestTagReport(t *testing.T) { t.Fatalf("Result report did not contain service %v: %v", testServiceName, rpt.ECSService.Nodes) } serviceExpected := map[string]string{ - Cluster: testCluster, - ServiceDesiredCount: "1", - ServiceRunningCount: "1", + awsecs.Cluster: testCluster, + awsecs.ServiceDesiredCount: "1", + awsecs.ServiceRunningCount: "1", } for key, expectedValue := range serviceExpected { value, ok := service.Latest.Lookup(key) @@ -176,7 +174,7 @@ func TestTagReport(t *testing.T) { t.Fatalf("Result report did not contain container %v: %v", testContainer, rpt.Container.Nodes) } containerParentsExpected := map[string]string{ - report.ECSTask: report.MakeECSTaskNodeID(testTaskARN), + report.ECSTask: report.MakeECSTaskNodeID(testTaskARN), report.ECSService: report.MakeECSServiceNodeID(testServiceName), } for key, expectedValue := range containerParentsExpected { From 79a83e3656e970a2ef0c28ba7f5ed10b16670e38 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Tue, 17 Jan 2017 12:17:34 -0800 Subject: [PATCH 17/19] awsecs: Appease linter --- probe/awsecs/client.go | 6 ++++-- probe/awsecs/reporter.go | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index e27f5b748f..0db7c0a326 100644 --- a/probe/awsecs/client.go +++ 
b/probe/awsecs/client.go @@ -13,7 +13,7 @@ import ( "github.com/bluele/gcache" ) -// A wrapper around an AWS client that makes all the needed calls and just exposes the final results. +// EcsClient is a wrapper around an AWS client that makes all the needed calls and just exposes the final results. // We create an interface so we can mock for testing. type EcsClient interface { // Returns a EcsInfo struct containing data needed for a report. @@ -28,6 +28,7 @@ type ecsClientImpl struct { serviceCache gcache.Cache // Keys are service names. } +// EcsTask describes the parts of ECS tasks we care about. // Since we're caching tasks heavily, we ensure no mistakes by casting into a structure // that only contains immutable attributes of the resource. // Exported for test. @@ -42,6 +43,7 @@ type EcsTask struct { StartedBy string // tag or deployment id } +// EcsService describes the parts of ECS services we care about. // Services are highly mutable and so we can only cache them on a best-effort basis. // We have to refresh referenced (ie. has an associated task) services each report // but we avoid re-listing services unless we can't find a service for a task. @@ -56,7 +58,7 @@ type EcsService struct { TaskDefinitionARN string } -// Exported for test +// EcsInfo is exported for test type EcsInfo struct { Tasks map[string]EcsTask Services map[string]EcsService diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go index 344c0ba5aa..95754d4a25 100644 --- a/probe/awsecs/reporter.go +++ b/probe/awsecs/reporter.go @@ -32,13 +32,13 @@ var ( } ) -// Used in return value of GetLabelInfo. Exported for test. +// TaskLabelInfo is used in return value of GetLabelInfo. Exported for test. type TaskLabelInfo struct { ContainerIDs []string Family string } -// Return map from cluster to map of task arns to task infos. +// GetLabelInfo returns map from cluster to map of task arns to task infos. // Exported for test. 
func GetLabelInfo(rpt report.Report) map[string]map[string]*TaskLabelInfo { results := map[string]map[string]*TaskLabelInfo{} From baffe9453865456d2200032ed452a5599a15e187 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Fri, 20 Jan 2017 14:31:41 -0800 Subject: [PATCH 18/19] awsecs caching: Minor review changes --- probe/awsecs/client.go | 50 ++++++++++++++++++++++++++++-------------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 0db7c0a326..114bcbc261 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -13,6 +13,8 @@ import ( "github.com/bluele/gcache" ) +const servicePrefix = "ecs-svc" // Task StartedBy field begins with this if it was started by a service + // EcsClient is a wrapper around an AWS client that makes all the needed calls and just exposes the final results. // We create an interface so we can mock for testing. type EcsClient interface { @@ -106,6 +108,27 @@ func newECSService(service *ecs.Service) EcsService { } } +// IsServiceManaged returns true if the task was started by a service. +func (t EcsTask) IsServiceManaged() bool { + return strings.HasPrefix(t.StartedBy, servicePrefix) +} + +// Fetches a task from the cache, returning (task, ok) as per map[] +func (c ecsClientImpl) getCachedTask(taskARN string) (EcsTask, bool) { + if taskRaw, err := c.taskCache.Get(taskARN); err == nil { + return taskRaw.(EcsTask), true + } + return EcsTask{}, false +} + +// Fetches a service from the cache, returning (service, ok) as per map[] +func (c ecsClientImpl) getCachedService(serviceName string) (EcsService, bool) { + if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil { + return serviceRaw.(EcsService), true + } + return EcsService{}, false +} + // Returns a channel from which service ARNs can be read. // Cannot fail as it will attempt to deliver partial results, though that may end up being no results. 
func (c ecsClientImpl) listServices() <-chan string { @@ -233,17 +256,14 @@ func (c ecsClientImpl) getTasks(taskARNs []string) { // Returns (task to service map, unmatched tasks). Ignores tasks whose startedby values // don't appear to point to a service. func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, []string) { - const servicePrefix = "ecs-svc" - deploymentMap := map[string]string{} for _, serviceNameRaw := range c.serviceCache.Keys() { - serviceRaw, err := c.serviceCache.Get(serviceNameRaw) - if err != nil { + serviceName := serviceNameRaw.(string) + service, ok := c.getCachedService(serviceName) + if !ok { // This is rare, but possible if service was evicted after the loop began continue } - serviceName := serviceNameRaw.(string) - service := serviceRaw.(EcsService) for _, deployment := range service.DeploymentIDs { deploymentMap[deployment] = serviceName } @@ -253,14 +273,12 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, results := map[string]string{} unmatched := []string{} for _, taskARN := range taskARNs { - taskRaw, err := c.taskCache.Get(taskARN) - if err != nil { + task, ok := c.getCachedTask(taskARN) + if !ok { // this can happen if we have a failure while describing tasks, just pretend the task doesn't exist continue } - task := taskRaw.(EcsTask) - if !strings.HasPrefix(task.StartedBy, servicePrefix) { - // task was not started by a service + if !task.IsServiceManaged() { continue } if serviceName, ok := deploymentMap[task.StartedBy]; ok { @@ -274,7 +292,7 @@ func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, return results, unmatched } -func (c ecsClientImpl) ensureTasks(taskARNs []string) { +func (c ecsClientImpl) ensureTasksAreCached(taskARNs []string) { tasksToFetch := []string{} for _, taskARN := range taskARNs { if _, err := c.taskCache.Get(taskARN); err != nil { @@ -323,8 +341,7 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, 
taskServiceMap map[string] for _, taskARN := range taskARNs { // It's possible that tasks could still be missing from the cache if describe tasks failed. // We'll just pretend they don't exist. - if taskRaw, err := c.taskCache.Get(taskARN); err == nil { - task := taskRaw.(EcsTask) + if task, ok := c.getCachedTask(taskARN); ok { tasks[taskARN] = task } } @@ -335,8 +352,7 @@ func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string] // Already present. This is expected since multiple tasks can map to the same service. continue } - if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil { - service := serviceRaw.(EcsService) + if service, ok := c.getCachedService(serviceName); ok { services[serviceName] = service } else { log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN) @@ -354,7 +370,7 @@ func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo { // We do a weird order of operations here to minimize unneeded cache refreshes. // First, we ensure we have all the tasks we need, and fetch the ones we don't. // We also mark the tasks as being used here to prevent eviction. - c.ensureTasks(taskARNs) + c.ensureTasksAreCached(taskARNs) // We're going to do this matching process potentially several times, but that's ok - it's quite cheap. // First, we want to see how far we get with existing data, and identify the set of services From c4eb0960f93931ed2486617a7a5fa322d0b470e5 Mon Sep 17 00:00:00 2001 From: Mike Lang Date: Mon, 23 Jan 2017 12:48:50 -0800 Subject: [PATCH 19/19] awsecs client: simplify list/describe services by removing ability to stream results between them, since this is such a minor optimization and greatly complicates the code. 
--- probe/awsecs/client.go | 133 +++++++++++++++++------------------------ 1 file changed, 55 insertions(+), 78 deletions(-) diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go index 114bcbc261..1ee8a9d553 100644 --- a/probe/awsecs/client.go +++ b/probe/awsecs/client.go @@ -129,85 +129,67 @@ func (c ecsClientImpl) getCachedService(serviceName string) (EcsService, bool) { return EcsService{}, false } -// Returns a channel from which service ARNs can be read. +// Returns a list of service names. // Cannot fail as it will attempt to deliver partial results, though that may end up being no results. -func (c ecsClientImpl) listServices() <-chan string { +func (c ecsClientImpl) listServices() []string { log.Debugf("Listing ECS services") - results := make(chan string) - go func() { - count := 0 - err := c.client.ListServicesPages( - &ecs.ListServicesInput{Cluster: &c.cluster}, - func(page *ecs.ListServicesOutput, lastPage bool) bool { - for _, arn := range page.ServiceArns { - count++ - results <- *arn - } - return true - }, - ) - if err != nil { - log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) - } - log.Debugf("Listed %d services", count) - close(results) - }() + results := []string{} + err := c.client.ListServicesPages( + &ecs.ListServicesInput{Cluster: &c.cluster}, + func(page *ecs.ListServicesOutput, lastPage bool) bool { + for _, name := range page.ServiceArns { + results = append(results, *name) + } + return true + }, + ) + if err != nil { + log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err) + } + log.Debugf("Listed %d services", len(results)) return results } -// Returns (input, done) channels. Service ARNs given to input are batched and details are fetched, -// with full EcsService objects being put into the cache. Closes done when finished. 
-func (c ecsClientImpl) describeServices() (chan<- string, <-chan struct{}) { - input := make(chan string) - done := make(chan struct{}) +// Service names given are batched and details are fetched, +// with full EcsService objects being put into the cache. +// Cannot fail as it will attempt to deliver partial results. +func (c ecsClientImpl) describeServices(services []string) { + const maxServices = 10 // How many services we can put in one Describe command + group := sync.WaitGroup{} log.Debugf("Describing ECS services") - go func() { - const maxServices = 10 // How many services we can put in one Describe command - group := sync.WaitGroup{} - - // count and calls is just for logging - count := 0 - calls := 0 - - batch := make([]string, 0, maxServices) - for arn := range input { - batch = append(batch, arn) - if len(batch) == maxServices { - group.Add(1) - go func(arns []string) { - defer group.Done() - c.describeServicesBatch(arns) - }(batch) - count += len(batch) - calls++ - batch = make([]string, 0, maxServices) - } - } - if len(batch) > 0 { - c.describeServicesBatch(batch) - count += len(batch) - calls++ - } + // split into batches + batches := make([][]string, 0, len(services)/maxServices+1) + for len(services) > maxServices { + batch := services[:maxServices] + services = services[maxServices:] + batches = append(batches, batch) + } + if len(services) > 0 { + batches = append(batches, services) + } - log.Debugf("Described %d services in %d calls", count, calls) - group.Wait() - close(done) - }() + for _, batch := range batches { + group.Add(1) + go func(names []string) { + defer group.Done() + c.describeServicesBatch(names) + }(batch) + } - return input, done + group.Wait() } -func (c ecsClientImpl) describeServicesBatch(arns []string) { - arnPtrs := make([]*string, 0, len(arns)) - for i := range arns { - arnPtrs = append(arnPtrs, &arns[i]) +func (c ecsClientImpl) describeServicesBatch(names []string) { + namePtrs := make([]*string, 0, len(names)) + for i 
:= range names { + namePtrs = append(namePtrs, &names[i]) } resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{ Cluster: &c.cluster, - Services: arnPtrs, + Services: namePtrs, }) if err != nil { log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err) @@ -306,33 +288,28 @@ func (c ecsClientImpl) ensureTasksAreCached(taskARNs []string) { } func (c ecsClientImpl) refreshServices(taskServiceMap map[string]string) map[string]bool { - toDescribe, done := c.describeServices() servicesRefreshed := map[string]bool{} + toDescribe := []string{} for _, serviceName := range taskServiceMap { if servicesRefreshed[serviceName] { continue } - toDescribe <- serviceName + toDescribe = append(toDescribe, serviceName) servicesRefreshed[serviceName] = true } - close(toDescribe) - <-done + c.describeServices(toDescribe) return servicesRefreshed } func (c ecsClientImpl) describeAllServices(servicesRefreshed map[string]bool) { - serviceNamesChan := c.listServices() - toDescribe, done := c.describeServices() - go func() { - for serviceName := range serviceNamesChan { - if !servicesRefreshed[serviceName] { - toDescribe <- serviceName - servicesRefreshed[serviceName] = true - } + toDescribe := []string{} + for _, serviceName := range c.listServices() { + if !servicesRefreshed[serviceName] { + toDescribe = append(toDescribe, serviceName) + servicesRefreshed[serviceName] = true } - close(toDescribe) - }() - <-done + } + c.describeServices(toDescribe) } func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) EcsInfo {