Skip to content

Commit

Permalink
Rename partitionOffsetWatcher -> PartitionOffsetWatcher (#10371)
Browse files Browse the repository at this point in the history
  • Loading branch information
leizor authored Jan 8, 2025
1 parent 455f104 commit ce6871c
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 19 deletions.
16 changes: 8 additions & 8 deletions pkg/storage/ingest/partition_offset_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ var (
errPartitionOffsetWatcherStopped = errors.New("partition offset watcher is stopped")
)

type partitionOffsetWatcher struct {
type PartitionOffsetWatcher struct {
services.Service

// mx protects the following fields.
Expand All @@ -35,8 +35,8 @@ type partitionOffsetWatcher struct {
stopped bool
}

func newPartitionOffsetWatcher() *partitionOffsetWatcher {
w := &partitionOffsetWatcher{
func NewPartitionOffsetWatcher() *PartitionOffsetWatcher {
w := &PartitionOffsetWatcher{
lastConsumedOffset: -1, // -1 means nothing has been consumed yet.
watchGroups: map[int64]*partitionOffsetWatchGroup{},
}
Expand All @@ -46,7 +46,7 @@ func newPartitionOffsetWatcher() *partitionOffsetWatcher {
return w
}

func (w *partitionOffsetWatcher) stopping(_ error) error {
func (w *PartitionOffsetWatcher) stopping(_ error) error {
w.mx.Lock()
defer w.mx.Unlock()

Expand All @@ -64,7 +64,7 @@ func (w *partitionOffsetWatcher) stopping(_ error) error {

// Notify that a given offset has been consumed. This function is expected to run very quickly
// and never block in practice.
func (w *partitionOffsetWatcher) Notify(lastConsumedOffset int64) {
func (w *PartitionOffsetWatcher) Notify(lastConsumedOffset int64) {
w.mx.Lock()
defer w.mx.Unlock()

Expand All @@ -90,7 +90,7 @@ func (w *partitionOffsetWatcher) Notify(lastConsumedOffset int64) {
}

// Wait until the given offset has been consumed or the context is canceled.
func (w *partitionOffsetWatcher) Wait(ctx context.Context, waitForOffset int64) error {
func (w *PartitionOffsetWatcher) Wait(ctx context.Context, waitForOffset int64) error {
// A negative offset is used to signal the partition is empty,
// so we can immediately return.
if waitForOffset < 0 {
Expand Down Expand Up @@ -164,7 +164,7 @@ func (w *partitionOffsetWatcher) Wait(ctx context.Context, waitForOffset int64)
}

// LastConsumedOffset returns the last consumed offset.
func (w *partitionOffsetWatcher) LastConsumedOffset() int64 {
func (w *PartitionOffsetWatcher) LastConsumedOffset() int64 {
w.mx.Lock()
defer w.mx.Unlock()

Expand All @@ -173,7 +173,7 @@ func (w *partitionOffsetWatcher) LastConsumedOffset() int64 {

// waitingGoroutinesCount returns the number of active watch groups (an active group has at least
// 1 goroutine waiting). This function is useful for testing.
func (w *partitionOffsetWatcher) watchGroupsCount() int {
func (w *PartitionOffsetWatcher) watchGroupsCount() int {
w.mx.Lock()
defer w.mx.Unlock()

Expand Down
18 changes: 9 additions & 9 deletions pkg/storage/ingest/partition_offset_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func TestPartitionOffsetWatcher(t *testing.T) {

var (
ctx = context.Background()
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
firstDone = atomic.NewBool(false)
)

Expand Down Expand Up @@ -56,7 +56,7 @@ func TestPartitionOffsetWatcher(t *testing.T) {

var (
ctx = context.Background()
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
firstDone = atomic.NewBool(false)
secondDone = atomic.NewBool(false)
)
Expand Down Expand Up @@ -96,7 +96,7 @@ func TestPartitionOffsetWatcher(t *testing.T) {

var (
ctx = context.Background()
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
firstDone = atomic.NewBool(false)
secondDone = atomic.NewBool(false)
)
Expand Down Expand Up @@ -135,7 +135,7 @@ func TestPartitionOffsetWatcher(t *testing.T) {

var (
ctx = context.Background()
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
)

w.Notify(10)
Expand All @@ -149,7 +149,7 @@ func TestPartitionOffsetWatcher(t *testing.T) {

var (
ctx = context.Background()
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
)

require.NoError(t, w.Wait(ctx, -1))
Expand All @@ -162,7 +162,7 @@ func TestPartitionOffsetWatcher(t *testing.T) {

ctx := context.Background()

w := newPartitionOffsetWatcher()
w := NewPartitionOffsetWatcher()
require.NoError(t, services.StartAndAwaitRunning(ctx, w))

wg := sync.WaitGroup{}
Expand Down Expand Up @@ -197,7 +197,7 @@ func TestPartitionOffsetWatcher_Concurrency(t *testing.T) {

var (
ctx, cancelCtx = context.WithCancel(context.Background())
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
lastNotifiedOffset = atomic.NewInt64(0)
lastWatchedOffset = atomic.NewInt64(0)
notificationsCount = atomic.NewInt64(0)
Expand Down Expand Up @@ -257,7 +257,7 @@ func BenchmarkPartitionOffsetWatcher(b *testing.B) {

var (
ctx = context.Background()
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
)

// Start all watching goroutines.
Expand Down Expand Up @@ -295,7 +295,7 @@ func BenchmarkPartitionOffsetWatcher(b *testing.B) {

var (
ctx, cancelCtx = context.WithCancel(context.Background())
w = newPartitionOffsetWatcher()
w = NewPartitionOffsetWatcher()
nextOffset = atomic.NewInt64(0)
done = atomic.NewBool(false)
)
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/ingest/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type PartitionReader struct {

// consumedOffsetWatcher is used to wait until a given offset has been consumed.
// This gets initialised with -1 which means nothing has been consumed from the partition yet.
consumedOffsetWatcher *partitionOffsetWatcher
consumedOffsetWatcher *PartitionOffsetWatcher
offsetReader *partitionOffsetReader

logger log.Logger
Expand All @@ -118,7 +118,7 @@ func newPartitionReader(kafkaCfg KafkaConfig, partitionID int32, instanceID stri
partitionID: partitionID,
newConsumer: consumer,
consumerGroup: kafkaCfg.GetConsumerGroup(instanceID, partitionID),
consumedOffsetWatcher: newPartitionOffsetWatcher(),
consumedOffsetWatcher: NewPartitionOffsetWatcher(),
concurrentFetchersMinBytesMaxWaitTime: defaultMinBytesMaxWaitTime,
logger: log.With(logger, "partition", partitionID),
reg: reg,
Expand Down

0 comments on commit ce6871c

Please sign in to comment.