Skip to content

Commit

Permalink
util: define and implement core interfaces for async api (#1591)
Browse files Browse the repository at this point in the history
ref #1586

Signed-off-by: zyguan <[email protected]>
  • Loading branch information
zyguan authored Feb 27, 2025
1 parent ad47ad6 commit 5ac118b
Show file tree
Hide file tree
Showing 4 changed files with 492 additions and 0 deletions.
83 changes: 83 additions & 0 deletions util/async/core.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"sync"
)

// Pool is an interface for goroutine pool.
type Pool interface {
// Go submits a function to the goroutine pool.
Go(f func())
}

// Executor is an interface that can append functions to be executed asynchronously.
type Executor interface {
Pool
// Append adds functions to the executor. It should be safe to call Append concurrently.
Append(fs ...func())
}

// Callback defines a callback function that can be invoked immediately or later.
type Callback[T any] interface {
// Executor returns the executor that the callback uses.
Executor() Executor
// Inject adds a deferred action that will be invoked before the callback.
Inject(g func(T, error) (T, error))
// Invoke invokes the callback immediately in current goroutine.
Invoke(val T, err error)
// Schedule schedules the callback to be invoked later, it's typically called in other goroutines.
Schedule(val T, err error)
}

// NewCallback creates a new callback function.
func NewCallback[T any](e Executor, f func(T, error)) Callback[T] {
return &callback[T]{e: e, f: f}
}

type callback[T any] struct {
once sync.Once
e Executor
f func(T, error)
gs []func(T, error) (T, error)
}

// Executor implements Callback.
func (cb *callback[T]) Executor() Executor {
return cb.e
}

// Inject implements Callback.
func (cb *callback[T]) Inject(g func(T, error) (T, error)) {
cb.gs = append(cb.gs, g)
}

// Invoke implements Callback.
func (cb *callback[T]) Invoke(val T, err error) {
cb.once.Do(func() { cb.call(val, err) })
}

// Schedule implements Callback.
func (cb *callback[T]) Schedule(val T, err error) {
cb.once.Do(func() { cb.e.Append(func() { cb.call(val, err) }) })
}

func (cb *callback[T]) call(val T, err error) {
for i := len(cb.gs) - 1; i >= 0; i-- {
val, err = cb.gs[i](val, err)
}
cb.f(val, err)
}
90 changes: 90 additions & 0 deletions util/async/core_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"sync"
"testing"

"github.com/stretchr/testify/require"
)

type mockExecutor struct {
lock sync.Mutex
tasks []func()
}

func (e *mockExecutor) Go(f func()) {
e.Append(f)
}

func (e *mockExecutor) Append(fs ...func()) {
e.lock.Lock()
e.tasks = append(e.tasks, fs...)
e.lock.Unlock()
}

func TestInjectOrder(t *testing.T) {
cb := NewCallback(&mockExecutor{}, func(ns []int, err error) {
require.NoError(t, err)
// injected functions are executed in reverse order
require.Equal(t, []int{1, 2, 3}, ns)
})
cb.Inject(func(ns []int, err error) ([]int, error) { return append(ns, 3), nil })
cb.Inject(func(ns []int, err error) ([]int, error) { return append(ns, 2), nil })
cb.Inject(func(ns []int, err error) ([]int, error) { return append(ns, 1), nil })
cb.Invoke([]int{}, nil)
}

func TestFulfillOnce(t *testing.T) {
t.Run("InvokeTwice", func(t *testing.T) {
ns := []int{}
cb := NewCallback(&mockExecutor{}, func(n int, err error) { ns = append(ns, n) })
cb.Invoke(1, nil)
cb.Invoke(2, nil)
require.Equal(t, []int{1}, ns)
})
t.Run("ScheduleTwice", func(t *testing.T) {
e := &mockExecutor{}
ns := []int{}
cb := NewCallback(e, func(n int, err error) { ns = append(ns, n) })
cb.Schedule(1, nil)
cb.Schedule(2, nil)
require.Equal(t, 1, len(e.tasks))
require.Equal(t, []int{}, ns)
e.tasks[0]()
require.Equal(t, []int{1}, ns)
})
t.Run("InvokeSchedule", func(t *testing.T) {
e := &mockExecutor{}
ns := []int{}
cb := NewCallback(e, func(n int, err error) { ns = append(ns, n) })
cb.Invoke(1, nil)
cb.Schedule(2, nil)
require.Equal(t, 0, len(e.tasks))
require.Equal(t, []int{1}, ns)
})
t.Run("ScheduleInvoke", func(t *testing.T) {
e := &mockExecutor{}
ns := []int{}
cb := NewCallback(e, func(n int, err error) { ns = append(ns, n) })
cb.Schedule(1, nil)
cb.Invoke(2, nil)
require.Equal(t, 1, len(e.tasks))
require.Equal(t, []int{}, ns)
e.tasks[0]()
require.Equal(t, []int{1}, ns)
})
}
158 changes: 158 additions & 0 deletions util/async/runloop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package async

