Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics(cdc): fix mq sink write row count metrics. (#4192) #4323

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
89d2a91
fix the txn_batch_size metric inaccuracy bug when the sink target is MQ
zhaoxinyu Nov 24, 2021
41b5f65
address comments
zhaoxinyu Dec 1, 2021
381aa83
add comments for exported functions
zhaoxinyu Dec 1, 2021
cf6f73f
fix the compiling problem
zhaoxinyu Dec 9, 2021
339688e
workerpool: limit the rate to output deadlock warning (#3775) (#3795)
ti-chi-bot Dec 10, 2021
ef2636a
tests(ticdc): set up the sync diff output directory correctly (#3725)…
ti-chi-bot Dec 14, 2021
a4330bd
relay(dm): use binlog name comparison (#3710) (#3712)
ti-chi-bot Dec 14, 2021
95d038f
dm/load: fix concurrent call Loader.Status (#3459) (#3468)
ti-chi-bot Dec 17, 2021
4bb99d1
cdc/sorter: make unified sorter cgroup aware (#3436) (#3439)
ti-chi-bot Dec 17, 2021
70d2a98
tz (ticdc): fix timezone error (#3887) (#3906)
ti-chi-bot Dec 17, 2021
cc71f85
pkg,cdc: do not use log package (#3902) (#3940)
ti-chi-bot Dec 17, 2021
b58d176
*: rename repo from pingcap/ticdc to pingcap/tiflow (#3959)
amyangfei Dec 20, 2021
84a4d26
http_*: add log for http api and refine the err handle logic (#2997) …
ti-chi-bot Dec 20, 2021
7d57e2f
etcd_worker: batch etcd patch (#3277) (#3389)
ti-chi-bot Dec 20, 2021
e85d1e1
http_api (ticdc): check --cert-allowed-cn before add server common na…
ti-chi-bot Dec 20, 2021
998285e
kvclient(ticdc): fix kvclient takes too long time to recover (#3612) …
ti-chi-bot Dec 20, 2021
99eaa82
owner: fix owner tick block http request (#3490) (#3530)
ti-chi-bot Dec 20, 2021
9f32dd5
dm/syncer: use downstream PK/UK to generate DML (#3168) (#3256)
ti-chi-bot Dec 21, 2021
167d9c5
dep(dm): update go-mysql (#3914) (#3934)
ti-chi-bot Dec 21, 2021
d99b5a6
dm/syncer: multiple rows use downstream schema (#3308) (#3953)
ti-chi-bot Dec 21, 2021
d358c28
errorutil,sink,syncer: add errorutil to handle ignorable error (#3264…
lance6716 Dec 21, 2021
05802b3
dm/worker: don't exit when failed to read checkpoint in relay (#3345)…
ti-chi-bot Dec 22, 2021
5a9e78f
syncer(dm): use an early location to reset binlog and open safemode (…
lance6716 Dec 22, 2021
d9b5b4b
ticdc/owner: Fix ddl special comment syntax error (#3845) (#3978)
ti-chi-bot Dec 22, 2021
091efca
dm/scheduler: fix inconsistent of relay status (#3474) (#4009)
ti-chi-bot Dec 22, 2021
2290a12
owner,scheduler(cdc): fix nil pointer panic in owner scheduler (#2980…
ti-chi-bot Dec 24, 2021
5e7e3af
config(ticdc): Fix old value configuration check for maxwell protocol…
ti-chi-bot Dec 24, 2021
a189f0b
sink(ticdc): cherry pick sink bug fix to release 5.3 (#4083)
sdojjy Dec 28, 2021
1ba5290
master(dm): clean and treat invalid load task (#4004) (#4145)
ti-chi-bot Dec 30, 2021
8e1aaab
loader: fix wrong progress in query-status for loader (#4093) (#4143)
ti-chi-bot Dec 30, 2021
fcbf280
ticdc/processor: Fix backoff base delay misconfiguration (#3992) (#4028)
ti-chi-bot Dec 30, 2021
76de75d
dm: load table structure from dump files (#3295) (#4163)
ti-chi-bot Dec 30, 2021
263f852
compactor: fix duplicate entry in safemode (#3432) (#3434) (#4088)
ti-chi-bot Dec 30, 2021
f9c7619
kv(ticdc): reduce eventfeed rate limited log (#4072) (#4111)
ti-chi-bot Dec 30, 2021
ed96ff4
metrics(ticdc): add resolved ts and add changefeed to dataflow (#4038…
ti-chi-bot Dec 30, 2021
088edf5
This is an automated cherry-pick of #4192
3AceShowHand Jan 4, 2022
ec06b29
retry(dm): align with tidb latest error message (#4172) (#4254)
ti-chi-bot Jan 10, 2022
bd21afe
owner(ticdc): Add bootstrap and try to fix the meta information in it…
ti-chi-bot Jan 10, 2022
72ce433
redolog: add a precleanup process when s3 enable (#3525) (#3878)
ti-chi-bot Jan 10, 2022
de5de8b
ddl(dm): make skipped ddl pass `SplitDDL()` (#4176) (#4227)
ti-chi-bot Jan 10, 2022
b55018e
cdc/sink: remove Initialize method from the sink interface (#3682) (#…
ti-chi-bot Jan 11, 2022
08bd008
http_api (ticdc): fix http api 'get processor' panic. (#4117) (#4123)
ti-chi-bot Jan 12, 2022
b34f929
sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNod…
ti-chi-bot Jan 13, 2022
600f389
cdc/sink: adjust kafka initialization logic (#3192) (#4162)
ti-chi-bot Jan 13, 2022
16f5f42
try fix conflicts.
3AceShowHand Jan 13, 2022
62dd140
This is an automated cherry-pick of #4192
3AceShowHand Jan 4, 2022
b1b8182
Merge branch 'release-5.3' into cherry-pick-4192-to-release-5.3
3AceShowHand Jan 17, 2022
6f742e6
fix conflicts.
3AceShowHand Jan 18, 2022
59b593b
fix conflicts.
3AceShowHand Jan 18, 2022
7ee6dba
fix conflicts.
3AceShowHand Jan 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
cdc/sink: remove Initialize method from the sink interface (#3682) (#…
…3765)

Co-authored-by: Ling Jin <[email protected]>
  • Loading branch information
ti-chi-bot and 3AceShowHand committed Jan 13, 2022
commit b55018e7b88df1471c4151d72c6077df54ac6e6c
5 changes: 0 additions & 5 deletions cdc/owner/async_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ const (
// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
// Other functions are still synchronization
type AsyncSink interface {
Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error
// EmitCheckpointTs emits the checkpoint Ts to downstream data source
// this function will return after recording the checkpointTs specified in memory immediately
// and the recorded checkpointTs will be sent and updated to downstream data source every second
Expand Down Expand Up @@ -101,10 +100,6 @@ func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
return asyncSink, nil
}

func (s *asyncSinkImpl) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return s.sink.Initialize(ctx, tableInfo)
}

func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
defer s.wg.Done()
// TODO make the tick duration configurable
Expand Down
25 changes: 4 additions & 21 deletions cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,10 @@ type asyncSinkSuite struct{}

type mockSink struct {
sink.Sink
initTableInfo []*model.SimpleTableInfo
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
m.initTableInfo = tableInfo
return nil
checkpointTs model.Ts
ddl *model.DDLEvent
ddlMu sync.Mutex
ddlError error
}

func (m *mockSink) EmitCheckpointTs(ctx context.Context, ts uint64) error {
Expand Down Expand Up @@ -87,17 +81,6 @@ func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context,
return ctx, sink, mockSink
}

func (s *asyncSinkSuite) TestInitialize(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
ctx, sink, mockSink := newAsyncSink4Test(ctx, c)
defer sink.Close(ctx)
tableInfos := []*model.SimpleTableInfo{{Schema: "test"}}
err := sink.Initialize(ctx, tableInfos)
c.Assert(err, check.IsNil)
c.Assert(tableInfos, check.DeepEquals, mockSink.initTableInfo)
}

func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
defer testleak.AfterTest(c)()
ctx := cdcContext.NewBackendContext4Test(false)
Expand Down
5 changes: 1 addition & 4 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,10 +260,7 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
err = c.sink.Initialize(cancelCtx, c.schema.SinkTableInfos())
if err != nil {
return errors.Trace(err)
}

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,6 @@ func (c *mockFlowController) GetConsumption() uint64 {
return 0
}

func (s *mockSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mockSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
for _, row := range rows {
s.received = append(s.received, struct {
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,6 @@ func (b *blackHoleSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) e
return nil
}

// Initialize is no-op for blackhole
func (b *blackHoleSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}
Expand Down
8 changes: 0 additions & 8 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,6 @@ func newCheckSink(c *check.C) *checkSink {
}
}

func (c *checkSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (c *checkSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
Expand Down Expand Up @@ -344,10 +340,6 @@ type errorSink struct {
*check.C
}

func (e *errorSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
panic("unreachable")
}

func (e *errorSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
return errors.New("error in emit row changed events")
}
Expand Down
6 changes: 0 additions & 6 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,12 +238,6 @@ func (k *mqSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error {
return errors.Trace(err)
}

// Initialize registers Avro schemas for all tables
func (k *mqSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// No longer need it for now
return nil
}

func (k *mqSink) Close(ctx context.Context) error {
err := k.mqProducer.Close()
return errors.Trace(err)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,11 +203,6 @@ func (s *mysqlSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
return errors.Trace(err)
}

// Initialize is no-op for Mysql sink
func (s *mysqlSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (s *mysqlSink) execDDLWithMaxRetries(ctx context.Context, ddl *model.DDLEvent) error {
return retry.Do(ctx, func() error {
err := s.execDDL(ctx, ddl)
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/simple_mysql_tester.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,6 @@ func newSimpleMySQLSink(ctx context.Context, sinkURI *url.URL, config *config.Re
return sink, nil
}

func (s *simpleMySQLSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// do nothing
return nil
}

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
func (s *simpleMySQLSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
Expand Down
2 changes: 0 additions & 2 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ const (

// Sink is an abstraction for anything that a changefeed may emit into.
type Sink interface {
Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error

// EmitRowChangedEvents sends Row Changed Event to Sink
// EmitRowChangedEvents may write rows to downstream directly;
EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error
Expand Down
5 changes: 0 additions & 5 deletions cdc/sink/table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ type tableSink struct {
redoManager redo.LogManager
}

func (t *tableSink) Initialize(ctx context.Context, tableInfo []*model.SimpleTableInfo) error {
// do nothing
return nil
}

func (t *tableSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) error {
t.buffer = append(t.buffer, rows...)
t.manager.metricsTableSinkTotalRows.Add(float64(len(rows)))
Expand Down