Skip to content

Commit

Permalink
relay: add async/batch relay writer (#7580)
Browse files Browse the repository at this point in the history
ref #4287
  • Loading branch information
GMHDBJD authored Nov 29, 2022
1 parent 5cbc97f commit bac9c85
Show file tree
Hide file tree
Showing 6 changed files with 190 additions and 72 deletions.
136 changes: 100 additions & 36 deletions dm/relay/binlog_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,41 @@
package relay

import (
"bytes"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"go.uber.org/atomic"
"go.uber.org/zap"
)

const (
bufferSize = 1 * 1024 * 1024 // 1MB
chanSize = 1024
)

var nilErr error

// BinlogWriter is a binlog event writer which writes binlog events to a file.
// Open/Write/Close cannot be called concurrently.
type BinlogWriter struct {
mu sync.RWMutex

offset atomic.Int64
file *os.File
relayDir string
uuid string
filename string
uuid atomic.String
filename atomic.String
err atomic.Error

logger log.Logger

input chan []byte
flushWg sync.WaitGroup
wg sync.WaitGroup
}

// BinlogWriterStatus represents the status of a BinlogWriter.
Expand All @@ -64,6 +75,53 @@ func NewBinlogWriter(logger log.Logger, relayDir string) *BinlogWriter {
}
}

// run is the background loop of the binlog writer. It drains w.input until the
// channel is closed and writes the received data to w.file.
//
// Received chunks are accumulated in an in-memory buffer. The buffer is written
// to the file when a flush marker (a nil slice) is received, when the buffer
// grows beyond bufferSize, or when no more items are currently queued in
// w.input. Whatever is still buffered when the channel is closed is written
// before returning.
//
// The first failure is recorded in w.err (an already recorded error is never
// overwritten). After a failure all further data is dropped, but every flush
// marker is still acknowledged through w.flushWg.Done() so that a goroutine
// blocked in w.flushWg.Wait() is always released.
func (w *BinlogWriter) run() {
	var (
		buf       = &bytes.Buffer{}
		errOccurs bool
	)

	// writeToFile writes the buffered data to the file and resets the buffer.
	// On failure it records the error, sets errOccurs and keeps the buffer as is.
	writeToFile := func() {
		if buf.Len() == 0 {
			return
		}

		if w.file == nil {
			w.err.CompareAndSwap(nilErr, terror.ErrRelayWriterNotOpened.Generate())
			errOccurs = true
			return
		}
		n, err := w.file.Write(buf.Bytes())
		if err != nil {
			w.err.CompareAndSwap(nilErr, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n))
			errOccurs = true
			return
		}
		buf.Reset()
	}

	for bs := range w.input {
		// we use bs = nil to mean flush
		isFlush := bs == nil

		// once an error occurred nothing is buffered or written any more
		if !errOccurs {
			if !isFlush {
				buf.Write(bs)
			}
			if isFlush || buf.Len() > bufferSize || len(w.input) == 0 {
				writeToFile()
			}
		}

		// always acknowledge a flush marker, even after an error; otherwise the
		// flushWg counter is never decremented and waiters block forever.
		if isFlush {
			w.flushWg.Done()
		}
	}
	if !errOccurs {
		writeToFile()
	}
}

func (w *BinlogWriter) Open(uuid, filename string) error {
fullName := filepath.Join(w.relayDir, uuid, filename)
f, err := os.OpenFile(fullName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0o600)
Expand All @@ -79,58 +137,66 @@ func (w *BinlogWriter) Open(uuid, filename string) error {
return terror.ErrBinlogWriterGetFileStat.Delegate(err, f.Name())
}

w.mu.Lock()
defer w.mu.Unlock()

w.offset.Store(fs.Size())
w.file = f
w.uuid = uuid
w.filename = filename
w.uuid.Store(uuid)
w.filename.Store(filename)
w.err.Store(nilErr)

w.input = make(chan []byte, chanSize)
w.wg.Add(1)
go func() {
defer w.wg.Done()
w.run()
}()

return nil
}

func (w *BinlogWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()
if w.input != nil {
close(w.input)
}
w.wg.Wait()

var err error
if w.file != nil {
err2 := w.file.Sync() // try sync manually before close.
if err2 != nil {
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err2))
if err := w.file.Sync(); err != nil {
w.logger.Error("fail to flush buffered data", zap.String("component", "file writer"), zap.Error(err))
}
if err := w.file.Close(); err != nil {
w.err.CompareAndSwap(nilErr, err)
}
err = w.file.Close()
}

w.file = nil
w.offset.Store(0)
w.uuid = ""
w.filename = ""

