diff --git a/app/multitenant/dynamo_collector.go b/app/multitenant/aws_collector.go similarity index 66% rename from app/multitenant/dynamo_collector.go rename to app/multitenant/aws_collector.go index 5f53010596..477e2261b0 100644 --- a/app/multitenant/dynamo_collector.go +++ b/app/multitenant/aws_collector.go @@ -13,9 +13,7 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/dynamodb" - "github.com/aws/aws-sdk-go/service/s3" "github.com/bluele/gcache" - "github.com/bradfitz/gomemcache/memcache" "github.com/nats-io/nats" "github.com/prometheus/client_golang/prometheus" "golang.org/x/net/context" @@ -25,14 +23,12 @@ import ( ) const ( - hourField = "hour" - tsField = "ts" - reportField = "report" - reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users - reportCacheExpiration = 15 * time.Second - memcacheExpiration = 15 // seconds - memcacheUpdateInterval = 1 * time.Minute - natsTimeout = 10 * time.Second + hourField = "hour" + tsField = "ts" + reportField = "report" + reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users + reportCacheExpiration = 15 * time.Second + natsTimeout = 10 * time.Second ) var ( @@ -70,12 +66,6 @@ var ( Help: "Total compressed size of reports received in bytes.", }) - s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: "scope", - Name: "s3_request_duration_seconds", - Help: "Time in seconds spent doing S3 requests.", - }, []string{"method", "status_code"}) - natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{ Namespace: "scope", Name: "nats_requests_total", @@ -90,30 +80,38 @@ func init() { prometheus.MustRegister(inProcessCacheRequests) prometheus.MustRegister(inProcessCacheHits) prometheus.MustRegister(reportSize) - prometheus.MustRegister(s3RequestDuration) prometheus.MustRegister(natsRequests) } -// 
DynamoDBCollector is a Collector which can also CreateTables -type DynamoDBCollector interface { +// AWSCollector is a Collector which can also CreateTables +type AWSCollector interface { app.Collector CreateTables() error } // ReportStore is a thing that we can get reports from. type ReportStore interface { - FetchReports([]string) ([]report.Report, []string, error) + FetchReports([]string) (map[string]report.Report, []string, error) } -type dynamoDBCollector struct { - userIDer UserIDer - db *dynamodb.DynamoDB - s3 *s3.S3 - tableName string - bucketName string - merger app.Merger - inProcess inProcessStore - memcache *MemcacheClient +// AWSCollectorConfig has everything we need to make an AWS collector. +type AWSCollectorConfig struct { + UserIDer UserIDer + DynamoDBConfig *aws.Config + DynamoTable string + S3Store *S3Store + NatsHost string + MemcacheClient *MemcacheClient +} + +type awsCollector struct { + userIDer UserIDer + db *dynamodb.DynamoDB + s3 *S3Store + tableName string + merger app.Merger + inProcess inProcessStore + memcache *MemcacheClient nats *nats.Conn waitersLock sync.Mutex @@ -134,55 +132,33 @@ type watchKey struct { c chan struct{} } -// NewDynamoDBCollector the reaper of souls +// NewAWSCollector the elastic reaper of souls // https://github.com/aws/aws-sdk-go/wiki/common-examples -func NewDynamoDBCollector( - userIDer UserIDer, - dynamoDBConfig, s3Config *aws.Config, - tableName, bucketName, natsHost, memcachedHost string, - memcachedTimeout time.Duration, memcachedService string, -) (DynamoDBCollector, error) { +func NewAWSCollector(config AWSCollectorConfig) (AWSCollector, error) { var nc *nats.Conn - if natsHost != "" { - var err error - nc, err = nats.Connect(natsHost) - if err != nil { - return nil, err - } - } - - var memcacheClient *MemcacheClient - if memcachedHost != "" { + if config.NatsHost != "" { var err error - memcacheClient, err = NewMemcacheClient(memcachedHost, memcachedTimeout, memcachedService, memcacheUpdateInterval, 
memcacheExpiration) + nc, err = nats.Connect(config.NatsHost) if err != nil { - // TODO(jml): Ideally, we wouldn't abort here, we would instead - // log errors when we try to use the memcache & fail to do so, as - // aborting here introduces ordering dependencies into our - // deployment. - // - // Note: this error only happens when either the memcachedHost or - // any of the SRV records that it points to fail to resolve. return nil, err } } - return &dynamoDBCollector{ - db: dynamodb.New(session.New(dynamoDBConfig)), - s3: s3.New(session.New(s3Config)), - userIDer: userIDer, - tableName: tableName, - bucketName: bucketName, - merger: app.NewSmartMerger(), - inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration), - memcache: memcacheClient, - nats: nc, - waiters: map[watchKey]*nats.Subscription{}, + return &awsCollector{ + db: dynamodb.New(session.New(config.DynamoDBConfig)), + s3: config.S3Store, + userIDer: config.UserIDer, + tableName: config.DynamoTable, + merger: app.NewSmartMerger(), + inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration), + memcache: config.MemcacheClient, + nats: nc, + waiters: map[watchKey]*nats.Subscription{}, }, nil } -// CreateDynamoDBTables creates the required tables in dynamodb -func (c *dynamoDBCollector) CreateTables() error { +// CreateTables creates the required tables in dynamodb +func (c *awsCollector) CreateTables() error { // see if tableName exists resp, err := c.db.ListTables(&dynamodb.ListTablesInput{ Limit: aws.Int64(10), @@ -234,7 +210,8 @@ func (c *dynamoDBCollector) CreateTables() error { } // getReportKeys gets the s3 keys for reports in this range -func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) ([]string, error) { +func (c *awsCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) { + rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) var resp *dynamodb.QueryOutput err := timeRequest("Query", 
dynamoRequestDuration, func() error { var err error @@ -281,64 +258,15 @@ func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) ( return result, nil } -// Fetch multiple reports in parallel from S3. -func (c *dynamoDBCollector) getNonCached(reportKeys []string) ([]report.Report, error) { - type result struct { - key string - report *report.Report - err error - } - - ch := make(chan result, len(reportKeys)) - - for _, reportKey := range reportKeys { - go func(reportKey string) { - r := result{key: reportKey} - r.report, r.err = c.getNonCachedReport(reportKey) - ch <- r - }(reportKey) - } - - reports := []report.Report{} - for range reportKeys { - r := <-ch - if r.err != nil { - return nil, r.err - } - reports = append(reports, *r.report) - c.inProcess.StoreReport(r.key, *r.report) - } - return reports, nil -} - -// Fetch a single report from S3. -func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report, error) { - var resp *s3.GetObjectOutput - err := timeRequest("Get", s3RequestDuration, func() error { - var err error - resp, err = c.s3.GetObject(&s3.GetObjectInput{ - Bucket: aws.String(c.bucketName), - Key: aws.String(reportKey), - }) - return err - }) - if err != nil { - return nil, err - } - return report.MakeFromBinary(resp.Body) -} - -func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) { - rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10)) - missing, err := c.getReportKeys(rowKey, start, end) - if err != nil { - return nil, err - } +func (c *awsCollector) getReports(reportKeys []string) ([]report.Report, error) { + missing := reportKeys stores := []ReportStore{c.inProcess} if c.memcache != nil { stores = append(stores, c.memcache) } + stores = append(stores, c.s3) + var reports []report.Report for _, store := range stores { if store == nil { @@ -348,69 +276,62 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end 
time if err != nil { log.Warningf("Error fetching from cache: %v", err) } - reports = append(reports, found...) + for key, report := range found { + c.inProcess.StoreReport(key, report) + reports = append(reports, report) + } if len(missing) == 0 { return reports, nil } } - fetchedReports, err := c.getNonCached(missing) - if err != nil { - return nil, err - } - - return append(reports, fetchedReports...), nil -} - -func memcacheStatusCode(err error) string { - // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables - switch err { - case nil: - return "200" - case memcache.ErrCacheMiss: - return "404" - case memcache.ErrMalformedKey: - return "400" - default: - return "500" + if len(missing) > 0 { + return nil, fmt.Errorf("Error fetching from s3, still have missing reports: %v", missing) } + return reports, nil } -func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) { +func (c *awsCollector) Report(ctx context.Context) (report.Report, error) { var ( now = time.Now() start = now.Add(-15 * time.Second) rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds() userid, err = c.userIDer(ctx) - reports []report.Report ) if err != nil { return report.MakeReport(), err } // Queries will only every span 2 rows max. + var reportKeys []string if rowStart != rowEnd { - reports1, err := c.getReports(userid, rowStart, start, now) + reportKeys1, err := c.getReportKeys(userid, rowStart, start, now) if err != nil { return report.MakeReport(), err } - reports2, err := c.getReports(userid, rowEnd, start, now) + reportKeys2, err := c.getReportKeys(userid, rowEnd, start, now) if err != nil { return report.MakeReport(), err } - reports = append(reports1, reports2...) + reportKeys = append(reportKeys, reportKeys1...) + reportKeys = append(reportKeys, reportKeys2...) 
} else { - if reports, err = c.getReports(userid, rowEnd, start, now); err != nil { + if reportKeys, err = c.getReportKeys(userid, rowEnd, start, now); err != nil { return report.MakeReport(), err } } + reports, err := c.getReports(reportKeys) + if err != nil { + return report.MakeReport(), err + } + return c.merger.Merge(reports), nil } -func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { +func (c *awsCollector) Add(ctx context.Context, rep report.Report) error { userid, err := c.userIDer(ctx) if err != nil { return err @@ -430,24 +351,14 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { return err } s3Key := fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey) - err = timeRequest("Put", s3RequestDuration, func() error { - var err error - _, err = c.s3.PutObject(&s3.PutObjectInput{ - Body: bytes.NewReader(buf.Bytes()), - Bucket: aws.String(c.bucketName), - Key: aws.String(s3Key), - }) - return err - }) + err = c.s3.StoreBytes(s3Key, buf.Bytes()) if err != nil { return err } // third, put it in memcache if c.memcache != nil { - err = timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error { - return c.memcache.StoreBytes(s3Key, buf.Bytes()) - }) + err = c.memcache.StoreBytes(s3Key, buf.Bytes()) if err != nil { // NOTE: We don't abort here because failing to store in memcache // doesn't actually break anything else -- it's just an @@ -499,7 +410,7 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error { return nil } -func (c *dynamoDBCollector) WaitOn(ctx context.Context, waiter chan struct{}) { +func (c *awsCollector) WaitOn(ctx context.Context, waiter chan struct{}) { userid, err := c.userIDer(ctx) if err != nil { log.Errorf("Error getting user id in WaitOn: %v", err) @@ -540,7 +451,7 @@ func (c *dynamoDBCollector) WaitOn(ctx context.Context, waiter chan struct{}) { }() } -func (c *dynamoDBCollector) UnWait(ctx context.Context, waiter chan struct{}) { 
+func (c *awsCollector) UnWait(ctx context.Context, waiter chan struct{}) { userid, err := c.userIDer(ctx) if err != nil { log.Errorf("Error getting user id in WaitOn: %v", err) @@ -574,13 +485,13 @@ func newInProcessStore(size int, expiration time.Duration) inProcessStore { } // FetchReports retrieves the given reports from the store. -func (c inProcessStore) FetchReports(keys []string) ([]report.Report, []string, error) { - found := []report.Report{} +func (c inProcessStore) FetchReports(keys []string) (map[string]report.Report, []string, error) { + found := map[string]report.Report{} missing := []string{} for _, key := range keys { rpt, err := c.cache.Get(key) if err == nil { - found = append(found, rpt.(report.Report)) + found[key] = rpt.(report.Report) } else { missing = append(missing, key) } diff --git a/app/multitenant/memcache_client.go b/app/multitenant/memcache_client.go index 2dcd4b14c9..2b311c8b53 100644 --- a/app/multitenant/memcache_client.go +++ b/app/multitenant/memcache_client.go @@ -119,8 +119,22 @@ func (c *MemcacheClient) updateMemcacheServers() error { return c.serverList.SetServers(servers...) } +func memcacheStatusCode(err error) string { + // See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables + switch err { + case nil: + return "200" + case memcache.ErrCacheMiss: + return "404" + case memcache.ErrMalformedKey: + return "400" + default: + return "500" + } +} + // FetchReports gets reports from memcache. 
-func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string, error) {
+func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, []string, error) {
 	var found map[string]*memcache.Item
 	err := timeRequestStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error {
 		var err error
@@ -151,17 +165,17 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string,
 				ch <- result{key: key}
 				return
 			}
-			ch <- result{report: rep}
+			ch <- result{key: key, report: rep}
 		}(key)
 	}

-	var reports []report.Report
+	reports := map[string]report.Report{}
 	for i := 0; i < len(keys)-len(missing); i++ {
 		r := <-ch
 		if r.report == nil {
 			missing = append(missing, r.key)
 		} else {
-			reports = append(reports, *r.report)
+			reports[r.key] = *r.report
 		}
 	}

@@ -172,6 +186,8 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string,

 // StoreBytes stores a report, expecting the report to be serialized already.
 func (c *MemcacheClient) StoreBytes(key string, content []byte) error {
-	item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
-	return c.client.Set(&item)
+	return timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
+		item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
+		return c.client.Set(&item)
+	})
 }
