Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support filtering snapshots by IDs #299

Merged
merged 1 commit into from
Apr 20, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
## unreleased

* Support filtering snapshots by ID
[[GH-299]](https://github.com/digitalocean/csi-digitalocean/pull/299)
* Return minimum disk size field from snapshot response
[[GH-298]](https://github.com/digitalocean/csi-digitalocean/pull/298)
* Improve debug HTTP server usage
Expand Down
170 changes: 105 additions & 65 deletions driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,93 +772,119 @@ func (d *Driver) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequ
// ListSnapshots shold not list a snapshot that is being created but has not
// been cut successfully yet.
func (d *Driver) ListSnapshots(ctx context.Context, req *csi.ListSnapshotsRequest) (*csi.ListSnapshotsResponse, error) {
// Pagination in the CSI world works different than at DO. CSI sends the
// `req.MaxEntries` to indicate how much snapshots it wants. The
// req.StartingToken is returned by us, if we somehow need to indicate that
// we couldn't fetch and need to fetch again. But it's NOT the page number.
// I.e: suppose CSI wants us to fetch 50 entries, we only fetch 30, we need to
// return NextToken as 31 (so req.StartingToken will be set to 31 when CSI
// calls us again), to indicate that we want to continue returning from the
// index 31 up to 50.

var nextToken int
var err error
if req.StartingToken != "" {
nextToken, err = strconv.Atoi(req.StartingToken)
if err != nil {
return nil, status.Errorf(codes.Aborted, "ListSnapshots starting token %s is not valid : %s",
req.StartingToken, err.Error())
}
}

if nextToken != 0 && req.MaxEntries != 0 {
return nil, status.Errorf(codes.Aborted,
"ListSnapshots invalid arguments starting token: %d and max entries: %d can't be non null at the same time", nextToken, req.MaxEntries)
}

listResp := &csi.ListSnapshotsResponse{}
log := d.log.WithFields(logrus.Fields{
"snapshot_id": req.SnapshotId,
"source_volume_id": req.SourceVolumeId,
"req_starting_token": req.StartingToken,
"method": "list_snapshots",
})
log.Info("list snapshots is called")

// fetch all entries
listOpts := &godo.ListOptions{
PerPage: int(req.MaxEntries),
}
var snapshots []godo.Snapshot
for {
snaps, resp, err := d.snapshots.ListVolume(ctx, listOpts)
if req.SnapshotId != "" {
snapshot, resp, err := d.snapshots.Get(ctx, req.SnapshotId)
if err != nil {
return nil, status.Errorf(codes.Aborted, "ListSnapshots listing volume snapshots has failed: %s", err.Error())
if resp == nil || resp.StatusCode != http.StatusNotFound {
return nil, status.Errorf(codes.Internal, "failed to get snapshot by ID %s: %s", req.SnapshotId, err)
}
} else {
snap, err := toCSISnapshot(snapshot)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to convert DO snapshot to CSI snapshot: %s", err)
}
listResp = &csi.ListSnapshotsResponse{
Entries: []*csi.ListSnapshotsResponse_Entry{
{
Snapshot: snap,
},
},
}
}
} else {
// Pagination in the CSI world works different than at DO. CSI sends the
// `req.MaxEntries` to indicate how much snapshots it wants. The
// req.StartingToken is returned by us, if we somehow need to indicate that
// we couldn't fetch and need to fetch again. But it's NOT the page number.
// I.e: suppose CSI wants us to fetch 50 entries, we only fetch 30, we need to
// return NextToken as 31 (so req.StartingToken will be set to 31 when CSI
// calls us again), to indicate that we want to continue returning from the
// index 31 up to 50.

var nextToken int
var err error
if req.StartingToken != "" {
nextToken, err = strconv.Atoi(req.StartingToken)
if err != nil {
return nil, status.Errorf(codes.Aborted, "ListSnapshots starting token %s is not valid : %s",
req.StartingToken, err.Error())
}
}

snapshots = append(snapshots, snaps...)

if resp.Links == nil || resp.Links.IsLastPage() {
break
if nextToken != 0 && req.MaxEntries != 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why can't you have an offset coupled with the page count in this instance?

return nil, status.Errorf(codes.Aborted,
"ListSnapshots invalid arguments starting token: %d and max entries: %d can't be non null at the same time", nextToken, req.MaxEntries)
}

page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
// fetch all entries
listOpts := &godo.ListOptions{
PerPage: int(req.MaxEntries),
}
var snapshots []godo.Snapshot
for {
snaps, resp, err := d.snapshots.ListVolume(ctx, listOpts)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a little confused with this pagination section below. It seems like we paginate through every entry from the start, ignoring the offset of NextToken that is returned. Can we just determine the "pageNumber" in DO pagination terms as a calculation of the pageCount and the offset?

Copy link
Contributor Author

@timoreimann timoreimann Apr 17, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, the logic looks off. I did not pay much attention to it because existing code was just moved into an else branch but was otherwise not modified. So I believe the pagination had been suboptimal before. Great catch. 👍

If you don't mind then I'd like to address this issue in a separate PR. FWIW, the flawed pagination should not have impacted users much since csi-snapshotter always passes in a snapshot ID in the ListSnapshots request, and our handling of that parameter is only fixed by this PR. Being good CSI citizens, however, we should still fix the pagination for the case that ListSnapshots is ever consumed exhaustively either in the future or by other COs (container orchestrators) possibly today already.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 That works for me.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix submitted in #300.

if err != nil {
return nil, status.Errorf(codes.Aborted, "ListSnapshots listing volume snapshots has failed: %s", err.Error())
}

listOpts.Page = page + 1
listOpts.PerPage = len(snaps)
}
snapshots = append(snapshots, snaps...)

if nextToken > len(snapshots) {
return nil, status.Error(codes.Aborted, "ListSnapshots starting token is greater than total number of snapshots")
}
if resp.Links == nil || resp.Links.IsLastPage() {
break
}

if nextToken != 0 {
snapshots = snapshots[nextToken:]
}
page, err := resp.Links.CurrentPage()
if err != nil {
return nil, err
}

if req.MaxEntries != 0 {
nextToken = len(snapshots) - int(req.MaxEntries) - 1
snapshots = snapshots[:req.MaxEntries]
}
listOpts.Page = page + 1
listOpts.PerPage = len(snaps)
}

entries := make([]*csi.ListSnapshotsResponse_Entry, 0, len(snapshots))
for _, snapshot := range snapshots {
snap, err := toCSISnapshot(&snapshot)
if err != nil {
return nil, status.Errorf(codes.Internal,
"couldn't convert DO snapshot to CSI snapshot: %s", err.Error())
if nextToken > len(snapshots) {
return nil, status.Error(codes.Aborted, "ListSnapshots starting token is greater than total number of snapshots")
}

entries = append(entries, &csi.ListSnapshotsResponse_Entry{
Snapshot: snap,
})
}
if nextToken != 0 {
snapshots = snapshots[nextToken:]
}

listResp := &csi.ListSnapshotsResponse{
Entries: entries,
NextToken: strconv.Itoa(nextToken),
if req.MaxEntries != 0 {
nextToken = len(snapshots) - int(req.MaxEntries) - 1
snapshots = snapshots[:req.MaxEntries]
}

entries := make([]*csi.ListSnapshotsResponse_Entry, 0, len(snapshots))
for _, snapshot := range snapshots {
snap, err := toCSISnapshot(&snapshot)
if err != nil {
return nil, status.Errorf(codes.Internal,
"failed to convert DO snapshot to CSI snapshot: %s", err)
}

entries = append(entries, &csi.ListSnapshotsResponse_Entry{
Snapshot: snap,
})
}
listResp = &csi.ListSnapshotsResponse{
Entries: entries,
NextToken: strconv.Itoa(nextToken),
}
}

filterSnapshotEntriesForVolumeID(listResp, req.SourceVolumeId)

log.WithField("response", listResp).Info("snapshots listed")
return listResp, nil
}
Expand Down Expand Up @@ -1176,3 +1202,17 @@ func (d *Driver) tagVolume(parentCtx context.Context, vol *godo.Volume) error {
_, err = d.tags.TagResources(ctx, d.doTag, tagReq)
return err
}

func filterSnapshotEntriesForVolumeID(listResp *csi.ListSnapshotsResponse, sourceVolumeID string) {
if sourceVolumeID == "" {
return
}

var filteredEntries []*csi.ListSnapshotsResponse_Entry
for _, entry := range listResp.Entries {
if entry.Snapshot.SourceVolumeId == sourceVolumeID {
filteredEntries = append(filteredEntries, entry)
}
}
listResp.Entries = filteredEntries
}