return err
w.uuid.Store("")
w.filename.Store("")
w.input = nil
return w.err.Swap(nilErr)
}

func (w *BinlogWriter) Write(rawData []byte) error {
w.mu.RLock()
defer w.mu.RUnlock()

if w.file == nil {
return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))
return terror.ErrRelayWriterNotOpened.Generate()
}
w.input <- rawData
w.offset.Add(int64(len(rawData)))
return w.err.Load()
}

n, err := w.file.Write(rawData)
w.offset.Add(int64(n))

return terror.ErrBinlogWriterWriteDataLen.Delegate(err, len(rawData))
// Flush enqueues a flush marker (a nil slice passed to Write) and blocks until
// the background writer has acknowledged it through flushWg, i.e. until all
// data queued before this call has been handled.
//
// It returns ErrRelayWriterNotOpened when no file is opened, the error
// returned by Write if enqueuing the marker reports one, or otherwise the
// error recorded in w.err after the flush (nil when none was recorded).
func (w *BinlogWriter) Flush() error {
	// Write refuses to enqueue anything when the file is not opened. Check the
	// same condition before touching flushWg, so the counter is not incremented
	// for a marker that will never be acknowledged.
	if w.file == nil {
		return terror.ErrRelayWriterNotOpened.Generate()
	}

	w.flushWg.Add(1)
	if err := w.Write(nil); err != nil {
		// an error was already recorded by the writer; report it without waiting.
		return err
	}
	w.flushWg.Wait()
	return w.err.Load()
}

func (w *BinlogWriter) Status() *BinlogWriterStatus {
w.mu.RLock()
defer w.mu.RUnlock()

return &BinlogWriterStatus{
Filename: w.filename,
Filename: w.filename.Load(),
Offset: w.offset.Load(),
}
}
Expand All @@ -140,7 +206,5 @@ func (w *BinlogWriter) Offset() int64 {
}

