From 30255c47830b979907ad748023862402bcd40a3f Mon Sep 17 00:00:00 2001 From: amyangfei Date: Wed, 27 Jan 2021 00:27:23 +0800 Subject: [PATCH] cherry pick #1312 to release-4.0 Signed-off-by: ti-srebot --- cdc/processor.go | 3 +++ metrics/grafana/ticdc.json | 1 + 2 files changed, 4 insertions(+) diff --git a/cdc/processor.go b/cdc/processor.go index 4da6e2320da..d710e1ea031 100644 --- a/cdc/processor.go +++ b/cdc/processor.go @@ -598,6 +598,7 @@ func (p *processor) handleTables(ctx context.Context, status *model.TaskStatus) opt.Done = true opt.Status = model.OperFinished status.Dirty = true + tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, table.name) } } else { replicaInfo, exist := status.Tables[tableID] @@ -1105,6 +1106,7 @@ func (p *processor) stop(ctx context.Context) error { p.stateMu.Lock() for _, tbl := range p.tables { tbl.cancel() + tableResolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr, tbl.name) } p.ddlPullerCancel() // mark tables share the same context with its original table, don't need to cancel @@ -1114,6 +1116,7 @@ func (p *processor) stop(ctx context.Context) error { p.localResolvedNotifier.Close() failpoint.Inject("processorStopDelay", nil) atomic.StoreInt32(&p.stopped, 1) + syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Set(0) if err := p.etcdCli.DeleteTaskPosition(ctx, p.changefeedID, p.captureInfo.ID); err != nil { return err } diff --git a/metrics/grafana/ticdc.json b/metrics/grafana/ticdc.json index 0703d6092d4..f756d10c75a 100644 --- a/metrics/grafana/ticdc.json +++ b/metrics/grafana/ticdc.json @@ -1108,6 +1108,7 @@ { "expr": "max(ticdc_processor_table_resolved_ts{changefeed=~\"$changefeed\",capture=~\"$capture\"}) by (capture,table)", "format": "time_series", + "instant": true, "interval": "", "intervalFactor": 1, "legendFormat": "{{capture}}-{{table}}",