diff --git a/probe/awsecs/client.go b/probe/awsecs/client.go
index 2fb3071f5c..1ee8a9d553 100644
--- a/probe/awsecs/client.go
+++ b/probe/awsecs/client.go
@@ -1,28 +1,73 @@
 package awsecs
 
 import (
+	"strings"
 	"sync"
+	"time"
 
 	log "github.com/Sirupsen/logrus"
 	"github.com/aws/aws-sdk-go/aws"
 	"github.com/aws/aws-sdk-go/aws/ec2metadata"
 	"github.com/aws/aws-sdk-go/aws/session"
 	"github.com/aws/aws-sdk-go/service/ecs"
+	"github.com/bluele/gcache"
 )
 
-// a wrapper around an AWS client that makes all the needed calls and just exposes the final results
-type ecsClient struct {
-	client  *ecs.ECS
-	cluster string
+const servicePrefix = "ecs-svc" // Task StartedBy field begins with this if it was started by a service
+
+// EcsClient is a wrapper around an AWS client that makes all the needed calls and just exposes the final results.
+// We create an interface so we can mock for testing.
+type EcsClient interface {
+	// Returns an EcsInfo struct containing data needed for a report.
+	GetInfo([]string) EcsInfo
+}
+
+// actual implementation
+type ecsClientImpl struct {
+	client       *ecs.ECS
+	cluster      string
+	taskCache    gcache.Cache // Keys are task ARNs.
+	serviceCache gcache.Cache // Keys are service names.
+}
+
+// EcsTask describes the parts of ECS tasks we care about.
+// Since we're caching tasks heavily, we ensure no mistakes by copying into a structure
+// that only contains immutable attributes of the resource.
+// Exported for test.
+type EcsTask struct {
+	TaskARN           string
+	CreatedAt         time.Time
+	TaskDefinitionARN string
+
+	// These started fields are immutable once set, and guaranteed to be set once the task is running,
+	// which we know it is because otherwise we wouldn't be looking at it.
+	StartedAt time.Time
+	StartedBy string // tag or deployment id
 }
 
-type ecsInfo struct {
-	tasks          map[string]*ecs.Task
-	services       map[string]*ecs.Service
-	taskServiceMap map[string]string
+// EcsService describes the parts of ECS services we care about.
+// Services are highly mutable and so we can only cache them on a best-effort basis.
+// We have to refresh referenced (i.e. with an associated task) services each report,
+// but we avoid re-listing services unless we can't find a service for a task.
+// Exported for test.
+type EcsService struct {
+	ServiceName string
+	// The following values may be stale in a cached copy
+	DeploymentIDs     []string
+	DesiredCount      int64
+	PendingCount      int64
+	RunningCount      int64
+	TaskDefinitionARN string
 }
 
-func newClient(cluster string) (*ecsClient, error) {
+// EcsInfo is exported for test
+type EcsInfo struct {
+	Tasks          map[string]EcsTask
+	Services       map[string]EcsService
+	TaskServiceMap map[string]string
+}
+
+func newClient(cluster string, cacheSize int, cacheExpiry time.Duration) (EcsClient, error) {
 	sess := session.New()
 
 	region, err := ec2metadata.New(sess).Region()
@@ -30,73 +75,143 @@ func newClient(cluster string) (*ecsClient, error) {
 		return nil, err
 	}
 
-	return &ecsClient{
-		client:  ecs.New(sess, &aws.Config{Region: aws.String(region)}),
-		cluster: cluster,
+	return &ecsClientImpl{
+		client:       ecs.New(sess, &aws.Config{Region: aws.String(region)}),
+		cluster:      cluster,
+		taskCache:    gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(),
+		serviceCache: gcache.New(cacheSize).LRU().Expiration(cacheExpiry).Build(),
 	}, nil
 }
 
-// returns a map from deployment ids to service names
-func (c ecsClient) getDeploymentMap(services map[string]*ecs.Service) map[string]string {
-	results := map[string]string{}
-	for serviceName, service := range services {
-		for _, deployment := range service.Deployments {
-			results[*deployment.Id] = serviceName
-		}
+func newECSTask(task *ecs.Task) EcsTask {
+	return EcsTask{
+		TaskARN:           *task.TaskArn,
+		CreatedAt:         *task.CreatedAt,
+		TaskDefinitionARN: *task.TaskDefinitionArn,
+		StartedAt:         *task.StartedAt,
+		StartedBy:         *task.StartedBy,
 	}
-	return results
 }
 
-// cannot fail as it will attempt to deliver partial results, though that may end up being no results
-func (c ecsClient) getServices() map[string]*ecs.Service {
-	results := map[string]*ecs.Service{}
-	lock := sync.Mutex{} // lock mediates access to results
+func newECSService(service *ecs.Service) EcsService {
+	deploymentIDs := make([]string, len(service.Deployments))
+	for i, deployment := range service.Deployments {
+		deploymentIDs[i] = *deployment.Id
+	}
+	return EcsService{
+		ServiceName:       *service.ServiceName,
+		DeploymentIDs:     deploymentIDs,
+		DesiredCount:      *service.DesiredCount,
+		PendingCount:      *service.PendingCount,
+		RunningCount:      *service.RunningCount,
+		TaskDefinitionARN: *service.TaskDefinition,
+	}
+}
 
-	group := sync.WaitGroup{}
+// IsServiceManaged returns true if the task was started by a service.
+func (t EcsTask) IsServiceManaged() bool {
+	return strings.HasPrefix(t.StartedBy, servicePrefix)
+}
+
+// Fetches a task from the cache, returning (task, ok) as per map[]
+func (c ecsClientImpl) getCachedTask(taskARN string) (EcsTask, bool) {
+	if taskRaw, err := c.taskCache.Get(taskARN); err == nil {
+		return taskRaw.(EcsTask), true
+	}
+	return EcsTask{}, false
+}
+
+// Fetches a service from the cache, returning (service, ok) as per map[]
+func (c ecsClientImpl) getCachedService(serviceName string) (EcsService, bool) {
+	if serviceRaw, err := c.serviceCache.Get(serviceName); err == nil {
+		return serviceRaw.(EcsService), true
+	}
+	return EcsService{}, false
+}
+
+// Returns a list of service names.
+// Cannot fail as it will attempt to deliver partial results, though that may end up being no results.
+func (c ecsClientImpl) listServices() []string {
+	log.Debugf("Listing ECS services")
+	results := []string{}
 	err := c.client.ListServicesPages(
 		&ecs.ListServicesInput{Cluster: &c.cluster},
 		func(page *ecs.ListServicesOutput, lastPage bool) bool {
-			// describe each page of 10 (the max for one describe command) concurrently
-			group.Add(1)
-			serviceArns := page.ServiceArns
-			go func() {
-				defer group.Done()
-
-				resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
-					Cluster:  &c.cluster,
-					Services: serviceArns,
-				})
-				if err != nil {
-					log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
-					return
-				}
-
-				for _, failure := range resp.Failures {
-					log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, *failure.Reason)
-				}
-
-				lock.Lock()
-				for _, service := range resp.Services {
-					results[*service.ServiceName] = service
-				}
-				lock.Unlock()
-			}()
+			for _, name := range page.ServiceArns {
+				results = append(results, *name)
+			}
 			return true
 		},
 	)
 
-	group.Wait()
-
 	if err != nil {
 		log.Warnf("Error listing ECS services, ECS service report may be incomplete: %v", err)
 	}
+	log.Debugf("Listed %d services", len(results))
 	return results
 }
 
-func (c ecsClient) getTasks(taskArns []string) (map[string]*ecs.Task, error) {
-	taskPtrs := make([]*string, len(taskArns))
-	for i := range taskArns {
-		taskPtrs[i] = &taskArns[i]
+// Service names given are batched and details are fetched,
+// with full EcsService objects being put into the cache.
+// Cannot fail as it will attempt to deliver partial results.
+func (c ecsClientImpl) describeServices(services []string) {
+	const maxServices = 10 // How many services we can put in one Describe command
+	group := sync.WaitGroup{}
+
+	log.Debugf("Describing ECS services")
+
+	// split into batches
+	batches := make([][]string, 0, len(services)/maxServices+1)
+	for len(services) > maxServices {
+		batch := services[:maxServices]
+		services = services[maxServices:]
+		batches = append(batches, batch)
+	}
+	if len(services) > 0 {
+		batches = append(batches, services)
+	}
+
+	for _, batch := range batches {
+		group.Add(1)
+		go func(names []string) {
+			defer group.Done()
+			c.describeServicesBatch(names)
+		}(batch)
+	}
+
+	group.Wait()
+}
+
+func (c ecsClientImpl) describeServicesBatch(names []string) {
+	namePtrs := make([]*string, 0, len(names))
+	for i := range names {
+		namePtrs = append(namePtrs, &names[i])
+	}
+
+	resp, err := c.client.DescribeServices(&ecs.DescribeServicesInput{
+		Cluster:  &c.cluster,
+		Services: namePtrs,
+	})
+	if err != nil {
+		log.Warnf("Error describing some ECS services, ECS service report may be incomplete: %v", err)
+		return
+	}
+
+	for _, failure := range resp.Failures {
+		log.Warnf("Failed to describe ECS service %s, ECS service report may be incomplete: %s", *failure.Arn, *failure.Reason)
+	}
+
+	for _, service := range resp.Services {
+		c.serviceCache.Set(*service.ServiceName, newECSService(service))
+	}
+}
+
+// Get details on the given tasks, updating the cache with the results.
+func (c ecsClientImpl) getTasks(taskARNs []string) {
+	log.Debugf("Describing %d ECS tasks", len(taskARNs))
+
+	taskPtrs := make([]*string, len(taskARNs))
+	for i := range taskARNs {
+		taskPtrs[i] = &taskARNs[i]
 	}
 
 	// You'd think there's a limit on how many tasks can be described here,
@@ -106,45 +221,161 @@ func (c ecsClient) getTasks(taskArns []string) (map[string]*ecs.Task, error) {
 		Tasks: taskPtrs,
 	})
 	if err != nil {
-		return nil, err
+		log.Warnf("Failed to describe ECS tasks, ECS service report may be incomplete: %v", err)
+		return
 	}
 
 	for _, failure := range resp.Failures {
 		log.Warnf("Failed to describe ECS task %s, ECS service report may be incomplete: %s", *failure.Arn, *failure.Reason)
 	}
 
-	results := make(map[string]*ecs.Task, len(resp.Tasks))
 	for _, task := range resp.Tasks {
-		results[*task.TaskArn] = task
+		c.taskCache.Set(*task.TaskArn, newECSTask(task))
 	}
-	return results, nil
 }
 
-// returns a map from task ARNs to service names
-func (c ecsClient) getInfo(taskArns []string) (ecsInfo, error) {
-	servicesChan := make(chan map[string]*ecs.Service)
-	go func() {
-		servicesChan <- c.getServices()
-	}()
+// Try to match a list of task ARNs to service names using cached info.
+// Returns (task to service map, unmatched tasks). Ignores tasks whose StartedBy values
+// don't appear to point to a service.
+func (c ecsClientImpl) matchTasksServices(taskARNs []string) (map[string]string, []string) {
+	deploymentMap := map[string]string{}
+	for _, serviceNameRaw := range c.serviceCache.Keys() {
+		serviceName := serviceNameRaw.(string)
+		service, ok := c.getCachedService(serviceName)
+		if !ok {
+			// This is rare, but possible if the service was evicted after the loop began
+			continue
+		}
+		for _, deployment := range service.DeploymentIDs {
+			deploymentMap[deployment] = serviceName
+		}
+	}
+	log.Debugf("Mapped %d deployments from %d services", len(deploymentMap), c.serviceCache.Len())
+
+	results := map[string]string{}
+	unmatched := []string{}
+	for _, taskARN := range taskARNs {
+		task, ok := c.getCachedTask(taskARN)
+		if !ok {
+			// This can happen if we had a failure while describing tasks; just pretend the task doesn't exist
+			continue
+		}
+		if !task.IsServiceManaged() {
+			continue
+		}
+		if serviceName, ok := deploymentMap[task.StartedBy]; ok {
+			results[taskARN] = serviceName
+		} else {
+			unmatched = append(unmatched, taskARN)
+		}
+	}
 
-	// do these two fetches in parallel
-	tasks, err := c.getTasks(taskArns)
-	services := <-servicesChan
+	log.Debugf("Matched %d from %d tasks, %d unmatched", len(results), len(taskARNs), len(unmatched))
+	return results, unmatched
+}
 
-	if err != nil {
-		return ecsInfo{}, err
+func (c ecsClientImpl) ensureTasksAreCached(taskARNs []string) {
+	tasksToFetch := []string{}
+	for _, taskARN := range taskARNs {
+		if _, err := c.taskCache.Get(taskARN); err != nil {
+			tasksToFetch = append(tasksToFetch, taskARN)
+		}
+	}
+	if len(tasksToFetch) > 0 {
+		// This might not fully succeed, but we only try once and ignore any further missing tasks.
+		c.getTasks(tasksToFetch)
 	}
+}
 
-	deploymentMap := c.getDeploymentMap(services)
+func (c ecsClientImpl) refreshServices(taskServiceMap map[string]string) map[string]bool {
+	servicesRefreshed := map[string]bool{}
+	toDescribe := []string{}
+	for _, serviceName := range taskServiceMap {
+		if servicesRefreshed[serviceName] {
+			continue
+		}
+		toDescribe = append(toDescribe, serviceName)
+		servicesRefreshed[serviceName] = true
+	}
+	c.describeServices(toDescribe)
+	return servicesRefreshed
+}
+
+func (c ecsClientImpl) describeAllServices(servicesRefreshed map[string]bool) {
+	toDescribe := []string{}
+	for _, serviceName := range c.listServices() {
+		if !servicesRefreshed[serviceName] {
+			toDescribe = append(toDescribe, serviceName)
+			servicesRefreshed[serviceName] = true
+		}
+	}
+	c.describeServices(toDescribe)
+}
 
-	taskServiceMap := map[string]string{}
-	for taskArn, task := range tasks {
-		// Note not all tasks map to a deployment, or we could otherwise mismatch due to races.
-		// It's safe to just ignore all these cases and consider them "non-service" tasks.
-		if serviceName, ok := deploymentMap[*task.StartedBy]; ok {
-			taskServiceMap[taskArn] = serviceName
+func (c ecsClientImpl) makeECSInfo(taskARNs []string, taskServiceMap map[string]string) EcsInfo {
+	// The maps to return are the referenced subsets of the full caches
+	tasks := map[string]EcsTask{}
+	for _, taskARN := range taskARNs {
+		// It's possible that tasks could still be missing from the cache if describe tasks failed.
+		// We'll just pretend they don't exist.
+		if task, ok := c.getCachedTask(taskARN); ok {
+			tasks[taskARN] = task
 		}
 	}
 
-	return ecsInfo{services: services, tasks: tasks, taskServiceMap: taskServiceMap}, nil
+	services := map[string]EcsService{}
+	for taskARN, serviceName := range taskServiceMap {
+		if _, ok := services[serviceName]; ok {
+			// Already present. This is expected since multiple tasks can map to the same service.
+			continue
+		}
+		if service, ok := c.getCachedService(serviceName); ok {
+			services[serviceName] = service
+		} else {
+			log.Errorf("Service %s referenced by task %s in service map but not found in cache, this shouldn't be able to happen. Removing task and continuing.", serviceName, taskARN)
+			delete(taskServiceMap, taskARN)
+		}
+	}
+
+	return EcsInfo{Services: services, Tasks: tasks, TaskServiceMap: taskServiceMap}
+}
+
+// Implements EcsClient.GetInfo
+func (c ecsClientImpl) GetInfo(taskARNs []string) EcsInfo {
+	log.Debugf("Getting ECS info on %d tasks", len(taskARNs))
+
+	// We do a weird order of operations here to minimize unneeded cache refreshes.
+	// First, we ensure we have all the tasks we need, and fetch the ones we don't.
+	// We also mark the tasks as being used here to prevent eviction.
+	c.ensureTasksAreCached(taskARNs)
+
+	// We're going to do this matching process potentially several times, but that's ok - it's quite cheap.
+	// First, we want to see how far we get with existing data, and identify the set of services
+	// we'll need to refresh regardless.
+	taskServiceMap, unmatched := c.matchTasksServices(taskARNs)
+
+	// In order to ensure service details are fresh, we need to refresh any referenced services
+	log.Debugf("Refreshing ECS services")
+	servicesRefreshed := c.refreshServices(taskServiceMap)
+
+	// In refreshing, we may have picked up new deployment IDs.
+	// If we still have tasks unmatched, we try again.
+	if len(unmatched) > 0 {
+		taskServiceMap, unmatched = c.matchTasksServices(taskARNs)
+	}
+
+	// If we still have tasks unmatched, we'll have to try harder. Get a list of all services and,
+	// if not already refreshed, fetch them.
+	log.Debugf("After refreshing services, %d tasks unmatched", len(unmatched))
+	if len(unmatched) > 0 {
+		c.describeAllServices(servicesRefreshed)
+
+		taskServiceMap, unmatched = c.matchTasksServices(taskARNs)
+		// If we still have unmatched at this point, we don't care - this may be due to partial failures,
+		// race conditions, and other weirdness.
+	}
+
+	info := c.makeECSInfo(taskARNs, taskServiceMap)
+
+	return info
 }
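A note on the cache semantics the client above relies on: bluele/gcache signals both a miss and an expired entry by returning a non-nil error from Get, which is exactly what getCachedTask and getCachedService test for. A minimal standalone sketch of that behaviour, using the same construction as ecsClientImpl (keys and sizes here are illustrative only, not part of this patch):

	package main

	import (
		"fmt"
		"time"

		"github.com/bluele/gcache"
	)

	func main() {
		// Same construction as taskCache/serviceCache above.
		cache := gcache.New(128).LRU().Expiration(time.Hour).Build()

		cache.Set("task-arn-1", "some value")

		// Hit: err is nil and the value comes back as interface{},
		// so callers type-assert, as getCachedTask does.
		if v, err := cache.Get("task-arn-1"); err == nil {
			fmt.Println("hit:", v.(string))
		}

		// Miss (or expired entry): err is non-nil and no value is returned.
		if _, err := cache.Get("task-arn-2"); err != nil {
			fmt.Println("miss:", err)
		}
	}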
diff --git a/probe/awsecs/reporter.go b/probe/awsecs/reporter.go
index 9a6e2321cb..95754d4a25 100644
--- a/probe/awsecs/reporter.go
+++ b/probe/awsecs/reporter.go
@@ -32,14 +32,16 @@ var (
 	}
 )
 
-type taskLabelInfo struct {
-	containerIDs []string
-	family       string
+// TaskLabelInfo is used in the return value of GetLabelInfo. Exported for test.
+type TaskLabelInfo struct {
+	ContainerIDs []string
+	Family       string
 }
 
-// return map from cluster to map of task arns to task infos
-func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo {
-	results := map[string]map[string]*taskLabelInfo{}
+// GetLabelInfo returns a map from cluster to map of task ARNs to task infos.
+// Exported for test.
+func GetLabelInfo(rpt report.Report) map[string]map[string]*TaskLabelInfo {
+	results := map[string]map[string]*TaskLabelInfo{}
 	log.Debug("scanning for ECS containers")
 
 	for nodeID, node := range rpt.Container.Nodes {
@@ -60,17 +62,17 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo {
 
 		taskMap, ok := results[cluster]
 		if !ok {
-			taskMap = map[string]*taskLabelInfo{}
+			taskMap = map[string]*TaskLabelInfo{}
 			results[cluster] = taskMap
 		}
 
 		task, ok := taskMap[taskArn]
 		if !ok {
-			task = &taskLabelInfo{containerIDs: []string{}, family: family}
+			task = &TaskLabelInfo{ContainerIDs: []string{}, Family: family}
 			taskMap[taskArn] = task
 		}
 
-		task.containerIDs = append(task.containerIDs, nodeID)
+		task.ContainerIDs = append(task.ContainerIDs, nodeID)
 	}
 	log.Debug("Got ECS container info: %v", results)
 	return results
@@ -78,20 +80,38 @@ func getLabelInfo(rpt report.Report) map[string]map[string]*taskLabelInfo {
 
 // Reporter implements Tagger, Reporter
 type Reporter struct {
+	ClientsByCluster map[string]EcsClient // Exported for test
+	cacheSize        int
+	cacheExpiry      time.Duration
+}
+
+// Make creates a new Reporter
+func Make(cacheSize int, cacheExpiry time.Duration) Reporter {
+	return Reporter{
+		ClientsByCluster: map[string]EcsClient{},
+		cacheSize:        cacheSize,
+		cacheExpiry:      cacheExpiry,
+	}
 }
 
 // Tag needed for Tagger
-func (Reporter) Tag(rpt report.Report) (report.Report, error) {
+func (r Reporter) Tag(rpt report.Report) (report.Report, error) {
 	rpt = rpt.Copy()
-	clusterMap := getLabelInfo(rpt)
+	clusterMap := GetLabelInfo(rpt)
 
 	for cluster, taskMap := range clusterMap {
 		log.Debugf("Fetching ECS info for cluster %v with %v tasks", cluster, len(taskMap))
 
-		client, err := newClient(cluster)
-		if err != nil {
-			return rpt, err
+		client, ok := r.ClientsByCluster[cluster]
+		if !ok {
+			log.Debugf("Creating new ECS client")
+			var err error
+			client, err = newClient(cluster, r.cacheSize, r.cacheExpiry)
+			if err != nil {
+				return rpt, err
+			}
+			r.ClientsByCluster[cluster] = client
 		}
 
 		taskArns := make([]string, 0, len(taskMap))
@@ -99,24 +119,22 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) {
 			taskArns = append(taskArns, taskArn)
 		}
 
-		ecsInfo, err := client.getInfo(taskArns)
-		if err != nil {
-			return rpt, err
-		}
+		ecsInfo := client.GetInfo(taskArns)
+		log.Debugf("Got info from ECS: %d tasks, %d services", len(ecsInfo.Tasks), len(ecsInfo.Services))
 
 		// Create all the services first
-		for serviceName, service := range ecsInfo.services {
+		for serviceName, service := range ecsInfo.Services {
 			serviceID := report.MakeECSServiceNodeID(serviceName)
 			rpt.ECSService = rpt.ECSService.AddNode(report.MakeNodeWith(serviceID, map[string]string{
 				Cluster:             cluster,
-				ServiceDesiredCount: fmt.Sprintf("%d", *service.DesiredCount),
-				ServiceRunningCount: fmt.Sprintf("%d", *service.RunningCount),
+				ServiceDesiredCount: fmt.Sprintf("%d", service.DesiredCount),
+				ServiceRunningCount: fmt.Sprintf("%d", service.RunningCount),
 			}))
 		}
-		log.Debugf("Created %v ECS service nodes", len(ecsInfo.services))
+		log.Debugf("Created %v ECS service nodes", len(ecsInfo.Services))
 
 		for taskArn, info := range taskMap {
-			task, ok := ecsInfo.tasks[taskArn]
+			task, ok := ecsInfo.Tasks[taskArn]
 			if !ok {
 				// can happen due to partial failures, just skip it
 				continue
 			}
@@ -125,7 +143,7 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) {
 			// new task node
 			taskID := report.MakeECSTaskNodeID(taskArn)
 			node := report.MakeNodeWith(taskID, map[string]string{
-				TaskFamily: info.family,
+				TaskFamily: info.Family,
 				Cluster:    cluster,
 				CreatedAt:  task.CreatedAt.Format(time.RFC3339Nano),
 			})
@@ -134,11 +152,11 @@ func (Reporter) Tag(rpt report.Report) (report.Report, error) {
 			// parents sets to merge into all matching container nodes
 			parentsSets := report.MakeSets()
 			parentsSets = parentsSets.Add(report.ECSTask, report.MakeStringSet(taskID))
-			if serviceName, ok := ecsInfo.taskServiceMap[taskArn]; ok {
+			if serviceName, ok := ecsInfo.TaskServiceMap[taskArn]; ok {
 				serviceID := report.MakeECSServiceNodeID(serviceName)
 				parentsSets = parentsSets.Add(report.ECSService, report.MakeStringSet(serviceID))
 			}
-			for _, containerID := range info.containerIDs {
+			for _, containerID := range info.ContainerIDs {
 				if containerNode, ok := rpt.Container.Nodes[containerID]; ok {
 					rpt.Container.Nodes[containerID] = containerNode.WithParents(parentsSets)
 				} else {
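One detail worth flagging for review: DescribeServices accepts at most 10 services per call (per the maxServices comment above), which is why describeServices in client.go splits its input before fanning out one goroutine per batch. The same slicing logic, extracted into a standalone sketch (function name and batch size here are illustrative, not part of this patch):

	package main

	import "fmt"

	// splitBatches mirrors the loop in describeServices: carve off
	// maxBatch-sized prefixes, then keep whatever remains.
	func splitBatches(items []string, maxBatch int) [][]string {
		batches := make([][]string, 0, len(items)/maxBatch+1)
		for len(items) > maxBatch {
			batches = append(batches, items[:maxBatch])
			items = items[maxBatch:]
		}
		if len(items) > 0 {
			batches = append(batches, items)
		}
		return batches
	}

	func main() {
		names := []string{"a", "b", "c", "d", "e"}
		fmt.Println(splitBatches(names, 2)) // [[a b] [c d] [e]]
	}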
diff --git a/probe/awsecs/reporter_test.go b/probe/awsecs/reporter_test.go
new file mode 100644
index 0000000000..117b5364d0
--- /dev/null
+++ b/probe/awsecs/reporter_test.go
@@ -0,0 +1,190 @@
+package awsecs_test
+
+import (
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/weaveworks/scope/probe/awsecs"
+	"github.com/weaveworks/scope/probe/docker"
+	"github.com/weaveworks/scope/report"
+)
+
+var (
+	testCluster           = "test-cluster"
+	testFamily            = "test-family"
+	testTaskARN           = "arn:aws:ecs:us-east-1:123456789012:task/12345678-9abc-def0-1234-56789abcdef0"
+	testTaskCreatedAt     = time.Unix(1483228800, 0)
+	testTaskDefinitionARN = "arn:aws:ecs:us-east-1:123456789012:task-definition/deadbeef-dead-beef-dead-beefdeadbeef"
+	testTaskStartedAt     = time.Unix(1483228805, 0)
+	testDeploymentID      = "ecs-svc/1121123211234321"
+	testServiceName       = "test-service"
+	testServiceCount      = 1
+	testContainer         = "test-container"
+	testContainerData     = map[string]string{
+		docker.LabelPrefix + "com.amazonaws.ecs.task-arn":               testTaskARN,
+		docker.LabelPrefix + "com.amazonaws.ecs.cluster":                testCluster,
+		docker.LabelPrefix + "com.amazonaws.ecs.task-definition-family": testFamily,
+	}
+)
+
+func getTestContainerNode() report.Node {
+	return report.MakeNodeWith(
+		report.MakeContainerNodeID(testContainer),
+		testContainerData,
+	)
+}
+
+func TestGetLabelInfo(t *testing.T) {
+	r := awsecs.Make(1e6, time.Hour)
+	rpt, err := r.Report()
+	if err != nil {
+		t.Fatalf("Error making report: %v", err)
+	}
+	labelInfo := awsecs.GetLabelInfo(rpt)
+	expected := map[string]map[string]*awsecs.TaskLabelInfo{}
+	if !reflect.DeepEqual(labelInfo, expected) {
+		t.Errorf("Empty report did not produce empty label info: %v != %v", labelInfo, expected)
+	}
+
+	rpt.Container = rpt.Container.AddNode(getTestContainerNode())
+	labelInfo = awsecs.GetLabelInfo(rpt)
+	expected = map[string]map[string]*awsecs.TaskLabelInfo{
+		testCluster: {
+			testTaskARN: {
+				ContainerIDs: []string{report.MakeContainerNodeID(testContainer)},
+				Family:       testFamily,
+			},
+		},
+	}
+	if !reflect.DeepEqual(labelInfo, expected) {
+		t.Errorf("Did not get expected label info: %v != %v", labelInfo, expected)
+	}
+}
+
+// Implements EcsClient
+type mockEcsClient struct {
+	t            *testing.T
+	expectedARNs []string
+	info         awsecs.EcsInfo
+}
+
+func newMockEcsClient(t *testing.T, expectedARNs []string, info awsecs.EcsInfo) awsecs.EcsClient {
+	return &mockEcsClient{
+		t,
+		expectedARNs,
+		info,
+	}
+}
+
+func (c mockEcsClient) GetInfo(taskARNs []string) awsecs.EcsInfo {
+	if !reflect.DeepEqual(taskARNs, c.expectedARNs) {
+		c.t.Fatalf("GetInfo called with wrong ARNs: %v != %v", taskARNs, c.expectedARNs)
+	}
+	return c.info
+}
+
+func TestTagReport(t *testing.T) {
+	r := awsecs.Make(1e6, time.Hour)
+
+	r.ClientsByCluster[testCluster] = newMockEcsClient(
+		t,
+		[]string{testTaskARN},
+		awsecs.EcsInfo{
+			Tasks: map[string]awsecs.EcsTask{
+				testTaskARN: {
+					TaskARN:           testTaskARN,
+					CreatedAt:         testTaskCreatedAt,
+					TaskDefinitionARN: testTaskDefinitionARN,
+					StartedAt:         testTaskStartedAt,
+					StartedBy:         testDeploymentID,
+				},
+			},
+			Services: map[string]awsecs.EcsService{
+				testServiceName: {
+					ServiceName:       testServiceName,
+					DeploymentIDs:     []string{testDeploymentID},
+					DesiredCount:      1,
+					PendingCount:      0,
+					RunningCount:      1,
+					TaskDefinitionARN: testTaskDefinitionARN,
+				},
+			},
+			TaskServiceMap: map[string]string{
+				testTaskARN: testServiceName,
+			},
+		},
+	)
+
+	rpt, err := r.Report()
+	if err != nil {
+		t.Fatalf("Error making report: %v", err)
+	}
+	rpt.Container = rpt.Container.AddNode(getTestContainerNode())
+	rpt, err = r.Tag(rpt)
+	if err != nil {
+		t.Fatalf("Failed to tag: %v", err)
+	}
+
+	// Check task node is present and contains expected values
+	task, ok := rpt.ECSTask.Nodes[report.MakeECSTaskNodeID(testTaskARN)]
+	if !ok {
+		t.Fatalf("Result report did not contain task %v: %v", testTaskARN, rpt.ECSTask.Nodes)
+	}
+	taskExpected := map[string]string{
+		awsecs.TaskFamily: testFamily,
+		awsecs.Cluster:    testCluster,
+		awsecs.CreatedAt:  testTaskCreatedAt.Format(time.RFC3339Nano),
+	}
+	for key, expectedValue := range taskExpected {
+		value, ok := task.Latest.Lookup(key)
+		if !ok {
+			t.Errorf("Result task did not contain expected key %v: %v", key, task.Latest)
+			continue
+		}
+		if value != expectedValue {
+			t.Errorf("Result task did not contain expected value for key %v: %v != %v", key, value, expectedValue)
+		}
+	}
+
+	// Check service node is present and contains expected values
+	service, ok := rpt.ECSService.Nodes[report.MakeECSServiceNodeID(testServiceName)]
+	if !ok {
+		t.Fatalf("Result report did not contain service %v: %v", testServiceName, rpt.ECSService.Nodes)
+	}
+	serviceExpected := map[string]string{
+		awsecs.Cluster:             testCluster,
+		awsecs.ServiceDesiredCount: "1",
+		awsecs.ServiceRunningCount: "1",
+	}
+	for key, expectedValue := range serviceExpected {
+		value, ok := service.Latest.Lookup(key)
+		if !ok {
+			t.Errorf("Result service did not contain expected key %v: %v", key, service.Latest)
+			continue
+		}
+		if value != expectedValue {
+			t.Errorf("Result service did not contain expected value for key %v: %v != %v", key, value, expectedValue)
+		}
+	}
+
+	// Check container node is present and contains expected parents
+	container, ok := rpt.Container.Nodes[report.MakeContainerNodeID(testContainer)]
+	if !ok {
+		t.Fatalf("Result report did not contain container %v: %v", testContainer, rpt.Container.Nodes)
+	}
+	containerParentsExpected := map[string]string{
+		report.ECSTask:    report.MakeECSTaskNodeID(testTaskARN),
+		report.ECSService: report.MakeECSServiceNodeID(testServiceName),
+	}
+	for key, expectedValue := range containerParentsExpected {
+		values, ok := container.Parents.Lookup(key)
+		if !ok {
+			t.Errorf("Result container did not have any parents for key %v: %v", key, container.Parents)
+			continue
+		}
+		if !values.Contains(expectedValue) {
+			t.Errorf("Result container did not contain expected value %v as a parent for key %v: %v", expectedValue, key, values)
key %v: %v", expectedValue, key, values) + } + } +} diff --git a/prog/main.go b/prog/main.go index ac1ca13a79..ed4d65d28a 100644 --- a/prog/main.go +++ b/prog/main.go @@ -104,7 +104,9 @@ type probeFlags struct { kubernetesEnabled bool kubernetesConfig kubernetes.ClientConfig - ecsEnabled bool + ecsEnabled bool + ecsCacheSize int + ecsCacheExpiry time.Duration weaveEnabled bool weaveAddr string @@ -287,6 +289,8 @@ func main() { // AWS ECS flag.BoolVar(&flags.probe.ecsEnabled, "probe.ecs", false, "Collect ecs-related attributes for containers on this node") + flag.IntVar(&flags.probe.ecsCacheSize, "probe.ecs.cache.size", 1024*1024, "Max size of cached info for each ECS cluster") + flag.DurationVar(&flags.probe.ecsCacheExpiry, "probe.ecs.cache.expiry", time.Hour, "How long to keep cached ECS info") // Weave flag.StringVar(&flags.probe.weaveAddr, "probe.weave.addr", "127.0.0.1:6784", "IP address & port of the Weave router") diff --git a/prog/probe.go b/prog/probe.go index f4030ea219..946e93889c 100644 --- a/prog/probe.go +++ b/prog/probe.go @@ -206,7 +206,7 @@ func probeMain(flags probeFlags, targets []appclient.Target) { } if flags.ecsEnabled { - reporter := awsecs.Reporter{} + reporter := awsecs.Make(flags.ecsCacheSize, flags.ecsCacheExpiry) p.AddReporter(reporter) p.AddTagger(reporter) }