Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor caching layers in dynamo collector #1616

Merged
merged 8 commits into from
Jun 30, 2016
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 56 additions & 149 deletions app/multitenant/dynamo_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/bluele/gcache"
"github.com/bradfitz/gomemcache/memcache"
"github.com/nats-io/nats"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/net/context"
Expand All @@ -25,14 +23,12 @@ import (
)

const (
hourField = "hour"
tsField = "ts"
reportField = "report"
reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users
reportCacheExpiration = 15 * time.Second
memcacheExpiration = 15 // seconds
memcacheUpdateInterval = 1 * time.Minute
natsTimeout = 10 * time.Second
hourField = "hour"
tsField = "ts"
reportField = "report"
reportCacheSize = (15 / 3) * 10 * 5 // (window size * report rate) * number of hosts per user * number of users
reportCacheExpiration = 15 * time.Second
natsTimeout = 10 * time.Second
)

var (
Expand Down Expand Up @@ -70,12 +66,6 @@ var (
Help: "Total compressed size of reports received in bytes.",
})

s3RequestDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Namespace: "scope",
Name: "s3_request_duration_seconds",
Help: "Time in seconds spent doing S3 requests.",
}, []string{"method", "status_code"})

natsRequests = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "scope",
Name: "nats_requests_total",
Expand All @@ -90,7 +80,6 @@ func init() {
prometheus.MustRegister(inProcessCacheRequests)
prometheus.MustRegister(inProcessCacheHits)
prometheus.MustRegister(reportSize)
prometheus.MustRegister(s3RequestDuration)
prometheus.MustRegister(natsRequests)
}

Expand All @@ -102,18 +91,17 @@ type DynamoDBCollector interface {

// ReportStore is a thing that we can get reports from.
type ReportStore interface {
FetchReports([]string) ([]report.Report, []string, error)
FetchReports([]string) (map[string]report.Report, []string, error)
}

type dynamoDBCollector struct {

This comment was marked as abuse.

This comment was marked as abuse.

userIDer UserIDer
db *dynamodb.DynamoDB
s3 *s3.S3
tableName string
bucketName string
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient
userIDer UserIDer
db *dynamodb.DynamoDB
s3 *S3Store
tableName string
merger app.Merger
inProcess inProcessStore
memcache *MemcacheClient

nats *nats.Conn
waitersLock sync.Mutex
Expand All @@ -138,9 +126,10 @@ type watchKey struct {
// https://github.com/aws/aws-sdk-go/wiki/common-examples
func NewDynamoDBCollector(
userIDer UserIDer,
dynamoDBConfig, s3Config *aws.Config,
tableName, bucketName, natsHost, memcachedHost string,
memcachedTimeout time.Duration, memcachedService string,
dynamoDBConfig *aws.Config, tableName string,
s3Store *S3Store,
natsHost string,
memcacheClient *MemcacheClient,

This comment was marked as abuse.

This comment was marked as abuse.

) (DynamoDBCollector, error) {
var nc *nats.Conn
if natsHost != "" {
Expand All @@ -151,33 +140,16 @@ func NewDynamoDBCollector(
}
}

var memcacheClient *MemcacheClient
if memcachedHost != "" {
var err error
memcacheClient, err = NewMemcacheClient(memcachedHost, memcachedTimeout, memcachedService, memcacheUpdateInterval, memcacheExpiration)
if err != nil {
// TODO(jml): Ideally, we wouldn't abort here, we would instead
// log errors when we try to use the memcache & fail to do so, as
// aborting here introduces ordering dependencies into our
// deployment.
//
// Note: this error only happens when either the memcachedHost or
// any of the SRV records that it points to fail to resolve.
return nil, err
}
}

return &dynamoDBCollector{
db: dynamodb.New(session.New(dynamoDBConfig)),
s3: s3.New(session.New(s3Config)),
userIDer: userIDer,
tableName: tableName,
bucketName: bucketName,
merger: app.NewSmartMerger(),
inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration),
memcache: memcacheClient,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
db: dynamodb.New(session.New(dynamoDBConfig)),
s3: s3Store,
userIDer: userIDer,
tableName: tableName,
merger: app.NewSmartMerger(),
inProcess: newInProcessStore(reportCacheSize, reportCacheExpiration),
memcache: memcacheClient,
nats: nc,
waiters: map[watchKey]*nats.Subscription{},
}, nil
}

Expand Down Expand Up @@ -234,7 +206,8 @@ func (c *dynamoDBCollector) CreateTables() error {
}

// getReportKeys gets the s3 keys for reports in this range
func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) ([]string, error) {
func (c *dynamoDBCollector) getReportKeys(userid string, row int64, start, end time.Time) ([]string, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
var resp *dynamodb.QueryOutput
err := timeRequest("Query", dynamoRequestDuration, func() error {
var err error
Expand Down Expand Up @@ -281,64 +254,15 @@ func (c *dynamoDBCollector) getReportKeys(rowKey string, start, end time.Time) (
return result, nil
}

// Fetch multiple reports in parallel from S3.
func (c *dynamoDBCollector) getNonCached(reportKeys []string) ([]report.Report, error) {
type result struct {
key string
report *report.Report
err error
}

ch := make(chan result, len(reportKeys))

for _, reportKey := range reportKeys {
go func(reportKey string) {
r := result{key: reportKey}
r.report, r.err = c.getNonCachedReport(reportKey)
ch <- r
}(reportKey)
}

reports := []report.Report{}
for range reportKeys {
r := <-ch
if r.err != nil {
return nil, r.err
}
reports = append(reports, *r.report)
c.inProcess.StoreReport(r.key, *r.report)
}
return reports, nil
}

// Fetch a single report from S3.
func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report, error) {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
resp, err = c.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.bucketName),
Key: aws.String(reportKey),
})
return err
})
if err != nil {
return nil, err
}
return report.MakeFromBinary(resp.Body)
}

