Skip to content

Commit 321ce2b

Browse files
committed
Implemented OffloadingPatch consumer-side
1 parent 8a4e9cd commit 321ce2b

File tree

13 files changed

+184
-22
lines changed

13 files changed

+184
-22
lines changed

apis/virtualkubelet/v1alpha1/virtualnode_types.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ type OffloadingPatch struct {
3535
// Tolerations contains the tolerations to target the remote cluster.
3636
Tolerations []corev1.Toleration `json:"tolerations,omitempty"`
3737
// Affinity contains the affinity and anti-affinity rules to target the remote cluster.
38-
Affinities *Affinity `json:"affinities,omitempty"`
38+
Affinity *Affinity `json:"affinity,omitempty"`
3939
}
4040

4141
// DeploymentTemplate contains the deployment template of the virtual node.

apis/virtualkubelet/v1alpha1/zz_generated.deepcopy.go

+2-2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/virtual-kubelet/root/root.go

+32
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,25 @@ package root
1818
import (
1919
"context"
2020
"fmt"
21+
"os"
2122

2223
"github.com/pkg/errors"
2324
"github.com/spf13/cobra"
2425
"github.com/virtual-kubelet/virtual-kubelet/node"
2526
corev1 "k8s.io/api/core/v1"
2627
k8serrors "k8s.io/apimachinery/pkg/api/errors"
2728
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29+
"k8s.io/apimachinery/pkg/runtime"
2830
"k8s.io/client-go/discovery"
2931
"k8s.io/client-go/kubernetes"
32+
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
3033
corev1clients "k8s.io/client-go/kubernetes/typed/core/v1"
3134
"k8s.io/client-go/rest"
3235
"k8s.io/client-go/tools/record"
3336
"k8s.io/klog/v2"
37+
"sigs.k8s.io/controller-runtime/pkg/client"
3438

39+
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
3540
"github.com/liqotech/liqo/pkg/consts"
3641
identitymanager "github.com/liqotech/liqo/pkg/identityManager"
3742
tenantnamespace "github.com/liqotech/liqo/pkg/tenantNamespace"
@@ -44,6 +49,15 @@ import (
4449
"github.com/liqotech/liqo/pkg/virtualKubelet/reflection/generic"
4550
)
4651

52+
var (
53+
scheme = runtime.NewScheme()
54+
)
55+
56+
func init() {
57+
_ = clientgoscheme.AddToScheme(scheme)
58+
_ = virtualkubeletv1alpha1.AddToScheme(scheme)
59+
}
60+
4761
const defaultVersion = "v1.25.0" // This should follow the version of k8s.io/kubernetes we are importing
4862

4963
// NewCommand creates a new top-level command.
@@ -102,6 +116,22 @@ func runRootCommand(ctx context.Context, c *Opts) error {
102116

103117
restcfg.SetRateLimiter(remoteConfig)
104118

119+
cl, err := client.New(localConfig, client.Options{
120+
Scheme: scheme,
121+
})
122+
if err != nil {
123+
return err
124+
}
125+
126+
// Get virtual node
127+
vnName := os.Getenv("VIRTUALNODE_NAME")
128+
ns := os.Getenv("POD_NAMESPACE")
129+
var vn virtualkubeletv1alpha1.VirtualNode
130+
if err := cl.Get(ctx, client.ObjectKey{Name: vnName, Namespace: ns}, &vn); err != nil {
131+
klog.Errorf("Unable to get virtual node: %v", err)
132+
return err
133+
}
134+
105135
// Initialize the pod provider
106136
podcfg := podprovider.InitConfig{
107137
LocalConfig: localConfig,
@@ -134,6 +164,8 @@ func runRootCommand(ctx context.Context, c *Opts) error {
134164

135165
LabelsNotReflected: c.LabelsNotReflected.StringList,
136166
AnnotationsNotReflected: c.AnnotationsNotReflected.StringList,
167+
168+
OffloadingPatch: vn.Spec.OffloadingPatch,
137169
}
138170

139171
eb := record.NewBroadcaster()

deployments/liqo/crds/virtualkubelet.liqo.io_virtualnodes.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ spec:
156156
description: OffloadingPatch contains the information to target a
157157
groups of node on the remote cluster.
158158
properties:
159-
affinities:
159+
affinity:
160160
description: Affinity contains the affinity and anti-affinity
161161
rules to target the remote cluster.
162162
properties:

pkg/liqo-controller-manager/virtualnode-controller/virtualkubelet.go

+47-4
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,16 @@ package virtualnodectrl
1616

1717
import (
1818
"context"
19+
"crypto/sha256"
20+
"encoding/hex"
21+
"encoding/json"
1922
"fmt"
2023

2124
appsv1 "k8s.io/api/apps/v1"
2225
corev1 "k8s.io/api/core/v1"
2326
rbacv1 "k8s.io/api/rbac/v1"
2427
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/apimachinery/pkg/labels"
2529
"k8s.io/klog/v2"
2630
k8strings "k8s.io/utils/strings"
2731
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -33,6 +37,8 @@ import (
3337
vkutils "github.com/liqotech/liqo/pkg/vkMachinery/utils"
3438
)
3539

40+
const offloadingPatchHashAnnotation = "liqo.io/offloading-patch-hash"
41+
3642
// createVirtualKubeletDeployment creates the VirtualKubelet Deployment.
3743
func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence(
3844
ctx context.Context, virtualNode *virtualkubeletv1alpha1.VirtualNode) (err error) {
@@ -50,6 +56,7 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence(
5056
err = fmt.Errorf("error updating virtual node status: %w", interr)
5157
}
5258
}()
59+
5360
ForgeCondition(virtualNode,
5461
VnConditionMap{
5562
virtualkubeletv1alpha1.VirtualKubeletConditionType: VnCondition{
@@ -86,11 +93,28 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence(
8693
remoteClusterIdentity.ClusterName, vkClusterRoleBinding.Name, op)
8794

8895
// forge the virtual Kubelet Deployment
89-
vkDeployment := &appsv1.Deployment{}
90-
vkDeployment.ObjectMeta = *virtualNode.Spec.Template.ObjectMeta.DeepCopy()
96+
vkDeployment := appsv1.Deployment{
97+
ObjectMeta: metav1.ObjectMeta{
98+
Name: virtualNode.Spec.Template.GetName(),
99+
Namespace: virtualNode.Spec.Template.GetNamespace(),
100+
},
101+
}
102+
op, err = controllerutil.CreateOrUpdate(ctx, r.Client, &vkDeployment, func() error {
103+
vkDeployment.Annotations = labels.Merge(vkDeployment.Annotations, virtualNode.Spec.Template.ObjectMeta.GetAnnotations())
104+
vkDeployment.Labels = labels.Merge(vkDeployment.Labels, virtualNode.Spec.Template.ObjectMeta.GetLabels())
91105

92-
op, err = controllerutil.CreateOrUpdate(ctx, r.Client, vkDeployment, func() error {
93106
vkDeployment.Spec = *virtualNode.Spec.Template.Spec.DeepCopy()
107+
108+
// Add the hash of the offloading patch as annotation
109+
opHash, err := offloadingPatchHash(virtualNode.Spec.OffloadingPatch)
110+
if err != nil {
111+
return err
112+
}
113+
if vkDeployment.Spec.Template.Annotations == nil {
114+
vkDeployment.Spec.Template.Annotations = make(map[string]string)
115+
}
116+
vkDeployment.Spec.Template.Annotations[offloadingPatchHashAnnotation] = opHash
117+
94118
return nil
95119
})
96120
if err != nil {
@@ -110,7 +134,9 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentPresence(
110134
VnConditionMap{
111135
virtualkubeletv1alpha1.VirtualKubeletConditionType: VnCondition{
112136
Status: virtualkubeletv1alpha1.RunningConditionStatusType,
113-
}})
137+
},
138+
})
139+
114140
if *virtualNode.Spec.CreateNode {
115141
ForgeCondition(virtualNode,
116142
VnConditionMap{
@@ -160,3 +186,20 @@ func (r *VirtualNodeReconciler) ensureVirtualKubeletDeploymentAbsence(
160186

161187
return nil
162188
}
189+
190+
func offloadingPatchHash(offloadingPatch *virtualkubeletv1alpha1.OffloadingPatch) (string, error) {
191+
if offloadingPatch == nil {
192+
return "", nil
193+
}
194+
195+
opString, err := json.Marshal(offloadingPatch)
196+
if err != nil {
197+
klog.Error(err)
198+
return "", err
199+
}
200+
201+
opHash := sha256.Sum256(opString)
202+
opHashHex := hex.EncodeToString(opHash[:])
203+
204+
return opHashHex, nil
205+
}

pkg/utils/pod/pod.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import (
1919

2020
corev1 "k8s.io/api/core/v1"
2121
"k8s.io/apimachinery/pkg/api/resource"
22-
"k8s.io/utils/pointer"
22+
"k8s.io/utils/ptr"
2323
)
2424

2525
// IsPodReady returns true if a pod is ready; false otherwise. It also returns a reason (as provided by Kubernetes).
@@ -60,7 +60,7 @@ func IsPodSpecEqual(previous, updated *corev1.PodSpec) bool {
6060
// * spec.tolerations (only new entries can be added)
6161
return AreContainersEqual(previous.Containers, updated.Containers) &&
6262
AreContainersEqual(previous.InitContainers, updated.InitContainers) &&
63-
pointer.Int64Equal(previous.ActiveDeadlineSeconds, updated.ActiveDeadlineSeconds) &&
63+
ptr.Equal(previous.ActiveDeadlineSeconds, updated.ActiveDeadlineSeconds) &&
6464
len(previous.Tolerations) == len(updated.Tolerations)
6565
}
6666

pkg/virtualKubelet/forge/forge.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2222

2323
discoveryv1alpha1 "github.com/liqotech/liqo/apis/discovery/v1alpha1"
24+
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
2425
)
2526

2627
// ReflectionFieldManager -> The name associated with the fields modified by virtual kubelet reflection.
@@ -72,12 +73,14 @@ func ApplyOptions() metav1.ApplyOptions {
7273
type ForgingOpts struct {
7374
LabelsNotReflected []string
7475
AnnotationsNotReflected []string
76+
OffloadingPatch *virtualkubeletv1alpha1.OffloadingPatch
7577
}
7678

7779
// NewForgingOpts returns a new ForgingOpts instance.
78-
func NewForgingOpts(labelsNotReflected, annotationsNotReflected []string) ForgingOpts {
80+
func NewForgingOpts(labelsNotReflected, annotationsNotReflected []string, offloadingPatch *virtualkubeletv1alpha1.OffloadingPatch) ForgingOpts {
7981
return ForgingOpts{
8082
LabelsNotReflected: labelsNotReflected,
8183
AnnotationsNotReflected: annotationsNotReflected,
84+
OffloadingPatch: offloadingPatch,
8285
}
8386
}

pkg/virtualKubelet/forge/pods.go

+61-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import (
2626
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2727
corev1apply "k8s.io/client-go/applyconfigurations/core/v1"
2828
metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1"
29-
"k8s.io/utils/pointer"
29+
"k8s.io/utils/ptr"
3030

3131
vkv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
3232
liqoconst "github.com/liqotech/liqo/pkg/consts"
@@ -225,7 +225,7 @@ func RemotePodSpec(creation bool, local, remote *corev1.PodSpec, mutators ...Rem
225225

226226
// The information about the service account name is not reflected, since the volume is already
227227
// present, and the remote creation would fail as the corresponding service account is not present.
228-
remote.AutomountServiceAccountToken = pointer.Bool(false)
228+
remote.AutomountServiceAccountToken = ptr.To(false)
229229

230230
// This fields are currently forced to false, to prevent invasive settings on the remote cluster (which might not work).
231231
remote.HostIPC = false
@@ -283,7 +283,7 @@ func ServiceAccountMutator(apiServerSupport APIServerSupportType, localAnnotatio
283283
}
284284

285285
remote.ServiceAccountName = remoteServiceAccountName
286-
remote.AutomountServiceAccountToken = pointer.Bool(true)
286+
remote.AutomountServiceAccountToken = ptr.To(true)
287287
default:
288288
// Remove the service account name.
289289
remote.ServiceAccountName = ""
@@ -353,6 +353,64 @@ func AntiAffinityHardMutator(labels map[string]string) RemotePodSpecMutator {
353353
}
354354
}
355355

356+
// NodeSelectorMutator is a mutator which implements the support to propagate a given node selector constraint.
357+
func NodeSelectorMutator(nodeSelector map[string]string) RemotePodSpecMutator {
358+
return func(remote *corev1.PodSpec) {
359+
if remote.NodeSelector == nil {
360+
remote.NodeSelector = map[string]string{}
361+
}
362+
363+
for k, v := range nodeSelector {
364+
remote.NodeSelector[k] = v
365+
}
366+
}
367+
}
368+
369+
// TolerationsMutator is a mutator which implements the support to propagate tolerations.
370+
func TolerationsMutator(tolerations []corev1.Toleration) RemotePodSpecMutator {
371+
return func(remote *corev1.PodSpec) {
372+
remote.Tolerations = append(remote.Tolerations, tolerations...)
373+
}
374+
}
375+
376+
// AffinityMutator is a mutator which implements the support to propagate affinity constraints.
377+
func AffinityMutator(affinity *vkv1alpha1.Affinity) RemotePodSpecMutator {
378+
return func(remote *corev1.PodSpec) {
379+
if affinity == nil || affinity.NodeAffinity == nil {
380+
return
381+
}
382+
383+
nodeAffinity := affinity.NodeAffinity.DeepCopy()
384+
385+
if remote.Affinity == nil {
386+
remote.Affinity = &corev1.Affinity{
387+
NodeAffinity: nodeAffinity,
388+
}
389+
return
390+
}
391+
392+
if remote.Affinity.NodeAffinity == nil {
393+
remote.Affinity.NodeAffinity = nodeAffinity
394+
return
395+
}
396+
397+
if nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
398+
if remote.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution == nil {
399+
remote.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution =
400+
nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution
401+
} else {
402+
remote.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms = append(
403+
remote.Affinity.NodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms,
404+
nodeAffinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms...)
405+
}
406+
}
407+
408+
remote.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution =
409+
append(remote.Affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution,
410+
nodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution...)
411+
}
412+
}
413+
356414
// FilterAntiAffinityLabels filters the label keys which are used to implement the anti-affinity constraints, based on the specified whitelist.
357415
func FilterAntiAffinityLabels(labels map[string]string, whitelist string) map[string]string {
358416
if whitelist != "" {

pkg/virtualKubelet/provider/provider.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ type InitConfig struct {
8181

8282
LabelsNotReflected []string
8383
AnnotationsNotReflected []string
84+
85+
OffloadingPatch *vkalpha1.OffloadingPatch
8486
}
8587

8688
// LiqoProvider implements the virtual-kubelet provider interface and stores pods in memory.
@@ -134,7 +136,7 @@ func NewLiqoProvider(ctx context.Context, cfg *InitConfig, eb record.EventBroadc
134136

135137
podreflector := workload.NewPodReflector(cfg.RemoteConfig, remoteMetricsClient, ipamClient, &podReflectorConfig, cfg.ReflectorsConfigs[generic.Pod])
136138
reflectionManager := manager.New(localClient, remoteClient, localLiqoClient, remoteLiqoClient,
137-
cfg.InformerResyncPeriod, eb, cfg.LabelsNotReflected, cfg.AnnotationsNotReflected).
139+
cfg.InformerResyncPeriod, eb, cfg.LabelsNotReflected, cfg.AnnotationsNotReflected, cfg.OffloadingPatch).
138140
With(podreflector).
139141
With(exposition.NewServiceReflector(cfg.ReflectorsConfigs[generic.Service], cfg.EnableLoadBalancer, cfg.RemoteRealLoadBalancerClassName)).
140142
With(exposition.NewIngressReflector(cfg.ReflectorsConfigs[generic.Ingress], cfg.EnableIngress, cfg.RemoteRealIngressClassName)).

pkg/virtualKubelet/reflection/manager/manager.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"k8s.io/klog/v2"
2929
"k8s.io/utils/trace"
3030

31+
virtualkubeletv1alpha1 "github.com/liqotech/liqo/apis/virtualkubelet/v1alpha1"
3132
liqoclient "github.com/liqotech/liqo/pkg/client/clientset/versioned"
3233
liqoinformers "github.com/liqotech/liqo/pkg/client/informers/externalversions"
3334
traceutils "github.com/liqotech/liqo/pkg/utils/trace"
@@ -61,7 +62,8 @@ type manager struct {
6162

6263
// New returns a new manager to start the reflection towards a remote cluster.
6364
func New(local, remote kubernetes.Interface, localLiqo, remoteLiqo liqoclient.Interface, resync time.Duration,
64-
eb record.EventBroadcaster, labelsNotReflected, annotationsNotReflected []string) Manager {
65+
eb record.EventBroadcaster, labelsNotReflected, annotationsNotReflected []string,
66+
offloadingPatch *virtualkubeletv1alpha1.OffloadingPatch) Manager {
6567
// Configure the field selector to retrieve only the pods scheduled on the current virtual node.
6668
localPodTweakListOptions := func(opts *metav1.ListOptions) {
6769
opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", forge.LiqoNodeName).String()
@@ -82,7 +84,7 @@ func New(local, remote kubernetes.Interface, localLiqo, remoteLiqo liqoclient.In
8284
started: false,
8385
stop: make(map[string]context.CancelFunc),
8486

85-
forgingOpts: forge.NewForgingOpts(labelsNotReflected, annotationsNotReflected),
87+
forgingOpts: forge.NewForgingOpts(labelsNotReflected, annotationsNotReflected, offloadingPatch),
8688
}
8789
}
8890

0 commit comments

Comments
 (0)