From 5775995ea17cbd3969fc82697e1d5526f75e3655 Mon Sep 17 00:00:00 2001
From: xiongjiwei <xiongjiwei1996@outlook.com>
Date: Fri, 25 Nov 2022 17:09:58 +0800
Subject: [PATCH] variable: add `txn_source` into kv.context (#39159)

ref pingcap/tidb#38587
---
 DEPS.bzl                                     |  8 ++++----
 br/pkg/streamhelper/basic_lib_for_test.go    |  4 ++++
 ddl/ddl.go                                   |  1 -
 domain/globalconfigsync/globalconfig_test.go | 10 +++++++---
 executor/set_test.go                         |  9 +++++++++
 go.mod                                       |  4 ++--
 go.sum                                       |  8 ++++----
 kv/option.go                                 |  2 ++
 session/session.go                           |  1 +
 sessionctx/variable/session.go               |  4 ++++
 sessionctx/variable/sysvar.go                |  5 +++++
 sessionctx/variable/tidb_vars.go             |  6 ++++++
 sessionctx/variable/variable.go              |  1 +
 store/driver/txn/txn_driver.go               |  2 ++
 14 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/DEPS.bzl b/DEPS.bzl
index c9d495cb6e17e..6d80e6322c01c 100644
--- a/DEPS.bzl
+++ b/DEPS.bzl
@@ -2915,8 +2915,8 @@ def go_deps():
         name = "com_github_pingcap_kvproto",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/pingcap/kvproto",
-        sum = "h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=",
-        version = "v0.0.0-20221114102356-3debb6820e46",
+        sum = "h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=",
+        version = "v0.0.0-20221117075110-51120697d051",
     )
     go_repository(
         name = "com_github_pingcap_log",
@@ -3519,8 +3519,8 @@ def go_deps():
         name = "com_github_tikv_client_go_v2",
         build_file_proto_mode = "disable_global",
         importpath = "github.com/tikv/client-go/v2",
-        sum = "h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=",
-        version = "v2.0.3-0.20221108030801-9c0835c80eba",
+        sum = "h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=",
+        version = "v2.0.3-0.20221121025013-e9db9e6a8a94",
     )
     go_repository(
         name = "com_github_tikv_pd_client",
diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go
index 9e438c32f0f1f..b41d5baf19528 100644
--- a/br/pkg/streamhelper/basic_lib_for_test.go
+++ b/br/pkg/streamhelper/basic_lib_for_test.go
@@ -163,6 +163,10 @@ func (f *fakeStore) GetLastFlushTSOfRegion(ctx context.Context, in *logbackup.Ge
 	return resp, nil
 }
 
+func (f *fakeStore) SubscribeFlushEvent(ctx context.Context, in *logbackup.SubscribeFlushEventRequest, opts ...grpc.CallOption) (logbackup.LogBackup_SubscribeFlushEventClient, error) {
+	return nil, nil
+}
+
 // RegionScan gets a list of regions, starts from the region that contains key.
 // Limit limits the maximum number of regions returned.
 func (f *fakeCluster) RegionScan(ctx context.Context, key []byte, endKey []byte, limit int) ([]streamhelper.RegionWithLeader, error) {
diff --git a/ddl/ddl.go b/ddl/ddl.go
index 52a5b0480c42a..af8a0ca67a8d5 100644
--- a/ddl/ddl.go
+++ b/ddl/ddl.go
@@ -979,7 +979,6 @@ func (d *ddl) DoDDLJob(ctx sessionctx.Context, job *model.Job) error {
 		// Instead, we merge all the jobs into one pending job.
 		return appendToSubJobs(mci, job)
 	}
-
 	// Get a global job ID and put the DDL job in the queue.
 	setDDLJobQuery(ctx, job)
 	task := &limitJobTask{job, make(chan error)}
diff --git a/domain/globalconfigsync/globalconfig_test.go b/domain/globalconfigsync/globalconfig_test.go
index c7dab0064dedb..d6013f2887693 100644
--- a/domain/globalconfigsync/globalconfig_test.go
+++ b/domain/globalconfigsync/globalconfig_test.go
@@ -88,19 +88,23 @@ func TestStoreGlobalConfig(t *testing.T) {
 
 	_, err = se.Execute(context.Background(), "set @@global.tidb_enable_top_sql=1;")
 	require.NoError(t, err)
+	_, err = se.Execute(context.Background(), "set @@global.tidb_source_id=2;")
+	require.NoError(t, err)
 	for i := 0; i < 20; i++ {
 		time.Sleep(100 * time.Millisecond)
 		client :=
 			store.(kv.StorageWithPD).GetPDClient()
 		// enable top sql will be translated to enable_resource_metering
-		items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering"})
+		items, err := client.LoadGlobalConfig(context.Background(), []string{"enable_resource_metering", "source_id"})
 		require.NoError(t, err)
-		if len(items) == 1 && items[0].Value == "" {
+		if len(items) == 2 && items[0].Value == "" {
 			continue
 		}
-		require.Len(t, items, 1)
+		require.Len(t, items, 2)
 		require.Equal(t, items[0].Name, "/global/config/enable_resource_metering")
 		require.Equal(t, items[0].Value, "true")
+		require.Equal(t, items[1].Name, "/global/config/source_id")
+		require.Equal(t, items[1].Value, "2")
 		return
 	}
 	require.Fail(t, "timeout for waiting global config synced")
diff --git a/executor/set_test.go b/executor/set_test.go
index a4a54a37a3595..734fdab8750fe 100644
--- a/executor/set_test.go
+++ b/executor/set_test.go
@@ -861,6 +861,15 @@ func TestSetVar(t *testing.T) {
 	tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("4"))
 	tk.MustExec("SET GLOBAL validate_password.mixed_case_count = 2")
 	tk.MustQuery("SELECT @@GLOBAL.validate_password.length").Check(testkit.Rows("6"))
+
+	// test tidb_cdc_write_source
+	require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)
+	tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("0"))
+	tk.MustExec("set @@session.tidb_cdc_write_source = 2")
+	tk.MustQuery("select @@tidb_cdc_write_source").Check(testkit.Rows("2"))
+	require.Equal(t, uint64(2), tk.Session().GetSessionVars().CDCWriteSource)
+	tk.MustExec("set @@session.tidb_cdc_write_source = 0")
+	require.Equal(t, uint64(0), tk.Session().GetSessionVars().CDCWriteSource)
 }
 
 func TestGetSetNoopVars(t *testing.T) {
diff --git a/go.mod b/go.mod
index 2cd460e8279c3..56831b44d54ba 100644
--- a/go.mod
+++ b/go.mod
@@ -68,7 +68,7 @@ require (
 	github.com/pingcap/errors v0.11.5-0.20220729040631-518f63d66278
 	github.com/pingcap/failpoint v0.0.0-20220423142525-ae43b7f4e5c3
 	github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
-	github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46
+	github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051
 	github.com/pingcap/log v1.1.1-0.20221116035753-734d527bc87c
 	github.com/pingcap/sysutil v0.0.0-20220114020952-ea68d2dbf5b4
 	github.com/pingcap/tidb/parser v0.0.0-20211011031125-9b13dc409c5e
@@ -86,7 +86,7 @@ require (
 	github.com/stretchr/testify v1.8.0
 	github.com/tdakkota/asciicheck v0.1.1
 	github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
-	github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba
+	github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94
 	github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07
 	github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144
 	github.com/twmb/murmur3 v1.1.3
diff --git a/go.sum b/go.sum
index bdc73a0d74ddc..c1da1efba939c 100644
--- a/go.sum
+++ b/go.sum
@@ -778,8 +778,8 @@ github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 h1:surzm05a8C9dN
 github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
 github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
 github.com/pingcap/kvproto v0.0.0-20221026112947-f8d61344b172/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
-github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46 h1:HyWSOT/drBEtfXK2HLkWWR8dCO+rcf7OiRDRhBxAfU4=
-github.com/pingcap/kvproto v0.0.0-20221114102356-3debb6820e46/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
+github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051 h1:Ywk7n+4zm6W6T9XSyAwihBWdxXR2ALQzswQMEOglHkM=
+github.com/pingcap/kvproto v0.0.0-20221117075110-51120697d051/go.mod h1:OYtxs0786qojVTmkVeufx93xe+jUgm56GUYRIKnmaGI=
 github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
 github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
@@ -928,8 +928,8 @@ github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3 h1:f+jULpR
 github.com/tenntenn/text/transform v0.0.0-20200319021203-7eef512accb3/go.mod h1:ON8b8w4BN/kE1EOhwT0o+d62W65a6aPw1nouo9LMgyY=
 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU=
-github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba h1:nFVdyTXcQYZwQQCdSJcFI1vBFyzG1hVuZ39MAK6wqK4=
-github.com/tikv/client-go/v2 v2.0.3-0.20221108030801-9c0835c80eba/go.mod h1:X9s4ct/MLk1sFqe5mU79KClKegLFDTa/FCx3hzexGtk=
+github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94 h1:5df3qAcxvdGAffe0aBVFYhwQwAvl3VrF/xSX+J8ueyI=
+github.com/tikv/client-go/v2 v2.0.3-0.20221121025013-e9db9e6a8a94/go.mod h1:mQQhAIZ2uJwWXOG2UEz9s9oLGRcNKGGGtDOk4b13Bos=
 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc=
 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A=
 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro=
diff --git a/kv/option.go b/kv/option.go
index 888a1e24f0fa0..ee5354141cd7b 100644
--- a/kv/option.go
+++ b/kv/option.go
@@ -93,6 +93,8 @@ const (
 	ReplicaReadAdjuster
 	// ScanBatchSize set the iter scan batch size.
 	ScanBatchSize
+	// TxnSource set the source of this transaction.
+	TxnSource
 )
 
 // ReplicaReadType is the type of replica to read data from
diff --git a/session/session.go b/session/session.go
index 51a3e22d39aac..cd2e5eced09b4 100644
--- a/session/session.go
+++ b/session/session.go
@@ -702,6 +702,7 @@ func (s *session) doCommit(ctx context.Context) error {
 	if tables := sessVars.TxnCtx.TemporaryTables; len(tables) > 0 {
 		s.txn.SetOption(kv.KVFilter, temporaryTableKVFilter(tables))
 	}
+	s.txn.SetOption(kv.TxnSource, sessVars.CDCWriteSource)
 	if tables := sessVars.TxnCtx.CachedTables; len(tables) > 0 {
 		c := cachedTableRenewLease{tables: tables}
 		now := time.Now()
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index b8fbcf54848e1..359baffdd9679 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -1041,6 +1041,10 @@ type SessionVars struct {
 
 	// MetricSchemaStep indicates the step when query metric schema.
 	MetricSchemaStep int64
+
+	// CDCWriteSource indicates the following data is written by TiCDC if it is not 0.
+	CDCWriteSource uint64
+
 	// MetricSchemaRangeDuration indicates the step when query metric schema.
 	MetricSchemaRangeDuration int64
 
diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go
index 060c542bddd77..0ba657405dd5e 100644
--- a/sessionctx/variable/sysvar.go
+++ b/sessionctx/variable/sysvar.go
@@ -300,6 +300,10 @@ var defaultSysVars = []*SysVar{
 		s.MetricSchemaStep = TidbOptInt64(val, DefTiDBMetricSchemaStep)
 		return nil
 	}},
+	{Scope: ScopeSession, Name: TiDBCDCWriteSource, Value: "0", Type: TypeInt, MinValue: 0, MaxValue: 15, SetSession: func(s *SessionVars, val string) error {
+		s.CDCWriteSource = uint64(TidbOptInt(val, 0))
+		return nil
+	}},
 	{Scope: ScopeSession, Name: TiDBMetricSchemaRangeDuration, Value: strconv.Itoa(DefTiDBMetricSchemaRangeDuration), skipInit: true, Type: TypeUnsigned, MinValue: 10, MaxValue: 60 * 60 * 60, SetSession: func(s *SessionVars, val string) error {
 		s.MetricSchemaRangeDuration = TidbOptInt64(val, DefTiDBMetricSchemaRangeDuration)
 		return nil
@@ -776,6 +780,7 @@ var defaultSysVars = []*SysVar{
 	// TopSQL enable only be controlled by TopSQL pub/sub sinker.
 	// This global variable only uses to update the global config which store in PD(ETCD).
 	{Scope: ScopeGlobal, Name: TiDBEnableTopSQL, Value: BoolToOnOff(topsqlstate.DefTiDBTopSQLEnable), Type: TypeBool, AllowEmpty: true, GlobalConfigName: GlobalConfigEnableTopSQL},
+	{Scope: ScopeGlobal, Name: TiDBSourceID, Value: "1", Type: TypeInt, MinValue: 1, MaxValue: 15, GlobalConfigName: GlobalConfigSourceID},
 	{Scope: ScopeGlobal, Name: TiDBTopSQLMaxTimeSeriesCount, Value: strconv.Itoa(topsqlstate.DefTiDBTopSQLMaxTimeSeriesCount), Type: TypeInt, MinValue: 1, MaxValue: 5000, GetGlobal: func(_ context.Context, s *SessionVars) (string, error) {
 		return strconv.FormatInt(topsqlstate.GlobalState.MaxStatementCount.Load(), 10), nil
 	}, SetGlobal: func(_ context.Context, vars *SessionVars, s string) error {
diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go
index a9e278107d270..55ec073d85e51 100644
--- a/sessionctx/variable/tidb_vars.go
+++ b/sessionctx/variable/tidb_vars.go
@@ -555,6 +555,9 @@ const (
 	// TiDBMetricSchemaStep indicates the step when query metric schema.
 	TiDBMetricSchemaStep = "tidb_metric_query_step"
 
+	// TiDBCDCWriteSource indicates the following data is written by TiCDC if it is not 0.
+	TiDBCDCWriteSource = "tidb_cdc_write_source"
+
 	// TiDBMetricSchemaRangeDuration indicates the range duration when query metric schema.
 	TiDBMetricSchemaRangeDuration = "tidb_metric_query_range_duration"
 
@@ -627,6 +630,9 @@ const (
 	// TiDBEnableTopSQL indicates whether the top SQL is enabled.
 	TiDBEnableTopSQL = "tidb_enable_top_sql"
 
+	// TiDBSourceID indicates the source ID of the TiDB server.
+	TiDBSourceID = "tidb_source_id"
+
 	// TiDBTopSQLMaxTimeSeriesCount indicates the max number of statements been collected in each time series.
 	TiDBTopSQLMaxTimeSeriesCount = "tidb_top_sql_max_time_series_count"
 
diff --git a/sessionctx/variable/variable.go b/sessionctx/variable/variable.go
index 4b7faa09481c8..2792e373cdda1 100644
--- a/sessionctx/variable/variable.go
+++ b/sessionctx/variable/variable.go
@@ -85,6 +85,7 @@ const (
 // Global config name list.
 const (
 	GlobalConfigEnableTopSQL = "enable_resource_metering"
+	GlobalConfigSourceID     = "source_id"
 )
 
 func (s ScopeFlag) String() string {
diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go
index 851e68eac89ef..b18b6d0db1f33 100644
--- a/store/driver/txn/txn_driver.go
+++ b/store/driver/txn/txn_driver.go
@@ -258,6 +258,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
 		txn.KVTxn.SetRequestSourceType(val.(string))
 	case kv.ReplicaReadAdjuster:
 		txn.KVTxn.GetSnapshot().SetReplicaReadAdjuster(val.(txnkv.ReplicaReadAdjuster))
+	case kv.TxnSource:
+		txn.KVTxn.SetTxnSource(val.(uint64))
 	}
 }