-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathdynamodb_big_segments_impl.go
128 lines (113 loc) · 3.97 KB
/
dynamodb_big_segments_impl.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package lddynamodb
import (
"context"
"errors"
"github.com/launchdarkly/go-sdk-common/v3/ldlog"
"github.com/launchdarkly/go-sdk-common/v3/ldtime"
"github.com/launchdarkly/go-server-sdk/v7/subsystems"
"github.com/launchdarkly/go-server-sdk/v7/subsystems/ldstoreimpl"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
"github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
)
// Names of the DynamoDB namespaces and item attributes read by the Big Segment store.
const (
// Namespace of the metadata item; after optional prefixing it is used as both the
// partition key and the sort key when reading metadata.
bigSegmentsMetadataKey = "big_segments_metadata"
// Namespace used (after optional prefixing) as the partition key of membership items;
// the sort key of those items is the context hash key.
bigSegmentsUserDataKey = "big_segments_user"
// Attribute of the metadata item holding the last synchronization time, which is
// reported to callers as a Unix millisecond timestamp.
bigSegmentsSyncTimeAttr = "synchronizedOn"
// Attribute of a membership item read as a string set of included segment references.
bigSegmentsIncludedAttr = "included"
// Attribute of a membership item read as a string set of excluded segment references.
bigSegmentsExcludedAttr = "excluded"
)
// dynamoDBBigSegmentStoreImpl is the internal implementation of the BigSegmentStore
// interface for DynamoDB.
type dynamoDBBigSegmentStoreImpl struct {
// client issues the GetItem requests against DynamoDB.
client *dynamodb.Client
// context is passed to every request made through client.
context context.Context
// cancelContext is called by Close to stop any pending operations.
cancelContext func()
// table is the name of the DynamoDB table that is queried.
table string
// prefix, when non-empty, is prepended (followed by ":") to each namespace.
prefix string
// loggers is the store's own copy of the logger set, so its prefix can be changed
// without affecting the caller's instance.
loggers ldlog.Loggers
}
// newDynamoDBBigSegmentStoreImpl creates a Big Segment store that reads from the DynamoDB
// table named in builder.
//
// It returns an error if builder has no table name, or if makeClientAndContext fails.
// On success the returned store holds the client, the context used for every request,
// and the function that cancels that context (called by Close). The loggers argument is
// copied, so the prefix set here does not affect the caller's instance.
func newDynamoDBBigSegmentStoreImpl(
	builder builderOptions,
	loggers ldlog.Loggers,
) (*dynamoDBBigSegmentStoreImpl, error) {
	if builder.table == "" {
		return nil, errors.New("table name is required")
	}

	// The local is named ctx rather than context so that it does not shadow the
	// imported context package.
	client, ctx, cancelContext, err := makeClientAndContext(builder)
	if err != nil {
		return nil, err
	}

	store := &dynamoDBBigSegmentStoreImpl{
		client:        client,
		context:       ctx,
		cancelContext: cancelContext,
		table:         builder.table,
		prefix:        builder.prefix,
		loggers:       loggers, // copied by value so we can modify it
	}
	// The prefix previously read "DynamoDBBigSegmentStoreStore:" (duplicated "Store").
	store.loggers.SetPrefix("DynamoDBBigSegmentStore:")
	store.loggers.Infof(`Using DynamoDB table %s`, store.table)
	return store, nil
}
// GetMetadata reads the Big Segment metadata item with a consistent read and reports the
// time the store was last synchronized.
//
// The item is looked up using the (optionally prefixed) metadata namespace as both its
// partition key and its sort key. A zero-valued BigSegmentStoreMetadata with a nil error
// is returned when the item does not exist or when its synchronization time decodes to
// zero. An error is returned only when the DynamoDB request itself fails.
func (store *dynamoDBBigSegmentStoreImpl) GetMetadata() (subsystems.BigSegmentStoreMetadata, error) {
	var noMetadata subsystems.BigSegmentStoreMetadata

	metadataNamespace := prefixedNamespace(store.prefix, bigSegmentsMetadataKey)
	request := &dynamodb.GetItemInput{
		TableName:      aws.String(store.table),
		ConsistentRead: aws.Bool(true),
		Key: map[string]types.AttributeValue{
			tablePartitionKey: attrValueOfString(metadataNamespace),
			tableSortKey:      attrValueOfString(metadataNamespace),
		},
	}

	response, err := store.client.GetItem(store.context, request)
	if err != nil {
		return noMetadata, err // COVERAGE: can't cause this in unit tests
	}

	item := response.Item
	if len(item) == 0 {
		// this is just a "not found" result, not a database error
		return noMetadata, nil
	}

	syncTimeMillis := attrValueToUint64(item[bigSegmentsSyncTimeAttr])
	if syncTimeMillis == 0 {
		return noMetadata, nil // COVERAGE: can't cause this in unit tests
	}

	return subsystems.BigSegmentStoreMetadata{
		LastUpToDate: ldtime.UnixMillisecondTime(syncTimeMillis),
	}, nil
}
// GetMembership reads the Big Segment membership item for the given context hash with a
// consistent read.
//
// The item is looked up under the (optionally prefixed) user data namespace as partition
// key, with contextHashKey as sort key. When no item exists, a membership built from nil
// included and excluded lists is returned. Otherwise the membership is built from the
// item's "included" and "excluded" string-set attributes. An error is returned only when
// the DynamoDB request itself fails.
func (store *dynamoDBBigSegmentStoreImpl) GetMembership(
	contextHashKey string,
) (subsystems.BigSegmentMembership, error) {
	userDataNamespace := prefixedNamespace(store.prefix, bigSegmentsUserDataKey)
	request := &dynamodb.GetItemInput{
		TableName:      aws.String(store.table),
		ConsistentRead: aws.Bool(true),
		Key: map[string]types.AttributeValue{
			tablePartitionKey: attrValueOfString(userDataNamespace),
			tableSortKey:      attrValueOfString(contextHashKey),
		},
	}

	response, err := store.client.GetItem(store.context, request)
	if err != nil {
		return nil, err // COVERAGE: can't cause this in unit tests
	}

	item := response.Item
	if len(item) == 0 {
		return ldstoreimpl.NewBigSegmentMembershipFromSegmentRefs(nil, nil), nil
	}

	return ldstoreimpl.NewBigSegmentMembershipFromSegmentRefs(
		getStringListFromSet(item[bigSegmentsIncludedAttr]),
		getStringListFromSet(item[bigSegmentsExcludedAttr]),
	), nil
}
// getStringListFromSet returns the strings held by a DynamoDB string-set attribute value.
// Any other kind of attribute value, including a nil interface value, yields nil.
func getStringListFromSet(value types.AttributeValue) []string {
	switch set := value.(type) {
	case *types.AttributeValueMemberSS:
		return set.Value
	default:
		return nil
	}
}
// Close cancels the store's context, which stops any pending operations. It always
// returns nil.
func (store *dynamoDBBigSegmentStoreImpl) Close() error {
	cancel := store.cancelContext
	cancel() // stops any pending operations
	return nil
}
// prefixedNamespace qualifies baseNamespace with prefix. With a non-empty prefix the
// result is "prefix:baseNamespace"; with an empty prefix baseNamespace is returned
// unchanged.
func prefixedNamespace(prefix, baseNamespace string) string {
	if prefix != "" {
		return prefix + ":" + baseNamespace // COVERAGE: currently the standard test suite doesn't specify a prefix
	}
	return baseNamespace
}