-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathetcd.go
278 lines (225 loc) · 7.72 KB
/
etcd.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
package store
import (
"context"
"fmt"
"os"
"strings"
"sync"
"time"
log "github.com/sirupsen/logrus"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)
const (
ETCD_TIMEOUT = 3 * time.Second
ETCD_SESSION_TTL = 3 // In seconds
ETCD_SCRIPT_KEY_PREFIX = "msgscript/scripts"
ETCD_LIBRARY_KEY_PREFIX = "msgscript/libs"
)
// EtcdScriptStore stores Lua scripts in etcd, supporting multiple scripts per subject
type EtcdScriptStore struct {
client *clientv3.Client
prefix string
mutexes sync.Map
}
func EtcdClient(endpoints string) (*clientv3.Client, error) {
if e := os.Getenv("ETCD_ENDPOINTS"); e != "" {
endpoints = e
}
// HACK: instead of using a global or carry over the variable everywhere,
// we set the environment variable if it's not defined
if os.Getenv("ETCD_ENDPOINTS") == "" {
err := os.Setenv("ETCD_ENDPOINTS", endpoints)
if err != nil {
return nil, fmt.Errorf("failed to set ETCD_ENDPOINTS environment variable")
}
}
return clientv3.New(clientv3.Config{
Endpoints: etcdEndpoints(endpoints),
DialTimeout: ETCD_TIMEOUT,
})
}
func etcdEndpoints(endpoints string) []string {
return strings.Split(endpoints, ",")
}
// NewEtcdScriptStore creates a new instance of EtcdScriptStore
func NewEtcdScriptStore(endpoints string) (*EtcdScriptStore, error) {
log.Debugf("Attempting to connect to etcd @ %s", endpoints)
client, err := EtcdClient(endpoints)
if err != nil {
return nil, fmt.Errorf("failed to connect to etcd: %v", err)
}
log.Debugf("Connected to etcd @ %s", strings.Join(client.Endpoints(), ","))
return &EtcdScriptStore{
client: client,
prefix: ETCD_SCRIPT_KEY_PREFIX,
mutexes: sync.Map{},
}, nil
}
func (e *EtcdScriptStore) getKey(subject, name string) string {
return strings.Join([]string{e.prefix, subject, name}, "/")
}
// AddScript adds a new Lua script under the given subject with a unique ID
func (e *EtcdScriptStore) AddScript(ctx context.Context, subject, name string, script []byte) error {
key := e.getKey(subject, name)
// Store script in etcd
_, err := e.client.Put(ctx, key, string(script))
if err != nil {
return fmt.Errorf("failed to add script for subject '%s': %v", subject, err)
}
log.Debugf("Script added for subject %s named %s", subject, name)
return nil
}
// GetScripts retrieves all scripts associated with a subject
func (e *EtcdScriptStore) GetScripts(ctx context.Context, subject string) (map[string][]byte, error) {
keyPrefix := strings.Join([]string{e.prefix, subject}, "/")
// Fetch all scripts under the subject's prefix
resp, err := e.client.Get(ctx, keyPrefix, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("failed to get scripts for subject '%s': %v", subject, err)
}
scripts := make(map[string][]byte)
for _, kv := range resp.Kvs {
scripts[string(kv.Key)] = kv.Value
}
log.Debugf("Retrieved %d scripts for subject %s", len(scripts), subject)
return scripts, nil
}
// DeleteScript deletes a specific Lua script for a subject by its name
func (e *EtcdScriptStore) DeleteScript(ctx context.Context, subject, name string) error {
key := fmt.Sprintf("%s/%s/%s", e.prefix, subject, name)
// Delete script from etcd
_, err := e.client.Delete(ctx, key)
if err != nil {
return fmt.Errorf("failed to delete script for subject '%s' with ID '%s': %v", subject, name, err)
}
log.Debugf("Deleted script for subject %s with ID %s", subject, name)
return nil
}
// WatchScripts watches for changes to scripts for a specific subject
func (e *EtcdScriptStore) WatchScripts(ctx context.Context, subject string, onChange func(subject, name string, script []byte, deleted bool)) {
keyPrefix := fmt.Sprintf("%s/%s/", e.prefix, subject)
watchChan := e.client.Watch(ctx, keyPrefix, clientv3.WithPrefix())
for watchResp := range watchChan {
for _, ev := range watchResp.Events {
name := string(ev.Kv.Key[len(keyPrefix):])
switch ev.Type {
case clientv3.EventTypePut:
log.Debugf("Script added/updated for subject: %s, ID: %s", subject, name)
onChange(subject, name, ev.Kv.Value, false)
case clientv3.EventTypeDelete:
log.Debugf("Script deleted for subject: %s, ID: %s", subject, name)
onChange(subject, name, nil, true)
}
}
}
}
func (e *EtcdScriptStore) acquireLock(ctx context.Context, lockKey string, ttl int) (*concurrency.Mutex, error) {
// Create a lease
sess, err := concurrency.NewSession(e.client, concurrency.WithTTL(ttl), concurrency.WithContext(ctx))
if err != nil {
return nil, fmt.Errorf("failed to create session: %v", err)
}
fields := log.Fields{
"lockKey": lockKey,
}
log.WithFields(fields).Debugf("etcdStore: Acquiring lock")
l := concurrency.NewMutex(sess, lockKey)
err = l.TryLock(ctx)
if err != nil {
if err == context.Canceled {
return nil, concurrency.ErrLocked
}
return nil, err
}
log.WithFields(fields).Debug("etcdStore: Acquired lock")
return l, nil
}
func (e *EtcdScriptStore) ReleaseLock(ctx context.Context, path string) error {
fields := log.Fields{
"path": path,
}
v, ok := e.mutexes.Load(path)
if !ok {
// We don't have a lock for that path
log.WithFields(fields).Debug("etcdStore: failed to find a locking mutex for timer")
return nil
}
l := v.(*lock)
err := l.Mutex.Unlock(ctx)
if err != nil {
return fmt.Errorf("etcdStore: failed to release lock: %v", err)
}
log.WithFields(fields).Debug("etcdStore: Released the lock")
// Stop the timer
l.Timer.Stop()
e.mutexes.Delete(path)
return nil
}
type lock struct {
Mutex *concurrency.Mutex
Timer *time.Timer
}
func (e *EtcdScriptStore) TakeLock(ctx context.Context, path string) (bool, error) {
lockKey := path + "_lock"
mu, err := e.acquireLock(ctx, lockKey, ETCD_SESSION_TTL)
if err != nil {
if err == concurrency.ErrLocked {
return false, fmt.Errorf("already locked")
}
return false, fmt.Errorf("failed to get lock on key %s: %v", lockKey, err)
}
// Remove the mutex from the map after 1 second more than the session's TTL in case it's never unlocked
timer := time.AfterFunc((ETCD_SESSION_TTL+1)*time.Second, func() {
log.WithField("path", path).Debug("Releasing lock on timeout")
e.ReleaseLock(context.Background(), lockKey)
})
e.mutexes.Store(path, &lock{
Mutex: mu,
Timer: timer,
})
return true, nil
}
func (e *EtcdScriptStore) ListSubjects(ctx context.Context) ([]string, error) {
resp, err := e.client.KV.Get(ctx, ETCD_SCRIPT_KEY_PREFIX, clientv3.WithPrefix())
if err != nil {
return nil, fmt.Errorf("failed to list keys: %v", err)
}
var subjects []string
for _, kv := range resp.Kvs {
ss := strings.Split(strings.Replace(string(kv.Key), ETCD_SCRIPT_KEY_PREFIX, "", 1), "/")
subjects = append(subjects, ss[1])
}
return subjects, nil
}
func (e *EtcdScriptStore) LoadLibrairies(ctx context.Context, libraryPaths []string) ([][]byte, error) {
var libraries [][]byte
for _, path := range libraryPaths {
key := strings.Join([]string{ETCD_LIBRARY_KEY_PREFIX, path}, "/")
resp, err := e.client.Get(ctx, key)
if err != nil {
return nil, fmt.Errorf("failed to read key %s: %v", key, err)
}
if len(resp.Kvs) != 1 {
return nil, fmt.Errorf("key %s doesn't exists", key)
}
libraries = append(libraries, resp.Kvs[0].Value)
}
return libraries, nil
}
func (e *EtcdScriptStore) AddLibrary(ctx context.Context, content []byte, path string) error {
key := strings.Join([]string{ETCD_LIBRARY_KEY_PREFIX, path}, "/")
_, err := e.client.Put(ctx, key, string(content))
if err != nil {
return fmt.Errorf("failed to store library key %s: %v", key, err)
}
return nil
}
func (e *EtcdScriptStore) RemoveLibrary(ctx context.Context, path string) error {
key := strings.Join([]string{ETCD_LIBRARY_KEY_PREFIX, path}, "/")
_, err := e.client.Delete(ctx, key)
if err != nil {
return fmt.Errorf("failed to delete library key %s: %v", key, err)
}
return nil
}