Skip to content

Commit

Permalink
*: pre-finish puller and add a pull cmd
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Aug 27, 2019
1 parent 9442c3e commit 9270a7c
Show file tree
Hide file tree
Showing 18 changed files with 912 additions and 73 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ bin
cscope.*
**/*.swp

cmd/cdc/cdc

# Files generated when testing
vendor/
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ dev: check test
build: cdc

cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc cmd/cdc/main.go
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./main.go

install:
go install ./...
Expand Down
47 changes: 31 additions & 16 deletions cdc/buffer.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,39 @@
package cdc

import "context"
import (
"context"

"github.com/pingcap/tidb-cdc/cdc/util"
)

// buffer entry from kv layer
type bufferEntry struct {
kv *KVEntry
resolved *ResolvedSpan
type BufferEntry struct {
KV *KVEntry
Resolved *ResolvedSpan
}

func (e *BufferEntry) GetValue() interface{} {
if e.KV != nil {
return e.KV
} else if e.Resolved != nil {
return e.Resolved
} else {
return nil
}
}

type buffer struct {
entriesCh chan bufferEntry
// Buffer buffer kv entry
type Buffer struct {
entriesCh chan BufferEntry
}

func makeBuffer() *buffer {
return &buffer{
entriesCh: make(chan bufferEntry),
func MakeBuffer() *Buffer {
return &Buffer{
entriesCh: make(chan BufferEntry),
}
}

func (b *buffer) AddEntry(ctx context.Context, entry bufferEntry) error {
func (b *Buffer) AddEntry(ctx context.Context, entry BufferEntry) error {
select {
case <-ctx.Done():
return ctx.Err()
Expand All @@ -27,18 +42,18 @@ func (b *buffer) AddEntry(ctx context.Context, entry bufferEntry) error {
}
}

func (b *buffer) AddKVEntry(ctx context.Context, kv *KVEntry) error {
return b.AddEntry(ctx, bufferEntry{kv: kv})
func (b *Buffer) AddKVEntry(ctx context.Context, kv *KVEntry) error {
return b.AddEntry(ctx, BufferEntry{KV: kv})
}

func (b *buffer) AddResolved(ctx context.Context, span Span, ts uint64) error {
return b.AddEntry(ctx, bufferEntry{resolved: &ResolvedSpan{Span: span, Timestamp: ts}})
func (b *Buffer) AddResolved(ctx context.Context, span util.Span, ts uint64) error {
return b.AddEntry(ctx, BufferEntry{Resolved: &ResolvedSpan{Span: span, Timestamp: ts}})
}

func (b *buffer) Get(ctx context.Context) (bufferEntry, error) {
func (b *Buffer) Get(ctx context.Context) (BufferEntry, error) {
select {
case <-ctx.Done():
return bufferEntry{}, ctx.Err()
return BufferEntry{}, ctx.Err()
case e := <-b.entriesCh:
return e, nil
}
Expand Down
24 changes: 16 additions & 8 deletions cdc/capture.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
package cdc

import "context"
import (
"context"

pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/util"
)

type OpType int

Expand All @@ -12,7 +17,8 @@ const (

// Capture watch some span of KV and emit the entries to sink according to the ChangeFeedDetail
type Capture struct {
watchs []Span
pdCli pd.Client
watchs []util.Span
checkpointTS uint64
encoder Encoder
detail ChangeFeedDetail
Expand All @@ -34,12 +40,13 @@ type KVEntry struct {
}

type ResolvedSpan struct {
Span Span
Span util.Span
Timestamp uint64
}

func NewCapture(
watchs []Span,
pdCli pd.Client,
watchs []util.Span,
checkpointTS uint64,
detail ChangeFeedDetail,
) (c *Capture, err error) {
Expand All @@ -54,6 +61,7 @@ func NewCapture(
}

c = &Capture{
pdCli: pdCli,
watchs: watchs,
checkpointTS: checkpointTS,
encoder: encoder,
Expand All @@ -68,9 +76,9 @@ func (c *Capture) Start(ctx context.Context) (err error) {
ctx, c.cancel = context.WithCancel(ctx)
defer c.cancel()

buf := makeBuffer()
buf := MakeBuffer()

puller := newPuller(c.checkpointTS, c.watchs, c.detail, buf)
puller := NewPuller(c.pdCli, c.checkpointTS, c.watchs, c.detail, buf)
c.errCh = make(chan error, 2)
go func() {
err := puller.Run(ctx)
Expand Down Expand Up @@ -98,13 +106,13 @@ func (c *Capture) Start(ctx context.Context) (err error) {
// Frontier handle all ResolvedSpan and emit resolved timestamp
type Frontier struct {
// once all the span receive a resolved ts, it's safe to emit a changefeed level resolved ts
spans []Span
spans []util.Span
detail ChangeFeedDetail
encoder Encoder
sink Sink
}

func NewFrontier(spans []Span, detail ChangeFeedDetail) (f *Frontier, err error) {
func NewFrontier(spans []util.Span, detail ChangeFeedDetail) (f *Frontier, err error) {
encoder, err := getEncoder(detail.Opts)
if err != nil {
return nil, err
Expand Down
21 changes: 15 additions & 6 deletions cdc/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb-cdc/cdc/util"
"github.com/pingcap/tidb/store/tikv/oracle"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
Expand All @@ -25,6 +27,7 @@ type emitEntry struct {
resolved *ResolvedSpan
}

// ChangeFeedDetail describe the detail of a ChangeFeed
type ChangeFeedDetail struct {
SinkURI string
Opts map[string]string
Expand All @@ -33,14 +36,20 @@ type ChangeFeedDetail struct {
}

type ChangeFeed struct {
pdCli pd.Client
detail ChangeFeedDetail
frontier *Frontier
}

func NewChangeFeed(detail ChangeFeedDetail) *ChangeFeed {
func NewChangeFeed(pdAddr []string, detail ChangeFeedDetail) (*ChangeFeed, error) {
pdCli, err := pd.NewClient(pdAddr, pd.SecurityOption{})
if err != nil {
return nil, errors.Annotatef(err, "create pd client failed, addr: %v", pdAddr)
}
return &ChangeFeed{
detail: detail,
}
pdCli: pdCli,
}, nil
}

func (c *ChangeFeed) Start(ctx context.Context) error {
Expand All @@ -50,13 +59,13 @@ func (c *ChangeFeed) Start(ctx context.Context) error {
}

var err error
c.frontier, err = NewFrontier([]Span{{nil, nil}}, c.detail)
c.frontier, err = NewFrontier([]util.Span{{nil, nil}}, c.detail)
if err != nil {
return errors.Annotate(err, "NewFrontier failed")
}

// TODO: just one capture watch all kv for test now
capure, err := NewCapture([]Span{{nil, nil}}, checkpointTS, c.detail)
capure, err := NewCapture(c.pdCli, []util.Span{{nil, nil}}, checkpointTS, c.detail)
if err != nil {
return errors.Annotate(err, "NewCapture failed")
}
Expand All @@ -78,7 +87,7 @@ func (c *ChangeFeed) Start(ctx context.Context) error {
// The returned closure is not threadsafe.
func kvsToRows(
detail ChangeFeedDetail,
inputFn func(context.Context) (bufferEntry, error),
inputFn func(context.Context) (BufferEntry, error),
) func(context.Context) (*emitEntry, error) {
panic("todo")
}
Expand All @@ -89,7 +98,7 @@ func kvsToRows(
// updates. The returned closure is not threadsafe.
func emitEntries(
detail ChangeFeedDetail,
watchedSpans []Span,
watchedSpans []util.Span,
encoder Encoder,
sink Sink,
inputFn func(context.Context) (*emitEntry, error),
Expand Down
Loading

0 comments on commit 9270a7c

Please sign in to comment.