From 01436ce9887ccbb3155020dd9082e4e40ff651ce Mon Sep 17 00:00:00 2001 From: Chris Carlon <carlon@google.com> Date: Fri, 17 Jan 2025 03:42:52 +0000 Subject: [PATCH] feat(storage/append): Support appends in w1r3. Upload files in the w1r3 benchmark using the append writer. --- storage/internal/benchmarks/main.go | 9 +++++++++ storage/internal/benchmarks/upload_benchmark.go | 5 +++++ storage/internal/benchmarks/w1r3.go | 1 + 3 files changed, 15 insertions(+) diff --git a/storage/internal/benchmarks/main.go b/storage/internal/benchmarks/main.go index 2884f65313f9..eca901bf4c18 100644 --- a/storage/internal/benchmarks/main.go +++ b/storage/internal/benchmarks/main.go @@ -82,6 +82,8 @@ type benchmarkOptions struct { minChunkSize int64 maxChunkSize int64 + appendWrites bool + forceGC bool connPoolSize int @@ -142,6 +144,7 @@ func (b *benchmarkOptions) String() string { fmt.Sprintf("chunk size:\t\t%d - %d kib (library buffer for uploads)", b.minChunkSize/kib, b.maxChunkSize/kib), fmt.Sprintf("range offset:\t\t%d - %d bytes ", b.minReadOffset, b.maxReadOffset), fmt.Sprintf("range size:\t\t%d bytes (0 -> full object)", b.rangeSize), + fmt.Sprintf("append writes:\t\t%t", b.appendWrites), fmt.Sprintf("connection pool size:\t%d (GRPC)", b.connPoolSize), fmt.Sprintf("num workers:\t\t%d (max number of concurrent benchmark runs at a time)", b.numWorkers), fmt.Sprintf("force garbage collection:%t", b.forceGC), @@ -185,6 +188,8 @@ func parseFlags() { flag.Int64Var(&opts.minChunkSize, "min_chunksize", useDefault, "min chunksize in bytes") flag.Int64Var(&opts.maxChunkSize, "max_chunksize", useDefault, "max chunksize in bytes") + flag.BoolVar(&opts.appendWrites, "append_writes", false, "use the append writer") + flag.IntVar(&opts.connPoolSize, "connection_pool_size", 4, "GRPC connection pool size") flag.BoolVar(&opts.forceGC, "force_garbage_collection", false, "force garbage collection at the beginning of each upload") @@ -227,6 +232,10 @@ func parseFlags() { log.Fatalln("Could not parse object size") } } + + if opts.appendWrites && (opts.api != grpcAPI && opts.api != directPath) { + log.Fatalf("--append_writes requires GRPC or DirectPath; got %v", opts.api) + } } func main() { diff --git a/storage/internal/benchmarks/upload_benchmark.go b/storage/internal/benchmarks/upload_benchmark.go index 256be8caee49..539d087f4493 100644 --- a/storage/internal/benchmarks/upload_benchmark.go +++ b/storage/internal/benchmarks/upload_benchmark.go @@ -42,6 +42,8 @@ type uploadOpts struct { useDefaultChunkSize bool objectPath string timeout time.Duration + + appendWrites bool } func uploadBenchmark(ctx context.Context, uopts uploadOpts) (elapsedTime time.Duration, rerr error) { @@ -78,6 +80,9 @@ func uploadBenchmark(ctx context.Context, uopts uploadOpts) (elapsedTime time.Du if !uopts.useDefaultChunkSize { objectWriter.ChunkSize = int(uopts.params.chunkSize) } + if uopts.appendWrites { + objectWriter.Append = true + } mw, md5Hash, crc32cHash := generateUploadWriter(objectWriter, uopts.params.md5Enabled, uopts.params.crc32cEnabled) diff --git a/storage/internal/benchmarks/w1r3.go b/storage/internal/benchmarks/w1r3.go index 0026a2143467..b9f2c1ecf96c 100644 --- a/storage/internal/benchmarks/w1r3.go +++ b/storage/internal/benchmarks/w1r3.go @@ -169,6 +169,7 @@ func (r *w1r3) run(ctx context.Context) error { useDefaultChunkSize: opts.minChunkSize == useDefault || opts.maxChunkSize == useDefault, objectPath: r.objectPath, timeout: r.opts.timeoutPerOp, + appendWrites: r.opts.appendWrites, }) }, r.isWarmup)