Skip to content

Commit

Permalink
Revert "maintainer: mainter manager use parallel DS (pingcap#706)"
Browse files Browse the repository at this point in the history
This reverts commit cf291d1.
  • Loading branch information
asddongmen committed Dec 20, 2024
1 parent cf291d1 commit 9a1f9da
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 4 deletions.
1 change: 1 addition & 0 deletions .github/workflows/uint_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,5 @@ jobs:
- name: Unit Test
run: |
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/coordinator/...
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/pkg/eventservice...
2 changes: 1 addition & 1 deletion maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewMaintainerManager(selfNode *node.Info,
tsoClient: pdClient,
regionCache: regionCache,
}
m.stream = dynstream.NewParallelDynamicStream(func(path common.GID) uint64 { return path.FastHash() }, NewStreamHandler())
m.stream = dynstream.NewDynamicStream(NewStreamHandler())
m.stream.Start()

mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
Expand Down
4 changes: 1 addition & 3 deletions utils/dynstream/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,17 +249,15 @@ func (f *Feedback[A, P, D]) String() string {
return fmt.Sprintf("DynamicStream Feedback{Area: %v, Path: %v, Pause: %v}", f.Area, f.Path, f.Pause)
}

// NewDynamicStream creates a new dynamic stream with a single stream by default.
func NewDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](handler H, option ...Option) DynamicStream[A, P, T, D, H] {
opt := NewOption()
opt.StreamCount = 1
if len(option) > 0 {
opt = option[0]
}
opt.StreamCount = 1
return newParallelDynamicStream(func(path P) uint64 { return 0 }, handler, opt)
}

// NewParallelDynamicStream creates a new dynamic stream with CPU number streams by default.
func NewParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](hasher PathHasher[P], handler H, option ...Option) DynamicStream[A, P, T, D, H] {
opt := NewOption()
if len(option) > 0 {
Expand Down

0 comments on commit 9a1f9da

Please sign in to comment.