Skip to content

Commit

Permalink
increased test coverage.
Browse files Browse the repository at this point in the history
  • Loading branch information
smeshkov committed Jun 30, 2024
1 parent 6f4dcf4 commit 2e0df77
Show file tree
Hide file tree
Showing 6 changed files with 100 additions and 13 deletions.
5 changes: 1 addition & 4 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ jobs:
run: go get -t -v ./...

- name: Test
run: go test -coverprofile=coverage.txt -covermode=atomic

- name: Test race
run: go test -race ./...
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...

- name: Get the version
id: get_version
Expand Down
8 changes: 5 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ jobs:
run: go build -v ./...

- name: Test
run: go test -coverprofile=coverage.txt -covermode=atomic
run: go test -race -coverprofile=coverage.txt -covermode=atomic ./...

- name: Test race
run: go test -race ./...
- name: Upload coverage reports to Codecov
uses: codecov/[email protected]
with:
token: ${{ secrets.CODECOV_TOKEN }}
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ So the **microbatch** is a aimed to provide simple primitive for adopting micro-
## Usage

- You'd need to implement `BatchProcessor` interface to handle your specific jobs.
- New instance of the `MicroBatch` can be created with the `New(yourBatchProcessor, options...)`, options are used for customising configuration.
- New instance of the `MicroBatch` can be created with the `New(yourBatchProcessor, options...)`, options are used for customising configuration (see `Option`).
- Created instance of the `MicroBatch` can be started with the `yourMicroBatch.Start(c)`, where c - is a `context.Context`, `context.Context` is used for signalling when `MicroBatch` needs to stop (so use `context.WithCancel`).
- New running instance of the `MicroBatch` can be created with the `NewRunning(c, yourBatchProcessor)`.
- New job can be submitted to the `MicroBatch` via `yourMicroBatch.Submit(yourJob)`.
Expand Down
5 changes: 1 addition & 4 deletions _bin/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ go install honnef.co/go/tools/cmd/staticcheck@latest
staticcheck ./...

# tests & coverage
go test -coverprofile=_dist/coverage.out -v ./...
go test -race -coverprofile=_dist/coverage.out -v ./...
go tool cover -func=_dist/coverage.out

# check race conditions
go test -race ./...

# clean after self
go mod tidy
67 changes: 67 additions & 0 deletions microbatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,73 @@ func Test_Submit_processes_job_eventually(t *testing.T) {
assert.True(t, check.Load())
}

func Test_Submit_submits_and_completes_all_the_jobs(t *testing.T) {
c, cancel := context.WithCancel(context.TODO())
defer cancel()

bp := newTestBatchProcessor()
limit := Limit[*JobInput, *JobOutput](1) // set batch size to 1
mb, _ := New(bp, limit)
go mb.Start(c)
time.Sleep(1 * time.Millisecond) // give it a tick so that Start routine is triggered

var wg sync.WaitGroup
wg.Add(3)

// submit 3 jobs, which is more than the batch size 1
mb.Submit(func(t *JobInput) *JobResult[*JobOutput] {
defer wg.Done()
return &JobResult[*JobOutput]{}
})
mb.Submit(func(t *JobInput) *JobResult[*JobOutput] {
defer wg.Done()
return &JobResult[*JobOutput]{}
})
mb.Submit(func(t *JobInput) *JobResult[*JobOutput] {
defer wg.Done()
return &JobResult[*JobOutput]{}
})

// make sure to wait for the jobs completion
wg.Wait()

assert.Equal(t, 3, bp.getJobsDone())
}

func Test_Submit_submits_and_completes_all_the_jobs_eventually(t *testing.T) {
c, cancel := context.WithCancel(context.TODO())
defer cancel()

bp := newTestBatchProcessor()
limit := Limit[*JobInput, *JobOutput](5) // set batch size to 5
cycle := Cycle[*JobInput, *JobOutput](1 * time.Millisecond)
mb, _ := New(bp, limit, cycle)
go mb.Start(c)
time.Sleep(1 * time.Millisecond) // give it a tick so that Start routine is triggered

var wg sync.WaitGroup
wg.Add(3)

// submit only 3 jobs, which is less than the batch size 5
mb.Submit(func(t *JobInput) *JobResult[*JobOutput] {
defer wg.Done()
return &JobResult[*JobOutput]{}
})
mb.Submit(func(t *JobInput) *JobResult[*JobOutput] {
defer wg.Done()
return &JobResult[*JobOutput]{}
})
mb.Submit(func(t *JobInput) *JobResult[*JobOutput] {
defer wg.Done()
return &JobResult[*JobOutput]{}
})

// make sure to wait for the jobs completion
wg.Wait()

assert.Equal(t, 3, bp.getJobsDone())
}

func newTestMicroBatch(options ...Option[*JobInput, *JobOutput]) (*MicroBatch[*JobInput, *JobOutput], error) {
return New(newTestBatchProcessor(), options...)
}
26 changes: 25 additions & 1 deletion model_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package microbatch

import "sync/atomic"

type JobInput struct{}
type JobOutput struct{}

type testBatchProcessor[T *JobInput, R *JobOutput] struct{}
type testBatchProcessor[T *JobInput, R *JobOutput] struct {
jobsDone atomic.Int32
}

func newTestBatchProcessor() *testBatchProcessor[*JobInput, *JobOutput] {
return &testBatchProcessor[*JobInput, *JobOutput]{}
Expand All @@ -12,9 +16,29 @@ func newTestBatchProcessor() *testBatchProcessor[*JobInput, *JobOutput] {
func (bp *testBatchProcessor[T, R]) Process(batch []Job[*JobInput, *JobOutput]) error {
for _, j := range batch {
j(&JobInput{})
bp.incrementJobs()
}
return nil
}

func (bp *testBatchProcessor[T, R]) incrementJobs() {
for {
// Load current balance atomically
current := bp.jobsDone.Load()

// Calculate new balance
new := current + 1

// Try to update balance atomically
if bp.jobsDone.CompareAndSwap(current, new) {
return
}
}
}

func (bp *testBatchProcessor[T, R]) getJobsDone() int {
return int(bp.jobsDone.Load())
}

// Ensure testBatchProcessor conforms to the BatchProcessor interface.
var _ BatchProcessor[*JobInput, *JobOutput] = &testBatchProcessor[*JobInput, *JobOutput]{}

0 comments on commit 2e0df77

Please sign in to comment.