Alternate, Simplified Async Pull (#137)
* simplified RunCompletionChecker() loop exit
* updated logging
* updated chart to use new flag
* removed previous async files
* changed this test from async to sync, which may match the original intent
* added async puller startup log messages
* fixed omission of setting imageSvc
* updated ephemeral volume yaml to always pull
* added final steps to fully flush the ephemeral volume and verify that the async pull logic functions properly via a fully integrated kind test
* resolved session state bug, reintroduced metrics, updated metrics test
* fixed elapsed calc
* added removal of pull time info after 1 minute expiration
* extracted changes for dev experience PR for issue #143
* operation error metric cleanup
* introduced image size metric in similar pattern to image pull time
* added pull time and size logs; added error handling to the image size check (after a panic seen locally in kind); added size metrics and a size error count metric
* added integration test for async pull feature
* replaced completionChan and loop with completionFunc
* removed isComplete
* updated timeout to be more verbose
* fixed bug where one call to StartAsyncPuller() in node_server.go was not updated to remove the completionChan depth argument, yet the build targets still succeeded
* captured comment updates
* implemented change for exposing Image() on PullSession
* added configurability of asyncPullTimeout to helm deployment yaml
* fixed bug resulting from uniform extraction of the image string from the docker.Named struct
* addressed two comments, replaced use of docker.Named.String() with Puller getter method Puller.ImageWithTag()
imuni4fun authored Mar 4, 2024
1 parent 3d36010 commit ade923f
Showing 23 changed files with 746 additions and 572 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/containerd-async.yaml
@@ -0,0 +1,39 @@
+name: containerd-async-11mins
+on:
+  push:
+    branches: [main]
+  pull_request:
+    branches: [main]
+  workflow_dispatch:
+jobs:
+  integration:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - name: Start a kind cluster with containerd
+        uses: helm/[email protected]
+        with:
+          cluster_name: kind-${{ github.run_id }}
+          kubectl_version: "v1.25.2"
+          config: ./hack/ci/containerd-cluster-conf.yaml
+      - name: Install private registry
+        run: ./hack/ci/setup_private_registry.sh
+      - name: Build image
+        run: ./hack/ci/build.sh
+      - name: Set image version
+        run: |
+          echo "VALUE_FILE=charts/warm-metal-csi-driver/values.yaml" >> "$GITHUB_ENV"
+          echo "IMAGE_TAG=$(git rev-parse --short HEAD)" >> "$GITHUB_ENV"
+          echo "HELM_NAME=wm-csi-integration-tests" >> "$GITHUB_ENV"
+      - name: Install the CSI Driver
+        run: |
+          helm install ${HELM_NAME} charts/warm-metal-csi-driver -n kube-system \
+            -f ${VALUE_FILE} \
+            --set csiPlugin.image.tag=${IMAGE_TAG} \
+            --set enableAsyncPull=true \
+            --wait \
+            --debug
+      - name: Run integration Tests
+        run: ./hack/ci/test.sh
+      - name: Uninstall the CSI Driver
+        run: helm uninstall -n kube-system ${HELM_NAME} --wait
2 changes: 1 addition & 1 deletion Makefile
@@ -1,4 +1,4 @@
-VERSION ?= v1.1.0
+VERSION ?= v1.2.0
 
 IMAGE_BUILDER ?= docker
 IMAGE_BUILD_CMD ?= buildx
4 changes: 2 additions & 2 deletions charts/warm-metal-csi-driver/Chart.yaml
@@ -15,9 +15,9 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 1.1.0
+version: 1.2.0
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
-appVersion: v1.1.0
+appVersion: v1.2.0
4 changes: 2 additions & 2 deletions charts/warm-metal-csi-driver/templates/nodeplugin.yaml
@@ -63,8 +63,8 @@ spec:
 {{- if .Values.enableDaemonImageCredentialCache }}
 - --enable-daemon-image-credential-cache
 {{- end }}
-{{- if .Values.enableAsyncPullMount }}
-- --async-pull-mount=true
+{{- if .Values.enableAsyncPull }}
+- --async-pull-timeout={{ .Values.asyncPullTimeout }}
 {{- end }}
 - "-v={{ .Values.logLevel }}"
 - "--mode=node"
3 changes: 2 additions & 1 deletion charts/warm-metal-csi-driver/values.yaml
@@ -5,7 +5,8 @@ kubeletRoot: /var/lib/kubelet
 snapshotRoot: /var/lib/containerd/io.containerd.snapshotter.v1.overlayfs
 logLevel: 4
 enableDaemonImageCredentialCache:
-enableAsyncPullMount: false
+enableAsyncPull: false
+asyncPullTimeout: "10m"
 pullImageSecretForDaemonset:
 
 csiPlugin:
6 changes: 3 additions & 3 deletions cmd/plugin/main.go
@@ -51,8 +51,8 @@ var (
     enableCache = flag.Bool("enable-daemon-image-credential-cache", true,
         "Whether to save contents of imagepullsecrets of the daemon ServiceAccount in memory. "+
             "If set to false, secrets will be fetched from the API server on every image pull.")
-    asyncImagePullMount = flag.Bool("async-pull-mount", false,
-        "Whether to pull images asynchronously (helps prevent timeout for larger images)")
+    asyncImagePullTimeout = flag.Duration("async-pull-timeout", 0,
+        "If positive, specifies duration allotted for async image pulls as measured from pull start time. If zero, negative, less than 30s, or omitted, the caller's timeout (usually kubelet: 2m) is used instead of this value. (additional time helps prevent timeout for larger images or slower image pull conditions)")
     watcherResyncPeriod = flag.Duration("watcher-resync-period", 30*time.Minute, "The resync period of the pvc watcher.")
     mode = flag.String("mode", "", "The mode of the driver. Valid values are: node, controller")
     nodePluginSA = flag.String("node-plugin-sa", "csi-image-warm-metal", "The name of the ServiceAccount used by the node plugin.")
@@ -129,7 +129,7 @@ func main() {
         server.Start(*endpoint,
             NewIdentityServer(driverVersion),
             nil,
-            NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullMount))
+            NewNodeServer(driver, mounter, criClient, secretStore, *asyncImagePullTimeout))
     case controllerMode:
         watcher, err := watcher.New(context.Background(), *watcherResyncPeriod)
         if err != nil {
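The help text for --async-pull-timeout above encodes a simple rule: values of 30 seconds or more enable asynchronous pulls with that budget, while zero, negative, or sub-30s values fall back to synchronous pulls bounded by the caller's (typically kubelet's) deadline. Below is a minimal, self-contained sketch of that decision; it is illustrative only, not the driver's actual main, and the printed messages are placeholders.

package main

import (
    "flag"
    "fmt"
    "time"
)

func main() {
    // Mirrors the flag declared above; run with e.g. -async-pull-timeout=10m.
    asyncPullTimeout := flag.Duration("async-pull-timeout", 0,
        "duration allotted for async image pulls, measured from pull start")
    flag.Parse()

    if *asyncPullTimeout >= 30*time.Second {
        fmt.Printf("async mode: a pull may take up to %v\n", *asyncPullTimeout)
    } else {
        fmt.Println("sync mode: the caller's timeout (usually kubelet's ~2m) applies")
    }
}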
138 changes: 67 additions & 71 deletions cmd/plugin/node_server.go
@@ -4,15 +4,14 @@ import (
"context"
"os"
"strings"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/containerd/containerd/reference/docker"
"github.com/google/uuid"
"github.com/warm-metal/container-image-csi-driver/pkg/backend"
"github.com/warm-metal/container-image-csi-driver/pkg/metrics"
"github.com/warm-metal/container-image-csi-driver/pkg/mountexecutor"
"github.com/warm-metal/container-image-csi-driver/pkg/mountstatus"
"github.com/warm-metal/container-image-csi-driver/pkg/pullexecutor"
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimage"
"github.com/warm-metal/container-image-csi-driver/pkg/remoteimageasync"
"github.com/warm-metal/container-image-csi-driver/pkg/secret"
csicommon "github.com/warm-metal/csi-drivers/pkg/csi-common"
"google.golang.org/grpc/codes"
@@ -31,36 +30,36 @@

 type ImagePullStatus int
 
-func NewNodeServer(driver *csicommon.CSIDriver, mounter backend.Mounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullMount bool) *NodeServer {
-    return &NodeServer{
-        DefaultNodeServer:   csicommon.NewDefaultNodeServer(driver),
-        mounter:             mounter,
-        secretStore:         secretStore,
-        asyncImagePullMount: asyncImagePullMount,
-        mountExecutor: mountexecutor.NewMountExecutor(&mountexecutor.MountExecutorOptions{
-            AsyncMount: asyncImagePullMount,
-            Mounter:    mounter,
-        }),
-        pullExecutor: pullexecutor.NewPullExecutor(&pullexecutor.PullExecutorOptions{
-            AsyncPull:          asyncImagePullMount,
-            ImageServiceClient: imageSvc,
-            SecretStore:        secretStore,
-            Mounter:            mounter,
-        }),
-    }
+func NewNodeServer(driver *csicommon.CSIDriver, mounter backend.Mounter, imageSvc cri.ImageServiceClient, secretStore secret.Store, asyncImagePullTimeout time.Duration) *NodeServer {
+    ns := NodeServer{
+        DefaultNodeServer:     csicommon.NewDefaultNodeServer(driver),
+        mounter:               mounter,
+        imageSvc:              imageSvc,
+        secretStore:           secretStore,
+        asyncImagePullTimeout: asyncImagePullTimeout,
+        asyncImagePuller:      nil,
+    }
+    if asyncImagePullTimeout >= time.Duration(30*time.Second) {
+        klog.Infof("Starting node server in Async mode with %v timeout", asyncImagePullTimeout)
+        ns.asyncImagePuller = remoteimageasync.StartAsyncPuller(context.TODO(), 100)
+    } else {
+        klog.Info("Starting node server in Sync mode")
+        ns.asyncImagePullTimeout = 0 // set to default value
+    }
+    return &ns
 }
 
 type NodeServer struct {
     *csicommon.DefaultNodeServer
-    mounter             backend.Mounter
-    secretStore         secret.Store
-    asyncImagePullMount bool
-    mountExecutor       *mountexecutor.MountExecutor
-    pullExecutor        *pullexecutor.PullExecutor
+    mounter               backend.Mounter
+    imageSvc              cri.ImageServiceClient
+    secretStore           secret.Store
+    asyncImagePullTimeout time.Duration
+    asyncImagePuller      remoteimageasync.AsyncPuller
 }
 
 func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) (resp *csi.NodePublishVolumeResponse, err error) {
-    valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"], "request-id", uuid.NewString())
+    valuesLogger := klog.LoggerWithValues(klog.NewKlogr(), "pod-name", req.VolumeContext["pod-name"], "namespace", req.VolumeContext["namespace"], "uid", req.VolumeContext["uid"])
     valuesLogger.Info("Incoming NodePublishVolume request", "request string", req.String())
     if len(req.VolumeId) == 0 {
         err = status.Error(codes.InvalidArgument, "VolumeId is missing")
@@ -122,56 +121,59 @@ func (n NodeServer) NodePublishVolume(ctx context.Context, req *csi.NodePublishV
         image = req.VolumeContext[ctxKeyImage]
     }
 
-    namedRef, err := docker.ParseDockerRef(image)
-    if err != nil {
-        klog.Errorf("unable to normalize image %q: %s", image, err)
-        return
-    }
-
     pullAlways := strings.ToLower(req.VolumeContext[ctxKeyPullAlways]) == "true"
 
-    po := &pullexecutor.PullOptions{
-        Context:     ctx,
-        NamedRef:    namedRef,
-        PullAlways:  pullAlways,
-        Image:       image,
-        PullSecrets: req.Secrets,
-        Logger:      valuesLogger,
-    }
-
-    if e := n.pullExecutor.StartPulling(po); e != nil {
-        err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, e)
+    keyring, err := n.secretStore.GetDockerKeyring(ctx, req.Secrets)
+    if err != nil {
+        err = status.Errorf(codes.Aborted, "unable to fetch keyring: %s", err)
         return
     }
 
-    if e := n.pullExecutor.WaitForPull(po); e != nil {
-        err = status.Errorf(codes.DeadlineExceeded, e.Error())
+    namedRef, err := docker.ParseDockerRef(image)
+    if err != nil {
+        klog.Errorf("unable to normalize image %q: %s", image, err)
         return
     }
 
-    if mountstatus.Get(req.VolumeId) == mountstatus.Mounted {
-        return &csi.NodePublishVolumeResponse{}, nil
-    }
-
-    o := &mountexecutor.MountOptions{
-        Context:          ctx,
-        NamedRef:         namedRef,
-        VolumeId:         req.VolumeId,
-        TargetPath:       req.TargetPath,
-        VolumeCapability: req.VolumeCapability,
-        ReadOnly:         req.Readonly,
-        Logger:           valuesLogger,
+    //NOTE: we are relying on n.mounter.ImageExists() to return false when
+    //      a first-time pull is in progress, else this logic may not be
+    //      correct. should test this.
+    if pullAlways || !n.mounter.ImageExists(ctx, namedRef) {
+        klog.Errorf("pull image %q", image)
+        puller := remoteimage.NewPuller(n.imageSvc, namedRef, keyring)
+
+        if n.asyncImagePuller != nil {
+            var session *remoteimageasync.PullSession
+            session, err = n.asyncImagePuller.StartPull(image, puller, n.asyncImagePullTimeout)
+            if err != nil {
+                err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err)
+                metrics.OperationErrorsCount.WithLabelValues("pull-async-start").Inc()
+                return
+            }
+            if err = n.asyncImagePuller.WaitForPull(session, ctx); err != nil {
+                err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err)
+                metrics.OperationErrorsCount.WithLabelValues("pull-async-wait").Inc()
+                return
+            }
+        } else {
+            if err = puller.Pull(ctx); err != nil {
+                err = status.Errorf(codes.Aborted, "unable to pull image %q: %s", image, err)
+                metrics.OperationErrorsCount.WithLabelValues("pull-sync-call").Inc()
+                return
+            }
+        }
     }
 
-    if e := n.mountExecutor.StartMounting(o); e != nil {
-        err = status.Error(codes.Internal, e.Error())
+    ro := req.Readonly ||
+        req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_SINGLE_NODE_READER_ONLY ||
+        req.VolumeCapability.AccessMode.Mode == csi.VolumeCapability_AccessMode_MULTI_NODE_READER_ONLY
+    if err = n.mounter.Mount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath), namedRef, ro); err != nil {
+        err = status.Error(codes.Internal, err.Error())
+        metrics.OperationErrorsCount.WithLabelValues("mount").Inc()
         return
     }
 
-    if e := n.mountExecutor.WaitForMount(o); e != nil {
-        err = status.Errorf(codes.DeadlineExceeded, e.Error())
-        return
-    }
+    valuesLogger.Info("Successfully completed NodePublishVolume request", "request string", req.String())
 
     return &csi.NodePublishVolumeResponse{}, nil
 }
@@ -194,17 +196,11 @@ func (n NodeServer) NodeUnpublishVolume(ctx context.Context, req *csi.NodeUnpubl
     }
 
     if err = n.mounter.Unmount(ctx, req.VolumeId, backend.MountTarget(req.TargetPath)); err != nil {
-        // TODO(vadasambar): move this to mountexecutor once mountexecutor has `StartUnmounting` function
-        metrics.OperationErrorsCount.WithLabelValues("StartUnmounting").Inc()
+        metrics.OperationErrorsCount.WithLabelValues("unmount").Inc()
         err = status.Error(codes.Internal, err.Error())
         return
     }
 
-    // Clear the mountstatus since the volume has been unmounted
-    // Not doing this will make mount not work properly if the same volume is
-    // attempted to mount twice
-    mountstatus.Delete(req.VolumeId)
-
     return &csi.NodeUnpublishVolumeResponse{}, nil
 }
 
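For orientation, the call sites above imply the surface of pkg/remoteimageasync that the node server relies on (StartAsyncPuller, StartPull, WaitForPull, and the PullSession with its Image() getter mentioned in the commit bullets). The sketch below is inferred from those call sites and is not the package's actual source; the signatures and the Puller interface shown here are assumptions.

package remoteimageasync

import (
    "context"
    "time"
)

// PullSession tracks one in-flight pull; per the commit notes it exposes
// Image() so callers can log which image a session belongs to.
type PullSession struct {
    image string
}

func (s *PullSession) Image() string { return s.image }

// Puller is assumed to match the synchronous pull abstraction in
// pkg/remoteimage; only the method needed here is shown.
type Puller interface {
    Pull(ctx context.Context) error
}

// AsyncPuller decouples a pull from the caller's RPC deadline: StartPull
// registers the pull and returns a session handle, WaitForPull blocks until
// the session completes, its own timeout elapses, or the caller's context is
// cancelled.
type AsyncPuller interface {
    StartPull(image string, puller Puller, asyncPullTimeout time.Duration) (*PullSession, error)
    WaitForPull(session *PullSession, callerContext context.Context) error
}

// StartAsyncPuller starts the background worker with a bounded session
// channel (the literal 100 passed in NewNodeServer above).
func StartAsyncPuller(ctx context.Context, sessionChanDepth int) AsyncPuller {
    return nil // placeholder; the real implementation runs the completion loop
}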
8 changes: 4 additions & 4 deletions cmd/plugin/node_server_test.go
@@ -41,7 +41,7 @@ func TestNodePublishVolumeAsync(t *testing.T) {
     driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node")
     assert.NotNil(t, driver)
 
-    asyncImagePulls := true
+    asyncImagePulls := 15 * time.Minute //TODO: determine intended value for this in the context of this test
     ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls)
 
     // based on kubelet's csi mounter plugin code
@@ -168,7 +168,7 @@ func TestNodePublishVolumeSync(t *testing.T) {
     driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node")
     assert.NotNil(t, driver)
 
-    asyncImagePulls := false
+    asyncImagePulls := 0 * time.Minute //TODO: determine intended value for this in the context of this test
     ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls)
 
     // based on kubelet's csi mounter plugin code
@@ -300,7 +300,7 @@ func TestMetrics(t *testing.T) {
     driver := csicommon.NewCSIDriver(driverName, driverVersion, "fake-node")
     assert.NotNil(t, driver)
 
-    asyncImagePulls := true
+    asyncImagePulls := 15 * time.Minute //TODO: determine intended value for this in the context of this test
     ns := NewNodeServer(driver, mounter, criClient, &testSecretStore{}, asyncImagePulls)
 
     // based on kubelet's csi mounter plugin code
@@ -437,7 +437,7 @@
     assert.NoError(t, err)
     respBody := string(b1)
     assert.Contains(t, respBody, metrics.ImagePullTimeKey)
-    assert.Contains(t, respBody, metrics.ImageMountTimeKey)
+    assert.Contains(t, respBody, metrics.ImagePullTimeHist)
     assert.Contains(t, respBody, metrics.OperationErrorsCountKey)
 
     // give some time before stopping the server
2 changes: 1 addition & 1 deletion go.mod
@@ -14,7 +14,7 @@ require (
     github.com/pkg/errors v0.9.1
     github.com/prometheus/client_golang v1.12.1
     github.com/spf13/pflag v1.0.5
-    github.com/stretchr/testify v1.8.0
+    github.com/stretchr/testify v1.8.4
     github.com/warm-metal/csi-drivers v0.5.0-alpha.0.0.20210404173852-9ec9cb097dd2
     golang.org/x/net v0.0.0-20221004154528-8021a29435af
     google.golang.org/grpc v1.50.0
3 changes: 2 additions & 1 deletion go.sum
@@ -826,8 +826,9 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
-github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
 github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
 github.com/syndtr/gocapability v0.0.0-20180916011248-d98352740cb2/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=