forked from elastic/beats
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaction_store.go
189 lines (157 loc) · 5.19 KB
/
action_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package application
import (
"context"
"fmt"
"io"
yaml "gopkg.in/yaml.v2"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
)
// actionStore receives multiple actions to persist to disk. The store only takes care of
// config change and unenroll actions; every other action is discarded. Only the last accepted
// action is kept, and we assume that the action is added to the store after it was ACKed with
// Fleet. The store is not thread-safe.
type actionStore struct {
// log receives a debug message after each successful save.
log *logger.Logger
// store is the backend the action is loaded from and saved to.
store storeLoad
// dirty is set by Add when the action is replaced; Save is a no-op while it is false.
dirty bool
// action is the last accepted action, or nil when nothing has been stored.
action action
}
// newActionStore builds an actionStore backed by store and restores the previously persisted
// action, if there is one.
//
// A failure to load from store is treated as "nothing persisted" and yields an empty store with
// a nil error, as does an empty (EOF) document. Any other YAML decoding error is returned with
// a nil store. The persisted document is always decoded as a config change action.
func newActionStore(log *logger.Logger, store storeLoad) (*actionStore, error) {
	restored := &actionStore{log: log, store: store}

	reader, err := store.Load()
	if err != nil {
		// Nothing readable on disk: start from an empty store.
		return restored, nil
	}
	defer reader.Close()

	var persisted actionConfigChangeSerializer
	switch decodeErr := yaml.NewDecoder(reader).Decode(&persisted); decodeErr {
	case nil:
		// A document was decoded, fall through to restore it.
	case io.EOF:
		// The backing storage exists but holds no document.
		return restored, nil
	default:
		return nil, decodeErr
	}

	configChange := fleetapi.ActionConfigChange(persisted)
	restored.action = &configChange
	return restored, nil
}
// Add records a as the action to persist. Only config change and unenroll actions are accepted;
// any other type of action is silently ignored. The store keeps a single action: the last one
// it received. Adding an action whose ID matches the one already held does nothing, so the
// store is only marked dirty when the action really changes.
func (s *actionStore) Add(a action) {
	switch a.(type) {
	case *fleetapi.ActionConfigChange, *fleetapi.ActionUnenroll:
		// Supported type, handled below.
	default:
		return
	}

	// Only persist the action if it differs from the one we already hold.
	if s.action != nil && s.action.ID() == a.ID() {
		return
	}

	s.action = a
	s.dirty = true
}
// Save writes the current action to the underlying store when it changed since the last
// successful save. It returns nil without touching the store when nothing is pending.
//
// An error is returned when the action cannot be serialized to YAML, when the held action is
// not one of the persistable types, or when the underlying store fails to write. On error the
// store stays dirty, so a later call to Save retries the write instead of reporting success
// without having persisted anything.
func (s *actionStore) Save() error {
	if !s.dirty {
		return nil
	}

	var reader io.Reader
	switch v := s.action.(type) {
	case *fleetapi.ActionConfigChange:
		serialize := actionConfigChangeSerializer(*v)
		r, err := yamlToReader(&serialize)
		if err != nil {
			return err
		}
		reader = r
	case *fleetapi.ActionUnenroll:
		serialize := actionUnenrollSerializer(*v)
		r, err := yamlToReader(&serialize)
		if err != nil {
			return err
		}
		reader = r
	}

	if reader == nil {
		return fmt.Errorf("incompatible type, expected ActionPolicyChange and received %T", s.action)
	}

	if err := s.store.Save(reader); err != nil {
		return err
	}

	// Clear the flag only once the action is on disk. Add ignores an action with the same ID,
	// so clearing it on a failed write would make every retry a silent no-op.
	s.dirty = false

	s.log.Debugf("save on disk action policy change: %+v", s.action)
	return nil
}
// Actions returns the actions to execute, in order. The store holds at most one action, so the
// result is either a one-element slice or an empty, non-nil slice.
func (s *actionStore) Actions() []action {
	if s.action != nil {
		return []action{s.action}
	}
	return []action{}
}
// actionConfigChangeSerializer mirrors fleetapi.ActionConfigChange and adds YAML serialization
// to it. I don't think serialization is a concern of the fleetapi package, and going this route
// avoids a larger refactoring.
//
// There are four ways to achieve the same result:
// 1. We create a second struct that maps the existing fields.
// 2. We add the serialization in the fleetapi package.
// 3. We move the actual action type outside of the fleetapi package.
// 4. We have two sets of types.
//
// This could be done in a refactoring.
type actionConfigChangeSerializer struct {
// ActionID is stored under the "action_id" key.
ActionID string `yaml:"action_id"`
// ActionType is stored under the "action_type" key.
ActionType string `yaml:"action_type"`
// Config is stored under the "config" key.
Config map[string]interface{} `yaml:"config"`
}
// Compile-time guard between the serializer struct and the original struct: the conversion
// below only compiles while both structs declare identical fields (struct tags aside).
var _ actionConfigChangeSerializer = actionConfigChangeSerializer(fleetapi.ActionConfigChange{})
// actionUnenrollSerializer mirrors fleetapi.ActionUnenroll and adds YAML serialization to it.
type actionUnenrollSerializer struct {
// ActionID is stored under the "action_id" key.
ActionID string `yaml:"action_id"`
// ActionType is stored under the "action_type" key.
ActionType string `yaml:"action_type"`
// IsDetected is stored under the "is_detected" key.
IsDetected bool `yaml:"is_detected"`
}
// Compile-time guard between the serializer struct and the original struct: the conversion
// below only compiles while both structs declare identical fields (struct tags aside).
var _ actionUnenrollSerializer = actionUnenrollSerializer(fleetapi.ActionUnenroll{})
// actionStoreAcker wraps an existing acker and sends any acked action to the action store.
// It is up to the action store to decide whether the action needs to be persisted for future
// replay or can simply be discarded.
type actionStoreAcker struct {
// acker is the wrapped acker that Ack and Commit delegate to.
acker fleetAcker
// store receives every action that was acked successfully.
store *actionStore
}
// Ack acknowledges the action through the wrapped acker and, only when that succeeds, hands the
// action to the store and saves the store. An error from the wrapped acker is returned as-is
// and leaves the store untouched; otherwise the result of saving the store is returned.
func (a *actionStoreAcker) Ack(ctx context.Context, action fleetapi.Action) error {
	ackErr := a.acker.Ack(ctx, action)
	if ackErr != nil {
		return ackErr
	}

	store := a.store
	store.Add(action)
	return store.Save()
}
// Commit delegates to the wrapped acker and returns its result; the action store is not
// involved.
func (a *actionStoreAcker) Commit(ctx context.Context) error {
	wrapped := a.acker
	return wrapped.Commit(ctx)
}
// newActionStoreAcker returns an acker that forwards to acker and records acked actions in
// store.
func newActionStoreAcker(acker fleetAcker, store *actionStore) *actionStoreAcker {
	wrapper := new(actionStoreAcker)
	wrapper.acker = acker
	wrapper.store = store
	return wrapper
}
// replayActions logs that the policy is being restored and dispatches the given actions
// through dispatcher, using acker for acknowledgements. It returns the error reported by the
// dispatcher, or nil when dispatching succeeded.
func replayActions(
	log *logger.Logger,
	dispatcher dispatcher,
	acker fleetAcker,
	actions ...action,
) error {
	log.Info("restoring current policy from disk")
	return dispatcher.Dispatch(acker, actions...)
}