forked from liqotech/liqo
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwait.go
228 lines (205 loc) · 10.4 KB
/
wait.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
// Copyright 2019-2024 The Liqo Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wait
import (
"context"
"fmt"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/client"
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
offloadingv1alpha1 "github.com/liqotech/liqo/apis/offloading/v1alpha1"
"github.com/liqotech/liqo/pkg/liqoctl/factory"
"github.com/liqotech/liqo/pkg/liqoctl/output"
"github.com/liqotech/liqo/pkg/utils"
fcutils "github.com/liqotech/liqo/pkg/utils/foreignCluster"
getters "github.com/liqotech/liqo/pkg/utils/getters"
)
// Waiter is a struct that contains the necessary information to wait for resource events.
type Waiter struct {
// Printer is the object used to output messages in the appropriate format.
Printer *output.Printer
// crClient is the controller runtime client.
CRClient client.Client
}
// NewWaiterFromFactory creates a new Waiter object from the given factory.
func NewWaiterFromFactory(f *factory.Factory) *Waiter {
return &Waiter{
Printer: f.Printer,
CRClient: f.CRClient,
}
}
// ForUnpeering waits until the status on the foreiglcusters resource states that the in/outgoing peering has been successfully
// set to None or the timeout expires.
func (w *Waiter) ForUnpeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Unpeering from the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsUnpeered, 1*time.Second)
if client.IgnoreNotFound(err) != nil {
s.Fail(fmt.Sprintf("Failed unpeering from remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Successfully unpeered from remote cluster %q", remName))
return nil
}
// ForOutgoingUnpeering waits until the status on the foreiglcusters resource states that the outgoing peering has been successfully
// set to None or the timeout expires.
func (w *Waiter) ForOutgoingUnpeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Disabling outgoing peering to the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsOutgoingPeeringNone, 1*time.Second)
if client.IgnoreNotFound(err) != nil {
s.Fail(fmt.Sprintf("Failed disabling outgoing peering to the remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Successfully disabled outgoing peering to the remote cluster %q", remName))
return nil
}
// ForIncomingUnpeering waits until the status on the foreiglcusters resource states that the incoming peering has been successfully
// set to None or the timeout expires.
func (w *Waiter) ForIncomingUnpeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Disabling incoming peering to the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsIncomingPeeringNone, 1*time.Second)
if client.IgnoreNotFound(err) != nil {
s.Fail(fmt.Sprintf("Failed disabling incoming peering to the remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Successfully disabled incoming peering to the remote cluster %q", remName))
return nil
}
// ForAuth waits until the authentication has been established with the remote cluster or the timeout expires.
func (w *Waiter) ForAuth(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Waiting for authentication to the cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsAuthenticated, 1*time.Second)
if err != nil {
s.Fail(fmt.Sprintf("Authentication to the remote cluster %q failed: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Authenticated to cluster %q", remName))
return nil
}
// ForNetwork waits until the networking has been established with the remote cluster or the timeout expires.
func (w *Waiter) ForNetwork(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Waiting for network to the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsNetworkingEstablishedOrExternal, 1*time.Second)
if err != nil {
s.Fail(fmt.Sprintf("Failed establishing networking to the remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Network established to the remote cluster %q", remName))
return nil
}
// ForOutgoingPeering waits until the status on the foreiglcusters resource states that the outgoing peering has been successfully
// established or the timeout expires.
func (w *Waiter) ForOutgoingPeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Activating outgoing peering to the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsOutgoingJoined, 1*time.Second)
if err != nil {
s.Fail(fmt.Sprintf("Failed activating outgoing peering to the remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Outgoing peering activated to the remote cluster %q", remName))
return nil
}
// ForIncomingPeering waits until the status on the foreiglcusters resource states that the incoming peering has been successfully
// set to Yes or the timeout expires.
func (w *Waiter) ForIncomingPeering(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Activating incoming peering to the remote cluster %q", remName))
err := fcutils.PollForEvent(ctx, w.CRClient, remoteClusterID, fcutils.IsIncomingPeeringYes, 1*time.Second)
if err != nil {
s.Fail(fmt.Sprintf("Failed activating outgoing peering to the remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Incoming peering activated to the remote cluster %q", remName))
return nil
}
// ForNode waits until the node has been added to the cluster or the timeout expires.
func (w *Waiter) ForNode(ctx context.Context, remoteClusterID *discoveryv1alpha1.ClusterIdentity) error {
remName := remoteClusterID.ClusterName
s := w.Printer.StartSpinner(fmt.Sprintf("Waiting for node to be created for the remote cluster %q", remName))
err := wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
nodes, err := getters.ListNodesByClusterID(ctx, w.CRClient, remoteClusterID)
if err != nil {
return false, client.IgnoreNotFound(err)
}
for i := range nodes.Items {
if !utils.IsNodeReady(&nodes.Items[i]) {
return false, nil
}
}
return true, nil
})
if err != nil {
s.Fail(fmt.Sprintf("Failed waiting for node to be created for remote cluster %q: %s", remName, output.PrettyErr(err)))
return err
}
s.Success(fmt.Sprintf("Node created for remote cluster %q", remName))
return nil
}
// ForOffloading waits until the status on the NamespaceOffloading resource states that the offloading has been successfully
// established or the timeout expires.
func (w *Waiter) ForOffloading(ctx context.Context, namespace string) error {
s := w.Printer.StartSpinner(fmt.Sprintf("Waiting for offloading of namespace %q to complete", namespace))
noClusterSelected := false
var offload *offloadingv1alpha1.NamespaceOffloading
err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
offload, err = getters.GetOffloadingByNamespace(ctx, w.CRClient, namespace)
if err != nil {
return false, client.IgnoreNotFound(err)
}
// Retry in case the observed generation does not match, as the status still needs to be updated.
if offload.Status.ObservedGeneration != offload.GetGeneration() {
return false, nil
}
someFailed := offload.Status.OffloadingPhase == offloadingv1alpha1.SomeFailedOffloadingPhaseType
allFailed := offload.Status.OffloadingPhase == offloadingv1alpha1.AllFailedOffloadingPhaseType
if someFailed || allFailed {
return true, fmt.Errorf("the offloading is in %q state", offload.Status.OffloadingPhase)
}
ready := offload.Status.OffloadingPhase == offloadingv1alpha1.ReadyOffloadingPhaseType
noClusterSelected = offload.Status.OffloadingPhase == offloadingv1alpha1.NoClusterSelectedOffloadingPhaseType
return ready || noClusterSelected, nil
})
if err != nil {
s.Fail(fmt.Sprintf("Failed waiting for offloading to complete: %s", output.PrettyErr(err)))
return err
}
if noClusterSelected {
s.Warning("Offloading completed, but no cluster was selected")
return nil
}
s.Success("Offloading completed successfully")
return nil
}
// ForUnoffloading waits until the status on the NamespaceOffloading resource states that the offloading has been
// successfully removed or the timeout expires.
func (w *Waiter) ForUnoffloading(ctx context.Context, namespace string) error {
s := w.Printer.StartSpinner(fmt.Sprintf("Waiting for unoffloading of namespace %q to complete", namespace))
err := wait.PollUntilContextCancel(ctx, 1*time.Second, true, func(ctx context.Context) (done bool, err error) {
_, err = getters.GetOffloadingByNamespace(ctx, w.CRClient, namespace)
return apierrors.IsNotFound(err), client.IgnoreNotFound(err)
})
if err != nil {
s.Fail(fmt.Sprintf("Failed waiting for unoffloading to complete: %s", output.PrettyErr(err)))
return err
}
s.Success("Unoffloading completed successfully")
return nil
}