diff --git a/app/multitenant/s3_client.go b/app/multitenant/s3_client.go
new file mode 100644
index 0000000000..4cb9acd0e7
--- /dev/null
+++ b/app/multitenant/s3_client.go
@@ -0,0 +1,96 @@
+package multitenant
+
+import (
+	"bytes"
+
+	"github.com/aws/aws-sdk-go/aws"
+	"github.com/aws/aws-sdk-go/aws/session"
+	"github.com/aws/aws-sdk-go/service/s3"
+	"github.com/prometheus/client_golang/prometheus"
+
+	"github.com/weaveworks/scope/report"
+)
+
+var (
+	s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
+		Namespace: "scope",
+		Name:      "s3_request_duration_seconds",
+		Help:      "Time in seconds spent doing S3 
requests.", + }, []string{"method", "status_code"}) +) + +// S3Store is an S3 client that stores and retrieves Reports. +type S3Store struct { + s3 *s3.S3 + bucketName string +} + +func init() { + prometheus.MustRegister(s3RequestDuration) +} + +// NewS3Client creates a new S3 client. +func NewS3Client(config *aws.Config, bucketName string) S3Store { + return S3Store{ + s3: s3.New(session.New(config)), + bucketName: bucketName, + } +} + +// FetchReports fetches multiple reports in parallel from S3. +func (store *S3Store) FetchReports(keys []string) (map[string]report.Report, []string, error) { + type result struct { + key string + report *report.Report + err error + } + + ch := make(chan result, len(keys)) + + for _, key := range keys { + go func(key string) { + r := result{key: key} + r.report, r.err = store.fetchReport(key) + ch <- r + }(key) + } + + reports := map[string]report.Report{} + for range keys { + r := <-ch + if r.err != nil { + return nil, []string{}, r.err + } + reports[r.key] = *r.report + } + return reports, []string{}, nil +} + +func (store *S3Store) fetchReport(key string) (*report.Report, error) { + var resp *s3.GetObjectOutput + err := timeRequest("Get", s3RequestDuration, func() error { + var err error + resp, err = store.s3.GetObject(&s3.GetObjectInput{ + Bucket: aws.String(store.bucketName), + Key: aws.String(key), + }) + return err + }) + if err != nil { + return nil, err + } + return report.MakeFromBinary(resp.Body) +} + +// StoreBytes stores a report in S3, expecting the report to be serialized +// already. 
+func (store *S3Store) StoreBytes(key string, content []byte) error { + return timeRequest("Put", s3RequestDuration, func() error { + _, err := store.s3.PutObject(&s3.PutObjectInput{ + Body: bytes.NewReader(content), + Bucket: aws.String(store.bucketName), + Key: aws.String(key), + }) + return err + }) +} diff --git a/prog/app.go b/prog/app.go index e75ce98b98..1fd1e008b4 100644 --- a/prog/app.go +++ b/prog/app.go @@ -27,6 +27,11 @@ import ( "github.com/weaveworks/scope/probe/docker" ) +const ( + memcacheExpiration = 15 // seconds + memcacheUpdateInterval = 1 * time.Minute +) + var ( requestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{ Namespace: "scope", @@ -99,21 +104,46 @@ func collectorFactory(userIDer multitenant.UserIDer, collectorURL, s3URL, natsHo if err != nil { return nil, err } - tableName := strings.TrimPrefix(parsed.Path, "/") bucketName := strings.TrimPrefix(s3.Path, "/") - dynamoCollector, err := multitenant.NewDynamoDBCollector( - userIDer, dynamoDBConfig, s3Config, tableName, bucketName, natsHostname, - memcachedHostname, memcachedTimeout, memcachedService, + tableName := strings.TrimPrefix(parsed.Path, "/") + s3Store := multitenant.NewS3Client(s3Config, bucketName) + var memcacheClient *multitenant.MemcacheClient + if memcachedHostname != "" { + memcacheClient, err = multitenant.NewMemcacheClient( + memcachedHostname, memcachedTimeout, memcachedService, + memcacheUpdateInterval, memcacheExpiration, + ) + if err != nil { + // TODO(jml): Ideally, we wouldn't abort here, we would instead + // log errors when we try to use the memcache & fail to do so, as + // aborting here introduces ordering dependencies into our + // deployment. + // + // Note: this error only happens when either the memcachedHost + // or any of the SRV records that it points to fail to + // resolve. 
+ return nil, err + } + } + awsCollector, err := multitenant.NewAWSCollector( + multitenant.AWSCollectorConfig{ + UserIDer: userIDer, + DynamoDBConfig: dynamoDBConfig, + DynamoTable: tableName, + S3Store: &s3Store, + NatsHost: natsHostname, + MemcacheClient: memcacheClient, + }, ) if err != nil { return nil, err } if createTables { - if err := dynamoCollector.CreateTables(); err != nil { + if err := awsCollector.CreateTables(); err != nil { return nil, err } } - return dynamoCollector, nil + return awsCollector, nil } return nil, fmt.Errorf("Invalid collector '%s'", collectorURL)