import (
"context"
"errors"
"sync"
)

// State represents the state of a run loop.
type State uint32

const (
StateIdle State = iota
StateWaiting
StateRunning
)

// RunLoop implements the Executor interface.
type RunLoop struct {
Pool

lock sync.Mutex
ready chan struct{}
runnable []func()
running []func()
state State
}

// NewRunLoop creates a new run-loop.
func NewRunLoop() *RunLoop {
return &RunLoop{ready: make(chan struct{})}
}

// Go submits f to the pool when possible (pool is not nil), otherwise starts a new goroutine for f.
func (l *RunLoop) Go(f func()) {
if l.Pool == nil {
go f()
} else {
l.Pool.Go(f)
}
}

// State returns the current state of the run-loop.
func (l *RunLoop) State() State {
l.lock.Lock()
state := l.state
l.lock.Unlock()
return state
}

// NumRunnable returns the number of runnable tasks in the run-loop currently.
func (l *RunLoop) NumRunnable() int {
l.lock.Lock()
n := len(l.runnable)
l.lock.Unlock()
return n
}

// Append implements the Executor interface. It's safe to call Append concurrently.
func (l *RunLoop) Append(fs ...func()) {
if len(fs) == 0 {
return
}

notify := false

l.lock.Lock()
l.runnable = append(l.runnable, fs...)
if l.state == StateWaiting {
l.state = StateIdle // waiting -> idle
notify = true
}
l.lock.Unlock()

if notify {
l.ready <- struct{}{}
}
}

// Exec drives the run-loop to execute all runnable tasks and returns the number of tasks executed. If the context is
// done before all tasks are executed, it returns the number of tasks executed and the context error. Exec turns the
// run-loop to running or waiting state during process, and finally to idle state on return. When calling Exec without
// pending runnables, the run-loop turns to waiting, in which case one should make sure that Append will be called in
// the other goroutine to wake it up later, or the context will be canceled finally to break the waiting. Exec should
// only be called by one goroutine.
func (l *RunLoop) Exec(ctx context.Context) (int, error) {
for {
l.lock.Lock()
if l.state != StateIdle {
l.lock.Unlock()
return 0, errors.New("runloop: already executing")
}
// assert l.state == stateIdle

if len(l.runnable) == 0 {
l.state = StateWaiting // idle -> waiting
l.lock.Unlock()
select {
case <-l.ready:
continue
case <-ctx.Done():
l.lock.Lock()
l.state = StateIdle // waiting -> idle
l.lock.Unlock()
return 0, ctx.Err()
}
} else {
l.running, l.runnable = l.runnable, l.running[:0]
l.state = StateRunning // idle -> running
l.lock.Unlock()
return l.run(ctx)
}
}
}

func (l *RunLoop) run(ctx context.Context) (int, error) {
count := 0
for {
for i, f := range l.running {
select {
case <-ctx.Done():
l.lock.Lock()
// move remaining running tasks to runnable
l.running = append(l.running[:0], l.running[i:]...)
l.running = append(l.running, l.runnable...)
l.running, l.runnable = l.runnable, l.running
l.state = StateIdle // running -> idle
l.lock.Unlock()
return count, ctx.Err()
default:
f()
count++
}
}
l.lock.Lock()
if len(l.runnable) == 0 {
l.state = StateIdle // running -> idle
l.lock.Unlock()
return count, nil
}
l.running, l.runnable = l.runnable, l.running[:0]
l.lock.Unlock()
}
}
Loading

0 comments on commit 5ac118b

Please sign in to comment.