Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

metrics(cdc): fix mq sink write row count metrics. (#4192) #4323

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
89d2a91
fix the txn_batch_size metric inaccuracy bug when the sink target is MQ
zhaoxinyu Nov 24, 2021
41b5f65
address comments
zhaoxinyu Dec 1, 2021
381aa83
add comments for exported functions
zhaoxinyu Dec 1, 2021
cf6f73f
fix the compiling problem
zhaoxinyu Dec 9, 2021
339688e
workerpool: limit the rate to output deadlock warning (#3775) (#3795)
ti-chi-bot Dec 10, 2021
ef2636a
tests(ticdc): set up the sync diff output directory correctly (#3725)…
ti-chi-bot Dec 14, 2021
a4330bd
relay(dm): use binlog name comparison (#3710) (#3712)
ti-chi-bot Dec 14, 2021
95d038f
dm/load: fix concurrent call Loader.Status (#3459) (#3468)
ti-chi-bot Dec 17, 2021
4bb99d1
cdc/sorter: make unified sorter cgroup aware (#3436) (#3439)
ti-chi-bot Dec 17, 2021
70d2a98
tz (ticdc): fix timezone error (#3887) (#3906)
ti-chi-bot Dec 17, 2021
cc71f85
pkg,cdc: do not use log package (#3902) (#3940)
ti-chi-bot Dec 17, 2021
b58d176
*: rename repo from pingcap/ticdc to pingcap/tiflow (#3959)
amyangfei Dec 20, 2021
84a4d26
http_*: add log for http api and refine the err handle logic (#2997) …
ti-chi-bot Dec 20, 2021
7d57e2f
etcd_worker: batch etcd patch (#3277) (#3389)
ti-chi-bot Dec 20, 2021
e85d1e1
http_api (ticdc): check --cert-allowed-cn before add server common na…
ti-chi-bot Dec 20, 2021
998285e
kvclient(ticdc): fix kvclient takes too long time to recover (#3612) …
ti-chi-bot Dec 20, 2021
99eaa82
owner: fix owner tick block http request (#3490) (#3530)
ti-chi-bot Dec 20, 2021
9f32dd5
dm/syncer: use downstream PK/UK to generate DML (#3168) (#3256)
ti-chi-bot Dec 21, 2021
167d9c5
dep(dm): update go-mysql (#3914) (#3934)
ti-chi-bot Dec 21, 2021
d99b5a6
dm/syncer: multiple rows use downstream schema (#3308) (#3953)
ti-chi-bot Dec 21, 2021
d358c28
errorutil,sink,syncer: add errorutil to handle ignorable error (#3264…
lance6716 Dec 21, 2021
05802b3
dm/worker: don't exit when failed to read checkpoint in relay (#3345)…
ti-chi-bot Dec 22, 2021
5a9e78f
syncer(dm): use an early location to reset binlog and open safemode (…
lance6716 Dec 22, 2021
d9b5b4b
ticdc/owner: Fix ddl special comment syntax error (#3845) (#3978)
ti-chi-bot Dec 22, 2021
091efca
dm/scheduler: fix inconsistent of relay status (#3474) (#4009)
ti-chi-bot Dec 22, 2021
2290a12
owner,scheduler(cdc): fix nil pointer panic in owner scheduler (#2980…
ti-chi-bot Dec 24, 2021
5e7e3af
config(ticdc): Fix old value configuration check for maxwell protocol…
ti-chi-bot Dec 24, 2021
a189f0b
sink(ticdc): cherry pick sink bug fix to release 5.3 (#4083)
sdojjy Dec 28, 2021
1ba5290
master(dm): clean and treat invalid load task (#4004) (#4145)
ti-chi-bot Dec 30, 2021
8e1aaab
loader: fix wrong progress in query-status for loader (#4093) (#4143)
ti-chi-bot Dec 30, 2021
fcbf280
ticdc/processor: Fix backoff base delay misconfiguration (#3992) (#4028)
ti-chi-bot Dec 30, 2021
76de75d
dm: load table structure from dump files (#3295) (#4163)
ti-chi-bot Dec 30, 2021
263f852
compactor: fix duplicate entry in safemode (#3432) (#3434) (#4088)
ti-chi-bot Dec 30, 2021
f9c7619
kv(ticdc): reduce eventfeed rate limited log (#4072) (#4111)
ti-chi-bot Dec 30, 2021
ed96ff4
metrics(ticdc): add resolved ts and add changefeed to dataflow (#4038…
ti-chi-bot Dec 30, 2021
088edf5
This is an automated cherry-pick of #4192
3AceShowHand Jan 4, 2022
ec06b29
retry(dm): align with tidb latest error message (#4172) (#4254)
ti-chi-bot Jan 10, 2022
bd21afe
owner(ticdc): Add bootstrap and try to fix the meta information in it…
ti-chi-bot Jan 10, 2022
72ce433
redolog: add a precleanup process when s3 enable (#3525) (#3878)
ti-chi-bot Jan 10, 2022
de5de8b
ddl(dm): make skipped ddl pass `SplitDDL()` (#4176) (#4227)
ti-chi-bot Jan 10, 2022
b55018e
cdc/sink: remove Initialize method from the sink interface (#3682) (#…
ti-chi-bot Jan 11, 2022
08bd008
http_api (ticdc): fix http api 'get processor' panic. (#4117) (#4123)
ti-chi-bot Jan 12, 2022
b34f929
sink (ticdc): fix a deadlock due to checkpointTs fall back in sinkNod…
ti-chi-bot Jan 13, 2022
600f389
cdc/sink: adjust kafka initialization logic (#3192) (#4162)
ti-chi-bot Jan 13, 2022
16f5f42
try fix conflicts.
3AceShowHand Jan 13, 2022
62dd140
This is an automated cherry-pick of #4192
3AceShowHand Jan 4, 2022
b1b8182
Merge branch 'release-5.3' into cherry-pick-4192-to-release-5.3
3AceShowHand Jan 17, 2022
6f742e6
fix conflicts.
3AceShowHand Jan 18, 2022
59b593b
fix conflicts.
3AceShowHand Jan 18, 2022
7ee6dba
fix conflicts.
3AceShowHand Jan 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cdc/sink/codec/avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ func (a *AvroEventBatchEncoder) AppendRowChangedEvent(e *model.RowChangedEvent)
}

mqMessage.Key = evlp
mqMessage.IncRowsCount()
a.resultBuf = append(a.resultBuf, mqMessage)

return EncoderNeedAsyncWrite, nil
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/codec/canal.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,8 @@ func (d *CanalEventBatchEncoder) EncodeDDLEvent(e *model.DDLEvent) (*MQMessage,

// Build implements the EventBatchEncoder interface
func (d *CanalEventBatchEncoder) Build() []*MQMessage {
if len(d.messages.Messages) == 0 {
rowCount := len(d.messages.Messages)
if rowCount == 0 {
return nil
}

Expand All @@ -392,6 +393,7 @@ func (d *CanalEventBatchEncoder) Build() []*MQMessage {
log.Panic("Error when serializing Canal packet", zap.Error(err))
}
ret := NewMQMessage(ProtocolCanal, nil, value, 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(rowCount)
d.messages.Reset()
d.resetPacket()
return []*MQMessage{ret}
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/codec/canal_flat.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,9 @@ func (c *CanalFlatEventBatchEncoder) Build() []*MQMessage {
log.Panic("CanalFlatEventBatchEncoder", zap.Error(err))
return nil
}
ret[i] = NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
m := NewMQMessage(ProtocolCanalJSON, nil, value, msg.tikvTs, model.MqMessageTypeRow, &msg.Schema, &msg.Table)
m.IncRowsCount()
ret[i] = m
}
c.resolvedBuf = c.resolvedBuf[0:0]
return ret
Expand Down
2 changes: 2 additions & 0 deletions cdc/sink/codec/canal_flat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (s *canalFlatSuite) TestBatching(c *check.C) {
c.Assert(msgs, check.HasLen, int(resolvedTs-lastResolved))

for j := range msgs {
c.Assert(msgs[j].GetRowsCount(), check.Equals, 1)

var msg canalFlatMessage
err := json.Unmarshal(msgs[j].Value, &msg)
c.Assert(err, check.IsNil)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/canal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (s *canalBatchSuite) TestCanalEventBatchEncoder(c *check.C) {
c.Assert(res, check.HasLen, 1)
c.Assert(res[0].Key, check.IsNil)
c.Assert(len(res[0].Value), check.Equals, size)
c.Assert(res[0].GetRowsCount(), check.Equals, len(cs))

packet := &canal.Packet{}
err := proto.Unmarshal(res[0].Value, packet)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco
if len(cs) > 0 {
res := encoder.Build()
c.Assert(res, check.HasLen, 1)
c.Assert(res[0].GetRowsCount(), check.Equals, len(cs))
decoder, err := newDecoder(res[0].Key, res[0].Value)
c.Assert(err, check.IsNil)
checkRowDecoder(decoder, cs)
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/maxwell.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,7 @@ func (d *MaxwellEventBatchEncoder) Build() []*MQMessage {
}

ret := NewMQMessage(ProtocolMaxwell, d.keyBuf.Bytes(), d.valueBuf.Bytes(), 0, model.MqMessageTypeRow, nil, nil)
ret.SetRowsCount(d.batchSize)
d.Reset()
return []*MQMessage{ret}
}
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/codec/maxwell_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func (s *maxwellbatchSuite) testmaxwellBatchCodec(c *check.C, newEncoder func()
continue
}
c.Assert(messages, check.HasLen, 1)
c.Assert(messages[0].GetRowsCount(), check.Equals, len(cs))
c.Assert(len(messages[0].Key)+len(messages[0].Value), check.Equals, size)
}

Expand Down