Skip to content

Commit

Permalink
Switch to s2 for bulk loader and backup/restore (#9315)
Browse files Browse the repository at this point in the history
  • Loading branch information
RJKeevil authored Feb 26, 2025
1 parent bdd48f0 commit f4c2345
Show file tree
Hide file tree
Showing 7 changed files with 14 additions and 14 deletions.
4 changes: 2 additions & 2 deletions dgraph/cmd/bulk/mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (

farm "github.com/dgryski/go-farm"
"github.com/golang/glog"
"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/dgo/v240/protos/api"
Expand Down Expand Up @@ -165,7 +165,7 @@ func (m *mapper) writeMapEntriesToFile(cbuf *z.Buffer, shardIdx int) {
x.Check(f.Close())
}()

w := snappy.NewBufferedWriter(f)
w := s2.NewWriter(f)
defer func() {
x.Check(w.Close())
}()
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/bulk/reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4"
Expand Down Expand Up @@ -212,7 +212,7 @@ func (mi *mapIterator) Close() error {
func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
fd, err := os.Open(filename)
x.Check(err)
r := snappy.NewReader(fd)
r := s2.NewReader(fd)

// Read the header size.
reader := bufio.NewReaderSize(r, 16<<10)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ require (
github.com/gorilla/websocket v1.5.3
github.com/graph-gophers/graphql-go v1.5.0
github.com/hashicorp/vault/api v1.16.0
github.com/klauspost/compress v1.18.0
github.com/minio/minio-go/v6 v6.0.57
github.com/mitchellh/panicwrap v1.0.0
github.com/paulmach/go.geojson v1.5.0
Expand Down Expand Up @@ -117,7 +118,6 @@ require (
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/magiconair/properties v1.8.9 // indirect
github.com/minio/md5-simd v1.1.2 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,8 @@ github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQL
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.0.3/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
Expand Down
4 changes: 2 additions & 2 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"time"

"github.com/golang/glog"
"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
"github.com/pkg/errors"
ostats "go.opencensus.io/stats"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -397,7 +397,7 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse,
// Without compression: 7m2s 33GB output.
// With snappy: 7m11s 9.5GB output.
// With snappy + S3: 7m54s 9.5GB output.
cWriter := snappy.NewBufferedWriter(eWriter)
cWriter := s2.NewWriter(eWriter)

stream := pr.DB.NewStreamAt(pr.Request.ReadTs)
stream.LogPrefix = "Dgraph.Backup"
Expand Down
6 changes: 3 additions & 3 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -92,7 +92,7 @@ func (br *backupReader) WithEncryption(encKey x.Sensitive) *backupReader {
func (br *backupReader) WithCompression(comp string) *backupReader {
switch comp {
case "snappy":
br.r = snappy.NewReader(br.r)
br.r = s2.NewReader(br.r)
case "gzip", "":
r, err := gzip.NewReader(br.r)
br.setErr(err)
Expand Down Expand Up @@ -214,7 +214,7 @@ func (m *mapper) writeToDisk(buf *z.Buffer) error {
var lenBuf [4]byte
binary.BigEndian.PutUint32(lenBuf[:], uint32(len(headerBuf)))

w := snappy.NewBufferedWriter(f)
w := s2.NewWriter(f)
x.Check2(w.Write(lenBuf[:]))
x.Check2(w.Write(headerBuf))
x.Check(err)
Expand Down
4 changes: 2 additions & 2 deletions worker/restore_reduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/dustin/go-humanize"
"github.com/golang/glog"
"github.com/golang/snappy"
"github.com/klauspost/compress/s2"
"google.golang.org/protobuf/proto"

"github.com/dgraph-io/badger/v4/y"
Expand Down Expand Up @@ -92,7 +92,7 @@ func (mi *mapIterator) Close() error {
func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) {
fd, err := os.Open(filename)
x.Check(err)
r := snappy.NewReader(fd)
r := s2.NewReader(fd)

// Read the header size.
reader := bufio.NewReaderSize(r, 16<<10)
Expand Down

0 comments on commit f4c2345

Please sign in to comment.