Skip to content

Commit

Permalink
Initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
july2993 committed Aug 20, 2019
0 parents commit 9442c3e
Show file tree
Hide file tree
Showing 20 changed files with 1,435 additions and 0 deletions.
35 changes: 35 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
*.a
*.so

# Folders
_obj
_test

# Architecture specific extensions/prefixes
*.[568vq]
[568vq].out

*.cgo1.go
*.cgo2.c
_cgo_defun.c
_cgo_gotypes.go
_cgo_export.*

_testmain.go

*.exe
*.test
*.prof

bin
*.iml
.idea
.DS_Store

cscope.*
**/*.swp

# Files generated when testing
vendor/
99 changes: 99 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
### Makefile for tidb-cdc
.PHONY: build test check clean fmt cdc

PROJECT=tidb-cdc

# Ensure GOPATH is set before running build process.
ifeq "$(GOPATH)" ""
$(error Please set the environment variable GOPATH before running `make`)
endif
FAIL_ON_STDOUT := awk '{ print } END { if (NR > 0) { exit 1 } }'

CURDIR := $(shell pwd)
path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

TEST_DIR := /tmp/tidb_cdc_test

GO := GO111MODULE=on go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3

ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
PACKAGE_LIST := go list ./...| grep -vE 'vendor|proto'
PACKAGES := $$($(PACKAGE_LIST))
PACKAGE_DIRECTORIES := $(PACKAGE_LIST) | sed 's|github.com/pingcap/$(PROJECT)/||'
FILES := $$(find . -name '*.go' -type f | grep -vE 'vendor')

# LDFLAGS += -X "github.com/pingcap/tidb-cdc/pkg/version.BuildTS=$(shell date -u '+%Y-%m-%d %I:%M:%S')"
# LDFLAGS += -X "github.com/pingcap/tidb-cdc/pkg/version.GitHash=$(shell git rev-parse HEAD)"
# LDFLAGS += -X "github.com/pingcap/tidb-cdc/pkg/version.ReleaseVersion=$(shell git describe --tags --dirty)"

default: build buildsucc

buildsucc:
@echo Build TiDB CDC successfully!

all: dev install

dev: check test

build: cdc

cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc cmd/cdc/main.go

install:
go install ./...

test:
mkdir -p "$(TEST_DIR)"
@export log_level=error;\
$(GOTEST) -cover -covermode=count -coverprofile="$(TEST_DIR)/cov.unit.out" $(PACKAGES)

fmt:
@echo "gofmt (simplify)"
@gofmt -s -l -w $(FILES) 2>&1 | $(FAIL_ON_STDOUT)

lint:tools/bin/revive
@echo "linting"
@tools/bin/revive -formatter friendly -config tools/check/revive.toml $(FILES)

vet:
@echo "vet"
$(GO) vet $(PACKAGES) 2>&1 | $(FAIL_ON_STDOUT)

tidy:
@echo "go mod tidy"
./tools/check/check-tidy.sh

check: fmt lint check-static tidy

coverage:
GO111MODULE=off go get github.com/wadey/gocovmerge
gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go" > "$(TEST_DIR)/all_cov.out"
ifeq ("$(JenkinsCI)", "1")
GO111MODULE=off go get github.com/mattn/goveralls
@goveralls -coverprofile=$(TEST_DIR)/all_cov.out -service=jenkins-ci -repotoken $(COVERALLS_TOKEN)
else
go tool cover -html "$(TEST_DIR)/all_cov.out" -o "$(TEST_DIR)/all_cov.html"
grep -F '<option' "$(TEST_DIR)/all_cov.html"
endif

check-static: tools/bin/golangci-lint
$(GO) mod vendor
tools/bin/golangci-lint --disable errcheck run $$($(PACKAGE_DIRECTORIES))

clean:
go clean -i ./...
rm -rf *.out

tools/bin/revive: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/revive github.com/mgechev/revive

tools/bin/golangci-lint: tools/check/go.mod
cd tools/check; \
$(GO) build -o ../bin/golangci-lint github.com/golangci/golangci-lint/cmd/golangci-lint
47 changes: 47 additions & 0 deletions cdc/buffer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package cdc

import "context"

// buffer entry from kv layer
type bufferEntry struct {
kv *KVEntry
resolved *ResolvedSpan
}

type buffer struct {
entriesCh chan bufferEntry
}

func makeBuffer() *buffer {
return &buffer{
entriesCh: make(chan bufferEntry),
}
}

func (b *buffer) AddEntry(ctx context.Context, entry bufferEntry) error {
select {
case <-ctx.Done():
return ctx.Err()
case b.entriesCh <- entry:
return nil
}
}

func (b *buffer) AddKVEntry(ctx context.Context, kv *KVEntry) error {
return b.AddEntry(ctx, bufferEntry{kv: kv})
}

func (b *buffer) AddResolved(ctx context.Context, span Span, ts uint64) error {
return b.AddEntry(ctx, bufferEntry{resolved: &ResolvedSpan{Span: span, Timestamp: ts}})
}

func (b *buffer) Get(ctx context.Context) (bufferEntry, error) {
select {
case <-ctx.Done():
return bufferEntry{}, ctx.Err()
case e := <-b.entriesCh:
return e, nil
}
}

// TODO limit memory buffer
132 changes: 132 additions & 0 deletions cdc/capture.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package cdc

import "context"

type OpType int

const (
OpTypeUnknow OpType = 0
OpTypePut OpType = 1
OpTypeDelete OpType = 2
)

// Capture watch some span of KV and emit the entries to sink according to the ChangeFeedDetail
type Capture struct {
watchs []Span
checkpointTS uint64
encoder Encoder
detail ChangeFeedDetail

// errCh contains the return values of the puller
errCh chan error
cancel context.CancelFunc

// sink is the Sink to write rows to.
// Resolved timestamps are never written by Capture
sink Sink
}

type KVEntry struct {
OpType OpType
Key []byte
Value []byte
TS uint64
}

type ResolvedSpan struct {
Span Span
Timestamp uint64
}

func NewCapture(
watchs []Span,
checkpointTS uint64,
detail ChangeFeedDetail,
) (c *Capture, err error) {
encoder, err := getEncoder(detail.Opts)
if err != nil {
return nil, err
}

sink, err := getSink(detail.SinkURI, detail.Opts)
if err != nil {
return nil, err
}

c = &Capture{
watchs: watchs,
checkpointTS: checkpointTS,
encoder: encoder,
sink: sink,
detail: detail,
}

return
}

func (c *Capture) Start(ctx context.Context) (err error) {
ctx, c.cancel = context.WithCancel(ctx)
defer c.cancel()

buf := makeBuffer()

puller := newPuller(c.checkpointTS, c.watchs, c.detail, buf)
c.errCh = make(chan error, 2)
go func() {
err := puller.Run(ctx)
c.errCh <- err
}()

rowsFn := kvsToRows(c.detail, buf.Get)
emitFn := emitEntries(c.detail, c.watchs, c.encoder, c.sink, rowsFn)

for {
resolved, err := emitFn(ctx)
if err != nil {
select {
case err = <-c.errCh:
default:
}
return err
}

// TODO: forward resolved span to Frontier
_ = resolved
}
}

// Frontier handle all ResolvedSpan and emit resolved timestamp
type Frontier struct {
// once all the span receive a resolved ts, it's safe to emit a changefeed level resolved ts
spans []Span
detail ChangeFeedDetail
encoder Encoder
sink Sink
}

func NewFrontier(spans []Span, detail ChangeFeedDetail) (f *Frontier, err error) {
encoder, err := getEncoder(detail.Opts)
if err != nil {
return nil, err
}

sink, err := getSink(detail.SinkURI, detail.Opts)
if err != nil {
return nil, err
}

f = &Frontier{
spans: spans,
detail: detail,
encoder: encoder,
sink: sink,
}

return
}

func (f *Frontier) NotifyResolvedSpan(resolve ResolvedSpan) error {
// TODO emit resolved timestamp once it's safe

return nil
}
Loading

0 comments on commit 9442c3e

Please sign in to comment.