diff --git a/examples/gcworker/gcworker.go b/examples/gcworker/gcworker.go index 39da00df0c..1191adcd28 100644 --- a/examples/gcworker/gcworker.go +++ b/examples/gcworker/gcworker.go @@ -21,6 +21,7 @@ import ( "time" "github.com/tikv/client-go/v2/oracle" + "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/txnkv" ) @@ -36,10 +37,15 @@ func main() { panic(err) } - sysSafepoint, err := client.GC(context.Background(), *safepoint) + + sysSafepoint, err := client.GC(context.Background(), *safepoint, tikv.WithConcurrency(10)) if err != nil { panic(err) } fmt.Printf("Finished GC, expect safepoint:%d(%+v),real safepoint:%d(+%v)\n", *safepoint, oracle.GetTimeFromTS(*safepoint), sysSafepoint, oracle.GetTimeFromTS(sysSafepoint)) + err = client.Close() + if err != nil { + panic(err) + } } diff --git a/tikv/gc.go b/tikv/gc.go index d448815ed8..2b47e6bca0 100644 --- a/tikv/gc.go +++ b/tikv/gc.go @@ -50,8 +50,15 @@ import ( // // GC is a simplified version of [GC in TiDB](https://docs.pingcap.com/tidb/stable/garbage-collection-overview). // We skip the second step "delete ranges" which is an optimization for TiDB. -func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64, err error) { - err = s.resolveLocks(ctx, safepoint, 8) +func (s *KVStore) GC(ctx context.Context, safepoint uint64, opts ...GCOpt) (newSafePoint uint64, err error) { + // default concurrency 8 + opt := &gcOption{concurrency: 8} + // Apply gc options. + for _, o := range opts { + o(opt) + } + + err = s.resolveLocks(ctx, safepoint, opt.concurrency) if err != nil { return } @@ -59,6 +66,20 @@ func (s *KVStore) GC(ctx context.Context, safepoint uint64) (newSafePoint uint64 return s.pdClient.UpdateGCSafePoint(ctx, safepoint) } +type gcOption struct { + concurrency int +} + +// GCOpt gc options +type GCOpt func(*gcOption) + +// WithConcurrency is used to set gc RangeTaskRunner concurrency. +func WithConcurrency(concurrency int) GCOpt { + return func(opt *gcOption) { + opt.concurrency = concurrency + } +} + func (s *KVStore) resolveLocks(ctx context.Context, safePoint uint64, concurrency int) error { handler := func(ctx context.Context, r kv.KeyRange) (rangetask.TaskStat, error) { return s.resolveLocksForRange(ctx, safePoint, r.StartKey, r.EndKey)