func (w *BinlogWriter) isActive(uuid, filename string) (bool, int64) {
w.mu.RLock()
defer w.mu.RUnlock()
return uuid == w.uuid && filename == w.filename, w.offset.Load()
return uuid == w.uuid.Load() && filename == w.filename.Load(), w.offset.Load()
}
93 changes: 61 additions & 32 deletions dm/relay/binlog_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,27 @@ import (
"bytes"
"os"
"path/filepath"
"strings"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

var _ = Suite(&testBinlogWriterSuite{})
type testBinlogWriterSuite struct {
suite.Suite
}

type testBinlogWriterSuite struct{}
func TestBinlogWriterSuite(t *testing.T) {
suite.Run(t, new(testBinlogWriterSuite))
}

func (t *testBinlogWriterSuite) TestWrite(c *C) {
dir := c.MkDir()
func (t *testBinlogWriterSuite) TestWrite() {
dir := t.T().TempDir()
uuid := "3ccc475b-2343-11e7-be21-6c0b84d59f30.000001"
binlogDir := filepath.Join(dir, uuid)
c.Assert(os.Mkdir(binlogDir, 0o755), IsNil)
require.NoError(t.T(), os.Mkdir(binlogDir, 0o755))

filename := "test-mysql-bin.000001"
var (
Expand All @@ -41,64 +47,87 @@ func (t *testBinlogWriterSuite) TestWrite(c *C) {

{
w := NewBinlogWriter(log.L(), dir)
c.Assert(w, NotNil)
c.Assert(w.Open(uuid, filename), IsNil)
require.NotNil(t.T(), w)
require.NoError(t.T(), w.Open(uuid, filename))
fwStatus := w.Status()
c.Assert(fwStatus.Filename, Equals, filename)
c.Assert(fwStatus.Offset, Equals, int64(allData.Len()))
require.Equal(t.T(), filename, fwStatus.Filename)
require.Equal(t.T(), int64(allData.Len()), fwStatus.Offset)
fwStatusStr := fwStatus.String()
c.Assert(strings.Contains(fwStatusStr, "filename"), IsTrue)
c.Assert(w.Close(), IsNil)
require.Contains(t.T(), fwStatusStr, filename)
require.NoError(t.T(), w.Close())
}

{
// not opened
w := NewBinlogWriter(log.L(), dir)
err := w.Write(data1)
c.Assert(err, ErrorMatches, "*not opened")
require.Contains(t.T(), err.Error(), "no underlying writer opened")

// open non exist dir
err = w.Open("not-exist-uuid", "bin.000001")
c.Assert(err, ErrorMatches, "*no such file or directory")
require.Contains(t.T(), err.Error(), "no such file or directory")
}

{
// normal call flow
w := NewBinlogWriter(log.L(), dir)
err := w.Open(uuid, filename)
c.Assert(err, IsNil)
c.Assert(w.file, NotNil)
c.Assert(w.filename, Equals, filename)
c.Assert(w.offset.Load(), Equals, int64(0))
require.NoError(t.T(), err)
require.NotNil(t.T(), w.file)
require.Equal(t.T(), filename, w.filename.Load())
require.Equal(t.T(), int64(0), w.offset.Load())

err = w.Write(data1)
c.Assert(err, IsNil)
require.NoError(t.T(), err)
err = w.Flush()
require.NoError(t.T(), err)
allData.Write(data1)

fwStatus := w.Status()
c.Assert(fwStatus.Filename, Equals, filename)
c.Assert(fwStatus.Offset, Equals, int64(len(data1)))
require.Equal(t.T(), fwStatus.Filename, w.filename.Load())
require.Equal(t.T(), int64(len(data1)), fwStatus.Offset)

// write data again
data2 := []byte("another-data")
err = w.Write(data2)
c.Assert(err, IsNil)
require.NoError(t.T(), err)
allData.Write(data2)

c.Assert(w.offset.Load(), Equals, int64(allData.Len()))
require.LessOrEqual(t.T(), int64(allData.Len()), w.offset.Load())

err = w.Close()
c.Assert(err, IsNil)
c.Assert(w.file, IsNil)
c.Assert(w.filename, Equals, "")
c.Assert(w.offset.Load(), Equals, int64(0))

c.Assert(w.Close(), IsNil) // noop
require.NoError(t.T(), err)
require.Nil(t.T(), w.file)
require.Equal(t.T(), "", w.filename.Load())
require.Equal(t.T(), int64(0), w.offset.Load())

// try to read the data back
fullName := filepath.Join(binlogDir, filename)
dataInFile, err := os.ReadFile(fullName)
c.Assert(err, IsNil)
c.Assert(dataInFile, DeepEquals, allData.Bytes())
require.NoError(t.T(), err)
require.Equal(t.T(), allData.Bytes(), dataInFile)
}

{
// cover for error
w := NewBinlogWriter(log.L(), dir)
err := w.Open(uuid, filename)
require.NoError(t.T(), err)
require.NotNil(t.T(), w.file)

err = w.Write(data1)
require.NoError(t.T(), err)
err = w.Flush()
require.NoError(t.T(), err)

require.NoError(t.T(), w.file.Close())
// write data again
data2 := []byte("another-data")
// we cannot determine the error is caused by `Write` or `Flush`
// nolint:errcheck
w.Write(data2)
// nolint:errcheck
w.Flush()
require.True(t.T(), terror.ErrBinlogWriterWriteDataLen.Equal(w.Close()))
}
}
7 changes: 6 additions & 1 deletion dm/relay/local_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ func newBinlogReaderForTest(logger log.Logger, cfg *BinlogReaderConfig, notify b
func (t *testReaderSuite) setActiveRelayLog(r Process, uuid, filename string, offset int64) {
relay := r.(*Relay)
writer := relay.writer.(*FileWriter)
writer.out.uuid, writer.out.filename = uuid, filename
writer.out.uuid.Store(uuid)
writer.out.filename.Store(filename)
writer.out.offset.Store(offset)
}

Expand Down Expand Up @@ -1234,6 +1235,10 @@ func (m *mockFileWriterForActiveTest) Close() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) Flush() error {
panic("should be used")
}

func (m *mockFileWriterForActiveTest) WriteEvent(ev *replication.BinlogEvent) (WResult, error) {
panic("should be used")
}
Expand Down
6 changes: 4 additions & 2 deletions dm/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,8 @@ func (r *Relay) handleEvents(
}

firstEvent := true
relayPosGauge := relayLogPosGauge.WithLabelValues("relay")
relayFileGauge := relayLogFileGauge.WithLabelValues("relay")
for {
// 1. read events from upstream server
readTimer := time.Now()
Expand Down Expand Up @@ -704,11 +706,11 @@ func (r *Relay) handleEvents(
}

relayLogWriteSizeHistogram.Observe(float64(e.Header.EventSize))
relayLogPosGauge.WithLabelValues("relay").Set(float64(lastPos.Pos))
relayPosGauge.Set(float64(lastPos.Pos))
if index, err2 := utils.GetFilenameIndex(lastPos.Name); err2 != nil {
r.logger.Error("parse binlog file name", zap.String("file name", lastPos.Name), log.ShortError(err2))
} else {
relayLogFileGauge.WithLabelValues("relay").Set(float64(index))
relayFileGauge.Set(float64(index))
}

if needSavePos {
Expand Down
Loading

0 comments on commit bac9c85

Please sign in to comment.