diff --git a/ddl/split_region.go b/ddl/split_region.go
index 031f24ffb975b..861594ba76608 100644
--- a/ddl/split_region.go
+++ b/ddl/split_region.go
@@ -121,7 +121,7 @@ func splitIndexRegion(store kv.SplitableStore, tblInfo *model.TableInfo, scatter
 
 func waitScatterRegionFinish(store kv.SplitableStore, regionIDs ...uint64) {
 	for _, regionID := range regionIDs {
-		err := store.WaitScatterRegionFinish(regionID)
+		err := store.WaitScatterRegionFinish(regionID, 0)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("[ddl] wait scatter region failed", zap.Uint64("regionID", regionID), zap.Error(err))
 		}
diff --git a/executor/builder.go b/executor/builder.go
index a31d958aafabc..88c02a2dbae11 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -1265,8 +1265,9 @@ func (b *executorBuilder) buildUnionAll(v *plannercore.PhysicalUnionAll) Executo
 }
 
 func (b *executorBuilder) buildSplitRegion(v *plannercore.SplitRegion) Executor {
-	base := newBaseExecutor(b.ctx, nil, v.ExplainID())
-	base.initCap = chunk.ZeroCapacity
+	base := newBaseExecutor(b.ctx, v.Schema(), v.ExplainID())
+	base.initCap = 1
+	base.maxChunkSize = 1
 	if v.IndexInfo != nil {
 		return &SplitIndexRegionExec{
 			baseExecutor: base,
diff --git a/executor/executor_test.go b/executor/executor_test.go
index 36636c431832b..ef0dc69f208e5 100644
--- a/executor/executor_test.go
+++ b/executor/executor_test.go
@@ -1976,16 +1976,9 @@ func (s *testSuite) TestSplitRegionTimeout(c *C) {
 	tk.MustExec("create table t(a varchar(100),b int, index idx1(b,a))")
 	tk.MustExec(`split table t index idx1 by (10000,"abcd"),(10000000);`)
 	tk.MustExec(`set @@tidb_wait_split_region_timeout=1`)
-	_, err := tk.Exec(`split table t between (0) and (10000) regions 10`)
-	c.Assert(err, NotNil)
-	c.Assert(err.Error(), Equals, "split region timeout(1s)")
+	// The result "0 0" means that 0 regions were split before the timeout, so the scatter-finish ratio is 0.
+	tk.MustQuery(`split table t between (0) and (10000) regions 10`).Check(testkit.Rows("0 0"))
 	c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockSplitRegionTimeout"), IsNil)
-
-	c.Assert(failpoint.Enable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout", `return(true)`), IsNil)
-	_, err = tk.Exec(`split table t between (0) and (10000) regions 10`)
-	c.Assert(err, NotNil)
-	c.Assert(err.Error(), Equals, "wait split region scatter timeout(1s)")
-	c.Assert(failpoint.Disable("github.com/pingcap/tidb/executor/mockScatterRegionTimeout"), IsNil)
 }
 
 func (s *testSuite) TestRow(c *C) {
@@ -3952,7 +3945,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
 
 	// Test show table regions.
 	tk.MustExec(`split table t_regions1 by (0)`)
-	tk.MustExec(`split table t_regions between (-10000) and (10000) regions 4;`)
+	tk.MustQuery(`split table t_regions between (-10000) and (10000) regions 4;`).Check(testkit.Rows("3 1"))
 	re := tk.MustQuery("show table t_regions regions")
 	rows := re.Rows()
 	// Table t_regions should have 4 regions now.
@@ -3967,7 +3960,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
 	c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_5000", tbl.Meta().ID))
 
 	// Test show table index regions.
-	tk.MustExec(`split table t_regions index idx between (-1000) and (1000) regions 4;`)
+	tk.MustQuery(`split table t_regions index idx between (-1000) and (1000) regions 4;`).Check(testkit.Rows("4 1"))
 	re = tk.MustQuery("show table t_regions index idx regions")
 	rows = re.Rows()
 	// The index `idx` of table t_regions should have 4 regions now.
@@ -3997,7 +3990,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
 
 	// Test show table regions.
 	tk.MustExec(`set @@session.tidb_wait_split_region_finish=1;`)
-	tk.MustExec(`split table t_regions between (0) and (10000) regions 4;`)
+	tk.MustQuery(`split table t_regions by (2500),(5000),(7500);`).Check(testkit.Rows("3 1"))
 	re = tk.MustQuery("show table t_regions regions")
 	rows = re.Rows()
 	// Table t_regions should have 4 regions now.
@@ -4010,7 +4003,7 @@ func (s *testSuite) TestShowTableRegion(c *C) {
 	c.Assert(rows[3][1], Equals, fmt.Sprintf("t_%d_r_7500", tbl.Meta().ID))
 
 	// Test show table index regions.
-	tk.MustExec(`split table t_regions index idx between (0) and (1000) regions 4;`)
+	tk.MustQuery(`split table t_regions index idx by (250),(500),(750);`).Check(testkit.Rows("4 1"))
 	re = tk.MustQuery("show table t_regions index idx regions")
 	rows = re.Rows()
 	// The index `idx` of table t_regions should have 4 regions now.
diff --git a/executor/split.go b/executor/split.go
index afb2459d8a29a..68e8cd0f5a864
--- a/executor/split.go
+++ b/executor/split.go
@@ -28,6 +28,7 @@ import (
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/parser/mysql"
 	"github.com/pingcap/tidb/kv"
+	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/store/tikv"
 	"github.com/pingcap/tidb/table/tables"
 	"github.com/pingcap/tidb/tablecodec"
@@ -48,10 +49,37 @@ type SplitIndexRegionExec struct {
 	upper      []types.Datum
 	num        int
 	valueLists [][]types.Datum
+
+	done bool
+	splitRegionResult
+}
+
+type splitRegionResult struct {
+	splitRegions     int
+	finishScatterNum int
+}
+
+// Open implements the Executor Open interface.
+func (e *SplitIndexRegionExec) Open(ctx context.Context) error {
+	return e.splitIndexRegion(ctx)
 }
 
 // Next implements the Executor Next interface.
-func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
+func (e *SplitIndexRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	chk.Reset()
+	if e.done {
+		return nil
+	}
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
+	e.done = true
+	return nil
+}
+
+// checkScatterRegionFinishBackOff is the backoff time, in milliseconds, used to check whether the remaining regions have finished scattering once the split-region timeout has been reached.
+const checkScatterRegionFinishBackOff = 50
+
+// splitIndexRegion is used to split index regions.
+func (e *SplitIndexRegionExec) splitIndexRegion(ctx context.Context) error {
 	store := e.ctx.GetStore()
 	s, ok := store.(kv.SplitableStore)
 	if !ok {
@@ -62,10 +90,15 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 		return err
 	}
 
+	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
 	regionIDs := make([]uint64, 0, len(splitIdxKeys))
 	for _, idxKey := range splitIdxKeys {
+		if isCtxDone(ctxWithTimeout) {
+			break
+		}
+
 		regionID, err := s.SplitRegion(idxKey, true)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("split table index region failed",
@@ -74,28 +107,17 @@ func (e *SplitIndexRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 				zap.Error(err))
 			continue
 		}
+		if regionID == 0 {
+			continue
+		}
 		regionIDs = append(regionIDs, regionID)
 
-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
-		}
 	}
+	e.splitRegions = len(regionIDs)
 	if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
 		return nil
 	}
-	for _, regionID := range regionIDs {
-		err := s.WaitScatterRegionFinish(regionID)
-		if err != nil {
-			logutil.Logger(context.Background()).Warn("wait scatter region failed",
-				zap.Uint64("regionID", regionID),
-				zap.String("table", e.tableInfo.Name.L),
-				zap.String("index", e.indexInfo.Name.L),
-				zap.Error(err))
-		}
-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("wait split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
-		}
-	}
+	e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, e.indexInfo.Name.L)
 	return nil
 }
 
@@ -225,16 +247,35 @@ type SplitTableRegionExec struct {
 	upper      types.Datum
 	num        int
 	valueLists [][]types.Datum
+
+	done bool
+	splitRegionResult
+}
+
+// Open implements the Executor Open interface.
+func (e *SplitTableRegionExec) Open(ctx context.Context) error {
+	return e.splitTableRegion(ctx)
 }
 
 // Next implements the Executor Next interface.
-func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
+func (e *SplitTableRegionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
+	chk.Reset()
+	if e.done {
+		return nil
+	}
+	appendSplitRegionResultToChunk(chk, e.splitRegions, e.finishScatterNum)
+	e.done = true
+	return nil
+}
+
+func (e *SplitTableRegionExec) splitTableRegion(ctx context.Context) error {
 	store := e.ctx.GetStore()
 	s, ok := store.(kv.SplitableStore)
 	if !ok {
 		return nil
 	}
 
+	start := time.Now()
 	ctxWithTimeout, cancel := context.WithTimeout(ctx, e.ctx.GetSessionVars().GetSplitRegionTimeout())
 	defer cancel()
 
@@ -244,6 +285,14 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 	}
 	regionIDs := make([]uint64, 0, len(splitKeys))
 	for _, key := range splitKeys {
+		failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
+			if val.(bool) {
+				time.Sleep(time.Second*1 + time.Millisecond*10)
+			}
+		})
+		if isCtxDone(ctxWithTimeout) {
+			break
+		}
 		regionID, err := s.SplitRegion(key, true)
 		if err != nil {
 			logutil.Logger(context.Background()).Warn("split table region failed",
@@ -251,41 +300,63 @@ func (e *SplitTableRegionExec) Next(ctx context.Context, _ *chunk.Chunk) error {
 				zap.Error(err))
 			continue
 		}
+		if regionID == 0 {
+			continue
+		}
 		regionIDs = append(regionIDs, regionID)
 
-		failpoint.Inject("mockSplitRegionTimeout", func(val failpoint.Value) {
-			if val.(bool) {
-				time.Sleep(time.Second * 1)
-			}
-		})
-
-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("split region timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
-		}
 	}
+	e.splitRegions = len(regionIDs)
 	if !e.ctx.GetSessionVars().WaitSplitRegionFinish {
 		return nil
 	}
+
+	e.finishScatterNum = waitScatterRegionFinish(ctxWithTimeout, e.ctx, start, s, regionIDs, e.tableInfo.Name.L, "")
+	return nil
+}
+
+func waitScatterRegionFinish(ctxWithTimeout context.Context, sctx sessionctx.Context, startTime time.Time, store kv.SplitableStore, regionIDs []uint64, tableName, indexName string) int {
+	remainMillisecond := 0
+	finishScatterNum := 0
 	for _, regionID := range regionIDs {
-		err := s.WaitScatterRegionFinish(regionID)
-		if err != nil {
-			logutil.Logger(context.Background()).Warn("wait scatter region failed",
-				zap.Uint64("regionID", regionID),
-				zap.String("table", e.tableInfo.Name.L),
-				zap.Error(err))
+		if isCtxDone(ctxWithTimeout) {
+			// Do not break here; keep checking the remaining regions with a very short backoff so that
+			// regions that have already finished scattering are still counted.
+			// Consider this situation: Regions 1, 2, and 3 are to be split.
+			// Region 1 times out before scattering finishes, while Regions 2 and 3 have already finished scattering.
+			// In this case, we should report that 2 regions, instead of 0, have finished scattering.
+			remainMillisecond = checkScatterRegionFinishBackOff
+		} else {
+			remainMillisecond = int((sctx.GetSessionVars().GetSplitRegionTimeout().Seconds() - time.Since(startTime).Seconds()) * 1000)
 		}
 
-		failpoint.Inject("mockScatterRegionTimeout", func(val failpoint.Value) {
-			if val.(bool) {
-				time.Sleep(time.Second * 1)
+		err := store.WaitScatterRegionFinish(regionID, remainMillisecond)
+		if err == nil {
+			finishScatterNum++
+		} else {
+			if len(indexName) == 0 {
+				logutil.Logger(context.Background()).Warn("wait scatter region failed",
+					zap.Uint64("regionID", regionID),
+					zap.String("table", tableName),
+					zap.Error(err))
+			} else {
+				logutil.Logger(context.Background()).Warn("wait scatter region failed",
+					zap.Uint64("regionID", regionID),
+					zap.String("table", tableName),
+					zap.String("index", indexName),
+					zap.Error(err))
 			}
-		})
-
-		if isCtxDone(ctxWithTimeout) {
-			return errors.Errorf("wait split region scatter timeout(%v)", e.ctx.GetSessionVars().GetSplitRegionTimeout())
 		}
 	}
-	return nil
+	return finishScatterNum
+}
+
+func appendSplitRegionResultToChunk(chk *chunk.Chunk, totalRegions, finishScatterNum int) {
+	chk.AppendInt64(0, int64(totalRegions))
+	if finishScatterNum > 0 && totalRegions > 0 {
+		chk.AppendFloat64(1, float64(finishScatterNum)/float64(totalRegions))
+	} else {
+		chk.AppendFloat64(1, float64(0))
+	}
 }
 
 func isCtxDone(ctx context.Context) bool {
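Aside (illustration only, not part of the patch): the remaining wait budget handed to WaitScatterRegionFinish and the SCATTER_FINISH_RATIO value produced above come down to two lines of arithmetic. The sketch below uses made-up numbers for the timeout, elapsed time, and region counts; only the formulas mirror the code in waitScatterRegionFinish and appendSplitRegionResultToChunk.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Stand-ins for e.ctx.GetSessionVars().GetSplitRegionTimeout() and time.Since(startTime).
	timeout := 60 * time.Second
	elapsed := 45 * time.Second

	// Same computation as waitScatterRegionFinish: the milliseconds left before the
	// split-region timeout, spent waiting for the current region to finish scattering.
	remainMillisecond := int((timeout.Seconds() - elapsed.Seconds()) * 1000)
	fmt.Println(remainMillisecond) // 15000

	// Same computation as appendSplitRegionResultToChunk: the SCATTER_FINISH_RATIO column.
	splitRegions, finishScatterNum := 4, 3
	fmt.Println(float64(finishScatterNum) / float64(splitRegions)) // 0.75
}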
diff --git a/kv/kv.go b/kv/kv.go
index a646b19b50a02..87d993717fb27 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -299,6 +299,6 @@ type Iterator interface {
 // SplitableStore is the kv store which supports split regions.
 type SplitableStore interface {
 	SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error)
-	WaitScatterRegionFinish(regionID uint64) error
+	WaitScatterRegionFinish(regionID uint64, backOff int) error
 	CheckRegionInScattering(regionID uint64) (bool, error)
 }
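Aside (illustration only, not part of the patch): since SplitableStore is an interface, adding the backOff parameter is a breaking change for every implementation, including mocks used in tests. A minimal sketch of a stub satisfying the extended interface, with a hypothetical type name, written as if it lived in package kv:

// fakeSplitStore is a hypothetical stub; it only exists to show the extended method set.
type fakeSplitStore struct{}

func (fakeSplitStore) SplitRegion(splitKey Key, scatter bool) (regionID uint64, err error) {
	return 1, nil
}

// Passing backOff <= 0 is the "use the store's default backoff" convention,
// matching the tikvStore change at the end of this diff.
func (fakeSplitStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
	return nil
}

func (fakeSplitStore) CheckRegionInScattering(regionID uint64) (bool, error) {
	return false, nil
}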
diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go
index 545f1fc933a64..4c7287d2cc503 100644
--- a/planner/core/planbuilder.go
+++ b/planner/core/planbuilder.go
@@ -999,6 +999,13 @@ func buildTableRegionsSchema() *expression.Schema {
 	return schema
 }
 
+func buildSplitRegionsSchema() *expression.Schema {
+	schema := expression.NewSchema(make([]*expression.Column, 0, 2)...)
+	schema.Append(buildColumn("", "TOTAL_SPLIT_REGION", mysql.TypeLonglong, 4))
+	schema.Append(buildColumn("", "SCATTER_FINISH_RATIO", mysql.TypeDouble, 8))
+	return schema
+}
+
 func buildShowDDLJobQueriesFields() *expression.Schema {
 	schema := expression.NewSchema(make([]*expression.Column, 0, 1)...)
 	schema.Append(buildColumn("", "QUERY", mysql.TypeVarchar, 256))
@@ -1687,6 +1694,7 @@ func (b *PlanBuilder) buildSplitIndexRegion(node *ast.SplitRegionStmt) (Plan, er
 		TableInfo: tblInfo,
 		IndexInfo: indexInfo,
 	}
+	p.SetSchema(buildSplitRegionsSchema())
 	// Split index regions by user specified value lists.
 	if len(node.SplitOpt.ValueLists) > 0 {
 		indexValues := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
@@ -1801,6 +1809,7 @@ func (b *PlanBuilder) buildSplitTableRegion(node *ast.SplitRegionStmt) (Plan, er
 	p := &SplitRegion{
 		TableInfo: tblInfo,
 	}
+	p.SetSchema(buildSplitRegionsSchema())
 	if len(node.SplitOpt.ValueLists) > 0 {
 		values := make([][]types.Datum, 0, len(node.SplitOpt.ValueLists))
 		for i, valuesItem := range node.SplitOpt.ValueLists {
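Aside (illustration only, not part of the patch): with the two-column schema attached here, a SPLIT TABLE statement now returns a result set instead of an empty OK. A hedged sketch of reading it through database/sql; the helper name, table, and statement are placeholders, and the only imports needed are database/sql and fmt.

// showSplitResult is a hypothetical helper that runs a SPLIT TABLE statement and prints
// the TOTAL_SPLIT_REGION and SCATTER_FINISH_RATIO columns defined by buildSplitRegionsSchema.
func showSplitResult(db *sql.DB) error {
	rows, err := db.Query(`split table t between (0) and (10000) regions 4`)
	if err != nil {
		return err
	}
	defer rows.Close()
	for rows.Next() {
		var totalSplitRegion int64
		var scatterFinishRatio float64
		if err := rows.Scan(&totalSplitRegion, &scatterFinishRatio); err != nil {
			return err
		}
		fmt.Printf("TOTAL_SPLIT_REGION=%d SCATTER_FINISH_RATIO=%g\n", totalSplitRegion, scatterFinishRatio)
	}
	return rows.Err()
}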
diff --git a/store/mockstore/mocktikv/cluster.go b/store/mockstore/mocktikv/cluster.go
index 88b3195c8ffa4..460fffd6d39b8 100644
--- a/store/mockstore/mocktikv/cluster.go
+++ b/store/mockstore/mocktikv/cluster.go
@@ -319,12 +319,13 @@ func (c *Cluster) Split(regionID, newRegionID uint64, key []byte, peerIDs []uint
 }
 
 // SplitRaw splits a Region at the key (not encoded) and creates new Region.
-func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) {
+func (c *Cluster) SplitRaw(regionID, newRegionID uint64, rawKey []byte, peerIDs []uint64, leaderPeerID uint64) *Region {
 	c.Lock()
 	defer c.Unlock()
 
 	newRegion := c.regions[regionID].split(newRegionID, rawKey, peerIDs, leaderPeerID)
 	c.regions[newRegionID] = newRegion
+	return newRegion
 }
 
 // Merge merges 2 regions, their key ranges should be adjacent.
diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go
index 5e9d253cc4ffe..e4919318048fa 100644
--- a/store/mockstore/mocktikv/rpc.go
+++ b/store/mockstore/mocktikv/rpc.go
@@ -597,8 +597,8 @@ func (h *rpcHandler) handleSplitRegion(req *kvrpcpb.SplitRegionRequest) *kvrpcpb
 		return &kvrpcpb.SplitRegionResponse{}
 	}
 	newRegionID, newPeerIDs := h.cluster.AllocID(), h.cluster.AllocIDs(len(region.Peers))
-	h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
-	return &kvrpcpb.SplitRegionResponse{}
+	newRegion := h.cluster.SplitRaw(region.GetId(), newRegionID, key, newPeerIDs, newPeerIDs[0])
+	return &kvrpcpb.SplitRegionResponse{Left: newRegion.Meta}
 }
 
 // RPCClient sends kv RPC calls to mock cluster. RPCClient mocks the behavior of
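Aside (illustration only, not part of the patch): returning the new region's meta in Left is what lets the client learn the ID of the region it just created; the executor hunks above deliberately skip a returned regionID of 0. A hypothetical helper showing the defensive read on the client side:

// newRegionIDFromResp is a hypothetical helper: a nil response or missing Left meta yields 0,
// which the split executors treat as "no new region was created" and skip.
func newRegionIDFromResp(resp *kvrpcpb.SplitRegionResponse) uint64 {
	if resp == nil || resp.GetLeft() == nil {
		return 0
	}
	return resp.GetLeft().GetId()
}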
diff --git a/store/tikv/split_region.go b/store/tikv/split_region.go
index 8851b9078bf29..4a84f29ddb774 100644
--- a/store/tikv/split_region.go
+++ b/store/tikv/split_region.go
@@ -104,10 +104,15 @@ func (s *tikvStore) scatterRegion(regionID uint64) error {
 }
 
 // WaitScatterRegionFinish implements SplitableStore interface.
-func (s *tikvStore) WaitScatterRegionFinish(regionID uint64) error {
+// backOff is the backoff time, in milliseconds, to wait for the region to finish scattering.
+// If backOff <= 0, the default wait-scatter backoff time is used.
+func (s *tikvStore) WaitScatterRegionFinish(regionID uint64, backOff int) error {
 	logutil.Logger(context.Background()).Info("wait scatter region",
 		zap.Uint64("regionID", regionID))
-	bo := NewBackoffer(context.Background(), waitScatterRegionFinishBackoff)
+	if backOff <= 0 {
+		backOff = waitScatterRegionFinishBackoff
+	}
+	bo := NewBackoffer(context.Background(), backOff)
 	logFreq := 0
 	for {
 		resp, err := s.pdClient.GetOperator(context.Background(), regionID)