func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
missing, err := c.getReportKeys(rowKey, start, end)
if err != nil {
return nil, err
}
func (c *dynamoDBCollector) getReports(reportKeys []string) ([]report.Report, error) {
missing := reportKeys

stores := []ReportStore{c.inProcess}
if c.memcache != nil {
stores = append(stores, c.memcache)
}
stores = append(stores, c.s3)

var reports []report.Report
for _, store := range stores {
if store == nil {
Expand All @@ -348,32 +272,19 @@ func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time
if err != nil {
log.Warningf("Error fetching from cache: %v", err)
}
reports = append(reports, found...)
for key, report := range found {
c.inProcess.StoreReport(key, report)
reports = append(reports, report)
}
if len(missing) == 0 {
return reports, nil
}
}

fetchedReports, err := c.getNonCached(missing)
if err != nil {
return nil, err
}

return append(reports, fetchedReports...), nil
}

func memcacheStatusCode(err error) string {
// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
switch err {
case nil:
return "200"
case memcache.ErrCacheMiss:
return "404"
case memcache.ErrMalformedKey:
return "400"
default:
return "500"
if len(missing) > 0 {
return nil, fmt.Errorf("Error fetching from s3, still have missing reports: %v", missing)
}
return reports, nil
}

func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) {
Expand All @@ -382,31 +293,37 @@ func (c *dynamoDBCollector) Report(ctx context.Context) (report.Report, error) {
start = now.Add(-15 * time.Second)
rowStart, rowEnd = start.UnixNano() / time.Hour.Nanoseconds(), now.UnixNano() / time.Hour.Nanoseconds()
userid, err = c.userIDer(ctx)
reports []report.Report
)
if err != nil {
return report.MakeReport(), err
}

// Queries will only every span 2 rows max.
var reportKeys []string
if rowStart != rowEnd {
reports1, err := c.getReports(userid, rowStart, start, now)
reportKeys1, err := c.getReportKeys(userid, rowStart, start, now)
if err != nil {
return report.MakeReport(), err
}

reports2, err := c.getReports(userid, rowEnd, start, now)
reportKeys2, err := c.getReportKeys(userid, rowEnd, start, now)
if err != nil {
return report.MakeReport(), err
}

reports = append(reports1, reports2...)
reportKeys = append(reportKeys, reportKeys1...)
reportKeys = append(reportKeys, reportKeys2...)
} else {
if reports, err = c.getReports(userid, rowEnd, start, now); err != nil {
if reportKeys, err = c.getReportKeys(userid, rowEnd, start, now); err != nil {
return report.MakeReport(), err
}
}

reports, err := c.getReports(reportKeys)
if err != nil {
return report.MakeReport(), err
}

return c.merger.Merge(reports), nil
}

Expand All @@ -430,24 +347,14 @@ func (c *dynamoDBCollector) Add(ctx context.Context, rep report.Report) error {
return err
}
s3Key := fmt.Sprintf("%x/%s", rowKeyHash.Sum(nil), colKey)
err = timeRequest("Put", s3RequestDuration, func() error {
var err error
_, err = c.s3.PutObject(&s3.PutObjectInput{
Body: bytes.NewReader(buf.Bytes()),
Bucket: aws.String(c.bucketName),
Key: aws.String(s3Key),
})
return err
})
err = c.s3.StoreBytes(s3Key, buf.Bytes())
if err != nil {
return err
}

// third, put it in memcache
if c.memcache != nil {
err = timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
return c.memcache.StoreBytes(s3Key, buf.Bytes())
})
err = c.memcache.StoreBytes(s3Key, buf.Bytes())
if err != nil {
// NOTE: We don't abort here because failing to store in memcache
// doesn't actually break anything else -- it's just an
Expand Down Expand Up @@ -574,13 +481,13 @@ func newInProcessStore(size int, expiration time.Duration) inProcessStore {
}

// FetchReports retrieves the given reports from the store.
func (c inProcessStore) FetchReports(keys []string) ([]report.Report, []string, error) {
found := []report.Report{}
func (c inProcessStore) FetchReports(keys []string) (map[string]report.Report, []string, error) {
found := map[string]report.Report{}
missing := []string{}
for _, key := range keys {
rpt, err := c.cache.Get(key)
if err == nil {
found = append(found, rpt.(report.Report))
found[key] = rpt.(report.Report)
} else {
missing = append(missing, key)
}
Expand Down
28 changes: 22 additions & 6 deletions app/multitenant/memcache_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,22 @@ func (c *MemcacheClient) updateMemcacheServers() error {
return c.serverList.SetServers(servers...)
}

func memcacheStatusCode(err error) string {
// See https://godoc.org/github.com/bradfitz/gomemcache/memcache#pkg-variables
switch err {
case nil:
return "200"
case memcache.ErrCacheMiss:
return "404"
case memcache.ErrMalformedKey:
return "400"
default:
return "500"
}
}

// FetchReports gets reports from memcache.
func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string, error) {
func (c *MemcacheClient) FetchReports(keys []string) (map[string]report.Report, []string, error) {
var found map[string]*memcache.Item
err := timeRequestStatus("Get", memcacheRequestDuration, memcacheStatusCode, func() error {
var err error
Expand Down Expand Up @@ -151,17 +165,17 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string,
ch <- result{key: key}
return
}
ch <- result{report: rep}
ch <- result{key: key, report: rep}
}(key)
}

var reports []report.Report
var reports map[string]report.Report
for i := 0; i < len(keys)-len(missing); i++ {
r := <-ch
if r.report == nil {
missing = append(missing, r.key)
} else {
reports = append(reports, *r.report)
reports[r.key] = *r.report
}
}

Expand All @@ -172,6 +186,8 @@ func (c *MemcacheClient) FetchReports(keys []string) ([]report.Report, []string,

// StoreBytes stores a report, expecting the report to be serialized already.
func (c *MemcacheClient) StoreBytes(key string, content []byte) error {
item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
return c.client.Set(&item)
return timeRequestStatus("Put", memcacheRequestDuration, memcacheStatusCode, func() error {
item := memcache.Item{Key: key, Value: content, Expiration: c.expiration}
return c.client.Set(&item)
})
}
Loading