Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Binlog: Improve ZstdInMemoryDecompressorMaxSize management #17220

Merged
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Flags:
--backup_storage_compress if set, the backup files will be compressed. (default true)
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-in-memory-decompressor-max-size uint This value sets the compressed transaction payload size at which we switch from in-memory decompression to the slower streaming mode. It also controls the maximum memory to be used when in streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_protocol string the protocol to download binlogs from a vttablet (default "grpc")
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Flags:
--backup_storage_implementation string Which backup storage implementation to use for creating and restoring backups.
--backup_storage_number_blocks int if backup_storage_compress is true, backup_storage_number_blocks sets the number of blocks that can be processed, in parallel, before the writer blocks, during compression (default is 2). It should be equal to the number of CPUs available for compression. (default 2)
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--binlog-in-memory-decompressor-max-size uint This value sets the compressed transaction payload size at which we switch from in-memory decompression to the slower streaming mode. It also controls the maximum memory to be used when in streaming mode. (default 134217728)
--binlog_host string PITR restore parameter: hostname/IP of binlog server.
--binlog_password string PITR restore parameter: password of binlog server.
--binlog_player_grpc_ca string the server ca to use to validate servers when connecting
Expand Down
14 changes: 7 additions & 7 deletions go/mysql/binlog_event_compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,6 @@ const (
// Length of the binlog event header for internal events within
// the transaction payload.
headerLen = binlogEventLenOffset + eventLenBytes

// At what size should we switch from the in-memory buffer
// decoding to streaming mode which is much slower, but does
// not require everything be done in memory.
zstdInMemoryDecompressorMaxSize = 128 << (10 * 2) // 128MiB
)

var (
Expand All @@ -75,6 +70,11 @@ var (
compressedTrxPayloadsInMem = stats.NewCounter("CompressedTransactionPayloadsInMemory", "The number of compressed binlog transaction payloads that were processed in memory")
compressedTrxPayloadsUsingStream = stats.NewCounter("CompressedTransactionPayloadsViaStream", "The number of compressed binlog transaction payloads that were processed using a stream")

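// ZstdInMemoryDecompressorMaxSize is the uncompressed payload size
// above which we switch from in-memory buffer decoding to streaming
// mode, which is much slower but does not require everything to be
// done in memory all at once. It is also passed to the stream decoder
// as its maximum memory setting.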
ZstdInMemoryDecompressorMaxSize = uint64(128 << (10 * 2)) // 128MiB

// A concurrent stateless decoder that caches decompressors. This is
// used for smaller payloads that we want to handle entirely using
// in-memory buffers via DecodeAll.
Expand Down Expand Up @@ -284,7 +284,7 @@ func (tp *TransactionPayload) decompress() error {

// Switch to slower but less memory intensive stream mode for
// larger payloads.
if tp.uncompressedSize > zstdInMemoryDecompressorMaxSize {
if tp.uncompressedSize > ZstdInMemoryDecompressorMaxSize {
in := bytes.NewReader(tp.payload)
streamDecoder, err := statefulDecoderPool.Get(in)
if err != nil {
Expand Down Expand Up @@ -366,7 +366,7 @@ func (dp *decoderPool) Get(reader io.Reader) (*zstd.Decoder, error) {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "[BUG] expected *zstd.Decoder but got %T", pooled)
}
} else {
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(zstdInMemoryDecompressorMaxSize))
d, err := zstd.NewReader(nil, zstd.WithDecoderMaxMemory(ZstdInMemoryDecompressorMaxSize))
if err != nil { // Should only happen e.g. due to ENOMEM
return nil, vterrors.Wrap(err, "failed to create stateful stream decoder")
}
Expand Down
2 changes: 1 addition & 1 deletion go/mysql/binlog_event_mysql56_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ func TestMysql56DecodeTransactionPayload(t *testing.T) {
totalSize += len(eventStr)
require.True(t, strings.HasPrefix(eventStr, want))
}
require.Greater(t, totalSize, zstdInMemoryDecompressorMaxSize)
require.Greater(t, uint64(totalSize), ZstdInMemoryDecompressorMaxSize)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/spf13/pflag"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/servenv"
)

Expand Down Expand Up @@ -94,4 +95,6 @@ func registerFlags(fs *pflag.FlagSet) {
fs.BoolVar(&vreplicationStoreCompressedGTID, "vreplication_store_compressed_gtid", vreplicationStoreCompressedGTID, "Store compressed gtids in the pos column of the sidecar database's vreplication table")

fs.IntVar(&vreplicationParallelInsertWorkers, "vreplication-parallel-insert-workers", vreplicationParallelInsertWorkers, "Number of parallel insertion workers to use during copy phase. Set <= 1 to disable parallelism, or > 1 to enable concurrent insertion during copy phase.")

fs.Uint64Var(&mysql.ZstdInMemoryDecompressorMaxSize, "binlog-in-memory-decompressor-max-size", mysql.ZstdInMemoryDecompressorMaxSize, "This value sets the compressed transaction payload size at which we switch from in-memory decompression to the slower streaming mode. It also controls the maximum memory to be used when in streaming mode.")
}
Loading