Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(scheduler)!: introduce JobQueue abstraction #80

Merged
merged 1 commit into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,22 +32,23 @@ type Scheduler interface {
GetJobKeys() []int

// GetScheduledJob returns the scheduled job with the specified key.
GetScheduledJob(key int) (*ScheduledJob, error)
GetScheduledJob(key int) (ScheduledJob, error)

// DeleteJob removes the job with the specified key from the Scheduler's execution queue.
// DeleteJob removes the job with the specified key from the
// scheduler's execution queue.
DeleteJob(key int) error

// Clear removes all of the scheduled jobs.
Clear()

// Stop shutdowns the scheduler.
Stop()
Clear() error

// Wait blocks until the scheduler stops running and all jobs
// have returned. Wait will return when the context passed to
// it has expired. Until the context passed to start is
// cancelled or Stop is called directly.
Wait(context.Context)

// Stop shutdowns the scheduler.
Stop()
}
```

Expand Down Expand Up @@ -108,6 +109,10 @@ Implemented Jobs
| Day of week | YES | 1-7 or SUN-SAT | , - * ? / |
| Year | NO | empty, 1970- | , - * / |

## Distributed mode

The scheduler can use its own implementation of `quartz.JobQueue` to allow state sharing.

## Logger

To set a custom logger, use the `logger.SetDefault` function.
Expand Down
2 changes: 1 addition & 1 deletion examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func sampleScheduler(ctx context.Context, wg *sync.WaitGroup) {
return
}

fmt.Println(scheduledJob.TriggerDescription)
fmt.Println(scheduledJob.Trigger().Description())
fmt.Println("Before delete: ", sched.GetJobKeys())
_ = sched.DeleteJob(cronJob.Key())
fmt.Println("After delete: ", sched.GetJobKeys())
Expand Down
2 changes: 1 addition & 1 deletion quartz/function_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestFunctionJob(t *testing.T) {
sched.ScheduleJob(ctx, funcJob1, quartz.NewRunOnceTrigger(time.Millisecond*300))
sched.ScheduleJob(ctx, funcJob2, quartz.NewRunOnceTrigger(time.Millisecond*800))
time.Sleep(time.Second)
sched.Clear()
_ = sched.Clear()
sched.Stop()

assertEqual(t, funcJob1.JobStatus(), quartz.OK)
Expand Down
148 changes: 128 additions & 20 deletions quartz/queue.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,73 @@
package quartz

import "container/heap"
import (
"container/heap"
"errors"
)

// item is the priorityQueue item.
type item struct {
Job Job
Trigger Trigger
priority int64 // item priority, backed by the next run time.
// scheduledJob represents a scheduled job.
// It implements the ScheduledJob interface.
type scheduledJob struct {
job Job
trigger Trigger
priority int64 // job priority, backed by its next run time.
index int // maintained by the heap.Interface methods.
}

var _ ScheduledJob = (*scheduledJob)(nil)

// Job returns the scheduled job instance.
func (scheduled *scheduledJob) Job() Job {
return scheduled.job
}

// Trigger returns the trigger associated with the scheduled job.
func (scheduled *scheduledJob) Trigger() Trigger {
return scheduled.trigger
}

// NextRunTime returns the next run epoch time for the scheduled job.
func (scheduled *scheduledJob) NextRunTime() int64 {
return scheduled.priority
}

// JobQueue represents the job queue used by the scheduler.
// The default jobQueue implementation uses an in-memory priority queue
// to manage scheduled jobs.
// An alternative implementation can be provided for customization, e.g.
// to support persistent storage.
type JobQueue interface {
// Push inserts a new scheduled job at the end of the queue.
Push(job ScheduledJob) error

// Pop removes and returns the next scheduled job from the queue.
Pop() (ScheduledJob, error)

// Head returns the first scheduled job without removing it.
Head() (ScheduledJob, error)

// Remove removes and returns the scheduled job at index i.
Remove(i int) (ScheduledJob, error)

// ScheduledJobs returns the slice of all scheduled jobs in the queue.
ScheduledJobs() []ScheduledJob

// Size returns the size of the job queue.
Size() int

// Clear clears the job queue.
Clear() error
}

// priorityQueue implements the heap.Interface.
type priorityQueue []*item
type priorityQueue []*scheduledJob

var _ heap.Interface = (*priorityQueue)(nil)

// Len returns the priorityQueue length.
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Len() int {
return len(pq)
}

// Less is the items less comparator.
func (pq priorityQueue) Less(i, j int) bool {
Expand All @@ -29,16 +82,16 @@ func (pq priorityQueue) Swap(i, j int) {
}

// Push implements the heap.Interface.Push.
// Adds x as element Len().
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*item)
item.index = n
// Adds an element at index Len().
func (pq *priorityQueue) Push(element interface{}) {
index := len(*pq)
item := element.(*scheduledJob)
item.index = index
*pq = append(*pq, item)
}

// Pop implements the heap.Interface.Pop.
// Removes and returns element Len() - 1.
// Removes and returns the element at Len() - 1.
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
Expand All @@ -48,12 +101,67 @@ func (pq *priorityQueue) Pop() interface{} {
return item
}

// Head returns the first item of the priorityQueue without removing it.
func (pq *priorityQueue) Head() *item {
return (*pq)[0]
// jobQueue implements the JobQueue interface by using an in-memory
// priority queue as the storage layer.
type jobQueue struct {
delegate priorityQueue
}

var _ JobQueue = (*jobQueue)(nil)

// newJobQueue initializes and returns an empty jobQueue.
func newJobQueue() *jobQueue {
return &jobQueue{
delegate: priorityQueue{},
}
}

// Push inserts a new scheduled job at the end of the queue.
func (jq *jobQueue) Push(job ScheduledJob) error {
heap.Push(&jq.delegate, job)
return nil
}

// Pop removes and returns the next scheduled job from the queue.
func (jq *jobQueue) Pop() (ScheduledJob, error) {
if jq.Size() == 0 {
return nil, errors.New("queue is empty")
}
return heap.Pop(&jq.delegate).(ScheduledJob), nil
}

// Head returns the first scheduled job without removing it.
func (jq *jobQueue) Head() (ScheduledJob, error) {
if jq.Size() == 0 {
return nil, errors.New("queue is empty")
}
return jq.delegate[0], nil
}

// Remove removes and returns the scheduled job at index i.
func (jq *jobQueue) Remove(i int) (ScheduledJob, error) {
if jq.Size() <= i {
return nil, errors.New("index out of range")
}
return heap.Remove(&jq.delegate, i).(ScheduledJob), nil
}

// ScheduledJobs returns the slice of all scheduled jobs in the queue.
func (jq *jobQueue) ScheduledJobs() []ScheduledJob {
scheduledJobs := make([]ScheduledJob, len(jq.delegate))
for i, job := range jq.delegate {
scheduledJobs[i] = ScheduledJob(job)
}
return scheduledJobs
}

// Size returns the size of the job queue.
func (jq *jobQueue) Size() int {
return len(jq.delegate)
}

// Remove removes and returns the element at index i from the priorityQueue.
func (pq *priorityQueue) Remove(i int) interface{} {
return heap.Remove(pq, i)
// Clear clears the job queue.
func (jq *jobQueue) Clear() error {
jq.delegate = priorityQueue{}
return nil
}
Loading