diff --git a/.github/workflows/uint_test.yaml b/.github/workflows/uint_test.yaml index bd3baa62d..f70524a67 100644 --- a/.github/workflows/uint_test.yaml +++ b/.github/workflows/uint_test.yaml @@ -88,5 +88,4 @@ jobs: - name: Unit Test run: | go test --tags=intest -timeout 120s github.com/pingcap/ticdc/coordinator/... - go test --tags=intest -timeout 120s github.com/pingcap/ticdc/pkg/eventservice... diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 25cabcc01..a2dd70c89 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -82,7 +82,7 @@ func NewMaintainerManager(selfNode *node.Info, tsoClient: pdClient, regionCache: regionCache, } - m.stream = dynstream.NewDynamicStream(NewStreamHandler()) + m.stream = dynstream.NewParallelDynamicStream(func(path common.GID) uint64 { return path.FastHash() }, NewStreamHandler()) m.stream.Start() mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go index a90bbb251..7fad227a5 100644 --- a/utils/dynstream/interfaces.go +++ b/utils/dynstream/interfaces.go @@ -249,15 +249,17 @@ func (f *Feedback[A, P, D]) String() string { return fmt.Sprintf("DynamicStream Feedback{Area: %v, Path: %v, Pause: %v}", f.Area, f.Path, f.Pause) } +// NewDynamicStream creates a new dynamic stream with a single stream by default. func NewDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](handler H, option ...Option) DynamicStream[A, P, T, D, H] { opt := NewOption() + opt.StreamCount = 1 if len(option) > 0 { opt = option[0] } - opt.StreamCount = 1 return newParallelDynamicStream(func(path P) uint64 { return 0 }, handler, opt) } +// NewParallelDynamicStream creates a new dynamic stream with CPU number streams by default. func NewParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](hasher PathHasher[P], handler H, option ...Option) DynamicStream[A, P, T, D, H] { opt := NewOption() if len(option) > 0 {