Skip to content

Commit 94fa3ab

Browse files
tritoneashmeenkaur
andauthored
feat(storage): Add Flush for appendable writes (#11791)
* feat(storage): Add Flush for appendable writes Adds Writer.Flush, which flushes any data in the internal buffer to GCS. Only supported for appendable writes. Preview feature. * test and error fixes * fix open writer test * return non-EOF error. * Update storage/client_test.go Co-authored-by: Ashmeen Kaur <[email protected]> * fix Flush after Close error * fix data race * add flush at close test * fix conditions test * fix other test issues --------- Co-authored-by: Ashmeen Kaur <[email protected]>
1 parent 484978a commit 94fa3ab

File tree

5 files changed

+295
-12
lines changed

5 files changed

+295
-12
lines changed

storage/client.go

+5
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,11 @@ type openWriterParams struct {
281281
// setObj callback for reporting the resulting object - see `Writer.obj`.
282282
// Required.
283283
setObj func(*ObjectAttrs)
284+
// setFlush callback for providing a Flush function implementation - see `Writer.Flush`.
285+
// Required.
286+
setFlush func(func() (int64, error))
287+
// setPipeWriter callback for reseting `Writer.pw` if needed.
288+
setPipeWriter func(*io.PipeWriter)
284289
}
285290

286291
type newMultiRangeDownloaderParams struct {

storage/client_test.go

+187-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"net/http"
2727
"net/url"
2828
"os"
29+
"slices"
2930
"strconv"
3031
"strings"
3132
"testing"
@@ -904,6 +905,7 @@ func TestOpenWriterEmulated(t *testing.T) {
904905
setError: func(_ error) {}, // no-op
905906
progress: func(_ int64) {}, // no-op
906907
setObj: func(o *ObjectAttrs) { gotAttrs = o },
908+
setFlush: func(f func() (int64, error)) {},
907909
}
908910
pw, err := client.OpenWriter(params)
909911
if err != nil {
@@ -997,7 +999,7 @@ func TestOpenAppendableWriterEmulated(t *testing.T) {
997999
})
9981000
}
9991001

1000-
func TestOpenAppendableWriterMultipleFlushesEmulated(t *testing.T) {
1002+
func TestOpenAppendableWriterMultipleChunksEmulated(t *testing.T) {
10011003
transportClientTest(skipHTTP("appends only supported via gRPC"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
10021004
// Populate test data.
10031005
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{
@@ -1052,6 +1054,187 @@ func TestOpenAppendableWriterMultipleFlushesEmulated(t *testing.T) {
10521054
})
10531055
}
10541056

1057+
func TestWriterFlushEmulated(t *testing.T) {
1058+
transportClientTest(skipHTTP("appends only supported via gRPC"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1059+
// Populate test data.
1060+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{
1061+
Name: bucket,
1062+
}, nil)
1063+
if err != nil {
1064+
t.Fatalf("client.CreateBucket: %v", err)
1065+
}
1066+
prefix := time.Now().Nanosecond()
1067+
objName := fmt.Sprintf("%d-object-%d", prefix, time.Now().Nanosecond())
1068+
1069+
vc := &Client{tc: client}
1070+
w := vc.Bucket(bucket).Object(objName).NewWriter(ctx)
1071+
w.Append = true
1072+
w.ChunkSize = 3 * MiB
1073+
var gotOffsets []int64
1074+
w.ProgressFunc = func(offset int64) {
1075+
gotOffsets = append(gotOffsets, offset)
1076+
}
1077+
// Flush at 0 and at a range of values, both aligned with ChunkSize and where explicit flushes are expected.
1078+
wantOffsets := []int64{0, 3 * MiB, 4*MiB + 1024, 4*MiB + 2048, 4*MiB + 3072, 6*MiB + 4096, 9 * MiB}
1079+
1080+
// Flush first with no data since this is a special case to test.
1081+
off, err := w.Flush()
1082+
if err != nil {
1083+
t.Errorf("flushing at 0: %v", err)
1084+
}
1085+
if off != 0 {
1086+
t.Errorf("flushing at 0: got %v offset, want 0", off)
1087+
}
1088+
1089+
// Write first 4 MiB data, expect progress every 3 MiB.
1090+
_, err = w.Write(randomBytes9MiB[0 : 4*MiB])
1091+
if err != nil {
1092+
t.Fatalf("writing test data: got %v; want ok", err)
1093+
}
1094+
1095+
// Write another 1KiB three times and do a Flush each time.
1096+
for i := 0; i < 3; i++ {
1097+
start := int64(4*MiB + i*1024)
1098+
end := int64(4*MiB + (i+1)*1024)
1099+
n, err := w.Write(randomBytes9MiB[start:end])
1100+
if err != nil {
1101+
t.Fatalf("writing 1k data: got %v; want ok", err)
1102+
}
1103+
if n != 1024 {
1104+
t.Errorf("writing 1k data: got %v bytes written, want %v", n, 1024)
1105+
}
1106+
off, err := w.Flush()
1107+
1108+
if err != nil {
1109+
t.Fatalf("flushing 1k data: got %v; want ok", err)
1110+
}
1111+
if off != end {
1112+
t.Errorf("flushing 1k data: got %v bytes committed, want %v", off, end)
1113+
}
1114+
}
1115+
1116+
// Do one more Flush that would require multiple messages (over 2 MiB).
1117+
n, err := w.Write(randomBytes9MiB[4*MiB+3072 : 6*MiB+4096])
1118+
if err != nil {
1119+
t.Fatalf("writing 2MiB + 1k data: got %v; want ok", err)
1120+
}
1121+
if n != 2*MiB+1024 {
1122+
t.Errorf("writing 2 MiB + 1k data: got %v bytes written, want %v", n, 2*MiB+1024)
1123+
}
1124+
off, err = w.Flush()
1125+
if err != nil {
1126+
t.Fatalf("flushing 2 MiB + 1k data: got %v; want ok", err)
1127+
}
1128+
if off != 6*MiB+4096 {
1129+
t.Errorf("flushing 2 MiB + 1k data: got %v bytes committed, want %v", off, 6*MiB+4096)
1130+
}
1131+
1132+
// Do one more zero-byte flush; expect noop for this.
1133+
off, err = w.Flush()
1134+
if err != nil {
1135+
t.Fatalf("flushing 0b data: got %v; want ok", err)
1136+
}
1137+
if off != 6*MiB+4096 {
1138+
t.Errorf("flushing 0b data: got %v bytes committed, want %v", off, 6*MiB+4096)
1139+
}
1140+
1141+
// Write remainder of data and close the writer.
1142+
if _, err := w.Write(randomBytes9MiB[6*MiB+4096:]); err != nil {
1143+
t.Fatalf("writing remaining data: got %v; want ok", err)
1144+
}
1145+
if err := w.Close(); err != nil {
1146+
t.Fatalf("closing writer: %v", err)
1147+
}
1148+
1149+
// Check that Flush after close fails.
1150+
if _, err = w.Flush(); err == nil {
1151+
t.Errorf("flush: expected error after close, got nil")
1152+
}
1153+
1154+
// Check offsets
1155+
if !slices.Equal(gotOffsets, wantOffsets) {
1156+
t.Errorf("progress offsets: got %v, want %v", gotOffsets, wantOffsets)
1157+
}
1158+
1159+
// Download object and check data
1160+
r, err := veneerClient.Bucket(bucket).Object(objName).NewReader(ctx)
1161+
defer r.Close()
1162+
if err != nil {
1163+
t.Fatalf("opening reading: %v", err)
1164+
}
1165+
wantLen := len(randomBytes9MiB)
1166+
got, err := io.ReadAll(r)
1167+
if n := len(got); n != wantLen {
1168+
t.Fatalf("expected to read %d bytes, but got %d (%v)", wantLen, n, err)
1169+
}
1170+
if diff := cmp.Diff(got, randomBytes9MiB); diff != "" {
1171+
t.Fatalf("checking written content: got(-), want(+):\n%s", diff)
1172+
}
1173+
})
1174+
}
1175+
1176+
func TestWriterFlushAtCloseEmulated(t *testing.T) {
1177+
transportClientTest(skipHTTP("appends only supported via gRPC"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
1178+
// Populate test data.
1179+
_, err := client.CreateBucket(ctx, project, bucket, &BucketAttrs{
1180+
Name: bucket,
1181+
}, nil)
1182+
if err != nil {
1183+
t.Fatalf("client.CreateBucket: %v", err)
1184+
}
1185+
prefix := time.Now().Nanosecond()
1186+
objName := fmt.Sprintf("%d-object-%d", prefix, time.Now().Nanosecond())
1187+
1188+
vc := &Client{tc: client}
1189+
w := vc.Bucket(bucket).Object(objName).NewWriter(ctx)
1190+
w.Append = true
1191+
w.ChunkSize = MiB
1192+
var gotOffsets []int64
1193+
w.ProgressFunc = func(offset int64) {
1194+
gotOffsets = append(gotOffsets, offset)
1195+
}
1196+
wantOffsets := []int64{MiB, 2 * MiB, 3 * MiB}
1197+
1198+
// Test Flush right before close only.
1199+
n, err := w.Write(randomBytes3MiB)
1200+
if err != nil {
1201+
t.Fatalf("writing data: got %v; want ok", err)
1202+
}
1203+
if n != 3*MiB {
1204+
t.Errorf("writing data: got %v bytes written, want %v", n, 3*MiB)
1205+
}
1206+
off, err := w.Flush()
1207+
if err != nil {
1208+
t.Fatalf("flush: got %v; want ok", err)
1209+
}
1210+
if off != 3*MiB {
1211+
t.Errorf("flushing data: got %v bytes written, want %v", off, 3*MiB)
1212+
}
1213+
if err := w.Close(); err != nil {
1214+
t.Fatalf("closing writer: %v", err)
1215+
}
1216+
// Check offsets
1217+
if !slices.Equal(gotOffsets, wantOffsets) {
1218+
t.Errorf("progress offsets: got %v, want %v", gotOffsets, wantOffsets)
1219+
}
1220+
1221+
// Download object and check data
1222+
r, err := veneerClient.Bucket(bucket).Object(objName).NewReader(ctx)
1223+
defer r.Close()
1224+
if err != nil {
1225+
t.Fatalf("opening reading: %v", err)
1226+
}
1227+
wantLen := 3 * MiB
1228+
got, err := io.ReadAll(r)
1229+
if n := len(got); n != wantLen {
1230+
t.Fatalf("expected to read %d bytes, but got %d (%v)", wantLen, n, err)
1231+
}
1232+
if diff := cmp.Diff(got, randomBytes3MiB); diff != "" {
1233+
t.Fatalf("checking written content: got(-), want(+):\n%s", diff)
1234+
}
1235+
})
1236+
}
1237+
10551238
func TestListNotificationsEmulated(t *testing.T) {
10561239
transportClientTest(skipGRPC("notifications not implemented"), t, func(t *testing.T, ctx context.Context, project, bucket string, client storageClient) {
10571240
// Populate test object.
@@ -1705,6 +1888,7 @@ func TestObjectConditionsEmulated(t *testing.T) {
17051888
},
17061889
progress: nil,
17071890
setObj: nil,
1891+
setFlush: func(f func() (int64, error)) {},
17081892
})
17091893
return err
17101894
},
@@ -2074,6 +2258,7 @@ func TestWriterChunkTransferTimeoutEmulated(t *testing.T) {
20742258
setError: func(_ error) {}, // no-op
20752259
progress: func(_ int64) {}, // no-op
20762260
setObj: func(o *ObjectAttrs) { gotAttrs = o },
2261+
setFlush: func(func() (int64, error)) {}, // no-op
20772262
}
20782263

20792264
pw, err := client.OpenWriter(params)
@@ -2168,6 +2353,7 @@ func TestWriterChunkRetryDeadlineEmulated(t *testing.T) {
21682353
setError: func(_ error) {}, // no-op
21692354
progress: func(_ int64) {}, // no-op
21702355
setObj: func(_ *ObjectAttrs) {},
2356+
setFlush: func(f func() (int64, error)) {},
21712357
}
21722358

21732359
pw, err := client.OpenWriter(params, &idempotentOption{true})

storage/grpc_client.go

+51-7
Original file line numberDiff line numberDiff line change
@@ -1703,6 +1703,7 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
17031703
var offset int64
17041704
errorf := params.setError
17051705
setObj := params.setObj
1706+
setFlush := params.setFlush
17061707
pr, pw := io.Pipe()
17071708

17081709
s := callSettings(c.settings, opts...)
@@ -1728,11 +1729,16 @@ func (c *grpcStorageClient) OpenWriter(params *openWriterParams, opts ...storage
17281729
}
17291730

17301731
var gw *gRPCWriter
1731-
gw, err := newGRPCWriter(c, s, params, r)
1732+
gw, err := newGRPCWriter(c, s, params, r, pw, params.setPipeWriter)
17321733
if err != nil {
17331734
return err
17341735
}
17351736

1737+
// Set Flush func for use by exported Writer.Flush.
1738+
setFlush(func() (int64, error) {
1739+
return gw.flush()
1740+
})
1741+
17361742
// Loop until there is an error or the Object has been finalized.
17371743
for {
17381744
// Note: This blocks until either the buffer is full or EOF is read.
@@ -2572,7 +2578,7 @@ func (r *gRPCReader) reopenStream() error {
25722578
return nil
25732579
}
25742580

2575-
func newGRPCWriter(c *grpcStorageClient, s *settings, params *openWriterParams, r io.Reader) (*gRPCWriter, error) {
2581+
func newGRPCWriter(c *grpcStorageClient, s *settings, params *openWriterParams, r io.Reader, pw *io.PipeWriter, setPipeWriter func(*io.PipeWriter)) (*gRPCWriter, error) {
25762582
if params.attrs.Retention != nil {
25772583
// TO-DO: remove once ObjectRetention is available - see b/308194853
25782584
return nil, status.Errorf(codes.Unimplemented, "storage: object retention is not supported in gRPC")
@@ -2609,6 +2615,7 @@ func newGRPCWriter(c *grpcStorageClient, s *settings, params *openWriterParams,
26092615
c: c,
26102616
ctx: params.ctx,
26112617
reader: r,
2618+
pw: pw,
26122619
bucket: params.bucket,
26132620
attrs: params.attrs,
26142621
conds: params.conds,
@@ -2620,15 +2627,19 @@ func newGRPCWriter(c *grpcStorageClient, s *settings, params *openWriterParams,
26202627
forceOneShot: params.chunkSize <= 0,
26212628
forceEmptyContentType: params.forceEmptyContentType,
26222629
append: params.append,
2630+
setPipeWriter: setPipeWriter,
2631+
flushComplete: make(chan int64),
26232632
}, nil
26242633
}
26252634

26262635
// gRPCWriter is a wrapper around the the gRPC client-stream API that manages
26272636
// sending chunks of data provided by the user over the stream.
26282637
type gRPCWriter struct {
2629-
c *grpcStorageClient
2630-
buf []byte
2631-
reader io.Reader
2638+
c *grpcStorageClient
2639+
buf []byte
2640+
reader io.Reader
2641+
pw *io.PipeWriter
2642+
setPipeWriter func(*io.PipeWriter) // used to set in parent storage.Writer
26322643

26332644
ctx context.Context
26342645

@@ -2645,7 +2656,9 @@ type gRPCWriter struct {
26452656
forceEmptyContentType bool
26462657
append bool
26472658

2648-
streamSender gRPCBidiWriteBufferSender
2659+
streamSender gRPCBidiWriteBufferSender
2660+
flushInProgress bool // true when the pipe is being recreated for a flush.
2661+
flushComplete chan int64 // use to signal back to flush call that flush to server was completed.
26492662
}
26502663

26512664
func bucketContext(ctx context.Context, bucket string) context.Context {
@@ -2945,11 +2958,17 @@ func (w *gRPCWriter) uploadBuffer(ctx context.Context, recvd int, start int64, d
29452958
break
29462959
}
29472960
}
2961+
if w.flushInProgress {
2962+
w.flushInProgress = false
2963+
w.flushComplete <- offset
2964+
}
29482965
return
29492966
}
29502967

29512968
// read copies the data in the reader to the given buffer and reports how much
29522969
// data was read into the buffer and if there is no more data to read (EOF).
2970+
// read returns when either 1. the buffer is full, 2. Writer.Flush was called,
2971+
// or 3. Writer.Close was called.
29532972
func (w *gRPCWriter) read() (int, bool, error) {
29542973
// Set n to -1 to start the Read loop.
29552974
var n, recvd int = -1, 0
@@ -2961,12 +2980,37 @@ func (w *gRPCWriter) read() (int, bool, error) {
29612980
}
29622981
var done bool
29632982
if err == io.EOF {
2964-
done = true
29652983
err = nil
2984+
// EOF can come from Writer.Flush or Writer.Close.
2985+
if w.flushInProgress {
2986+
// Reset pipe for additional writes after the flush.
2987+
pr, pw := io.Pipe()
2988+
w.reader = pr
2989+
w.pw = pw
2990+
w.setPipeWriter(pw)
2991+
} else {
2992+
done = true
2993+
}
29662994
}
29672995
return recvd, done, err
29682996
}
29692997

2998+
// flush flushes the current buffer regardless of whether it is full or not.
2999+
// It's the implementation for Writer.Flush.
3000+
func (w *gRPCWriter) flush() (int64, error) {
3001+
if !w.append {
3002+
return 0, errors.New("Flush is supported only if Writer.Append is set to true")
3003+
}
3004+
3005+
// Close PipeWriter to trigger EOF on read side of the stream.
3006+
w.flushInProgress = true
3007+
w.pw.Close()
3008+
3009+
// Wait for flush to complete
3010+
offset := <-w.flushComplete
3011+
return offset, nil
3012+
}
3013+
29703014
func checkCanceled(err error) error {
29713015
if status.Code(err) == codes.Canceled {
29723016
return context.Canceled

storage/http_client.go

+3
Original file line numberDiff line numberDiff line change
@@ -970,6 +970,9 @@ func (c *httpStorageClient) OpenWriter(params *openWriterParams, opts ...storage
970970
setObj := params.setObj
971971
progress := params.progress
972972
attrs := params.attrs
973+
params.setFlush(func() (int64, error) {
974+
return 0, errors.New("Writer.Flush is only supported for gRPC-based clients")
975+
})
973976

974977
mediaOpts := []googleapi.MediaOption{
975978
googleapi.ChunkSize(params.chunkSize),

0 commit comments

Comments
 (0)