-
Notifications
You must be signed in to change notification settings - Fork 10
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Job queue #247
Merged
Job queue #247
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
9bb4aa2
Added writeJobQueue.
pstibrany 54ea17e
Use writeJobQueue instead of chan chunkWriteJob.
pstibrany c087ab1
Address review feedback.
pstibrany 79c21ea
Update segment implementation to use indexes for reading and writing,…
pstibrany 39b4a1f
Rename fields.
pstibrany cda7ab4
Don't use large segment size if queue size is small.
pstibrany 9e46633
Always perform at least one read, if there is anything to read.
pstibrany 4c3495f
Added test to check that pop() blocks until channel is closed.
pstibrany File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,127 @@ | ||
package chunks | ||
|
||
import "sync" | ||
|
||
// writeJobQueue is similar to buffered channel of chunkWriteJob, but manages its own buffers | ||
// to avoid using a lot of memory when it's empty. It does that by storing elements into segments | ||
// of equal size (segmentSize). When segment is not used anymore, reference to it are removed, | ||
// so it can be treated as a garbage. | ||
type writeJobQueue struct { | ||
maxSize int | ||
segmentSize int | ||
|
||
mtx sync.Mutex // protects all following variables | ||
pushed, popped *sync.Cond // signalled when something is pushed into the queue or popped from it | ||
first, last *writeJobQueueSegment // pointer to first and last segment, if any | ||
size int // total size of the queue | ||
closed bool // after closing the queue, nothing can be pushed to it | ||
} | ||
|
||
type writeJobQueueSegment struct { | ||
segment []chunkWriteJob | ||
nextRead, nextWrite int // index of next read and next write in this segment. | ||
nextSegment *writeJobQueueSegment // next segment, if any | ||
} | ||
|
||
func newWriteJobQueue(maxSize, segmentSize int) *writeJobQueue { | ||
if maxSize <= 0 || segmentSize <= 0 { | ||
replay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
panic("invalid queue") | ||
} | ||
|
||
q := &writeJobQueue{ | ||
maxSize: maxSize, | ||
segmentSize: segmentSize, | ||
} | ||
|
||
q.pushed = sync.NewCond(&q.mtx) | ||
q.popped = sync.NewCond(&q.mtx) | ||
return q | ||
} | ||
|
||
func (q *writeJobQueue) close() { | ||
q.mtx.Lock() | ||
defer q.mtx.Unlock() | ||
|
||
q.closed = true | ||
|
||
// unblock all blocked goroutines | ||
q.pushed.Broadcast() | ||
q.popped.Broadcast() | ||
} | ||
|
||
// push blocks until there is space available in the queue, and then adds job to the queue. | ||
// If queue is closed or gets closed while waiting for space, push returns false. | ||
func (q *writeJobQueue) push(job chunkWriteJob) bool { | ||
q.mtx.Lock() | ||
defer q.mtx.Unlock() | ||
|
||
// wait until queue has more space or is closed | ||
for !q.closed && q.size >= q.maxSize { | ||
q.popped.Wait() | ||
} | ||
|
||
if q.closed { | ||
return false | ||
} | ||
|
||
// Check if this segment has more space for writing, and create new one if not. | ||
if q.last == nil || q.last.nextWrite >= q.segmentSize { | ||
prevLast := q.last | ||
q.last = &writeJobQueueSegment{ | ||
segment: make([]chunkWriteJob, q.segmentSize), | ||
} | ||
|
||
if prevLast != nil { | ||
prevLast.nextSegment = q.last | ||
} | ||
if q.first == nil { | ||
q.first = q.last | ||
} | ||
} | ||
|
||
q.last.segment[q.last.nextWrite] = job | ||
q.last.nextWrite++ | ||
q.size++ | ||
q.pushed.Signal() | ||
return true | ||
} | ||
|
||
// pop returns first job from the queue, and true. | ||
// if queue is empty, pop blocks until there is a job (returns true), or until queue is closed (returns false). | ||
// If queue was already closed, pop first returns all remaining elements from the queue (with true value), and only then returns false. | ||
func (q *writeJobQueue) pop() (chunkWriteJob, bool) { | ||
q.mtx.Lock() | ||
defer q.mtx.Unlock() | ||
|
||
// wait until something is pushed to the queue, or queue is closed. | ||
for q.size == 0 { | ||
if q.closed { | ||
return chunkWriteJob{}, false | ||
} | ||
|
||
q.pushed.Wait() | ||
} | ||
replay marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
res := q.first.segment[q.first.nextRead] | ||
q.first.segment[q.first.nextRead] = chunkWriteJob{} // clear just-read element | ||
q.first.nextRead++ | ||
q.size-- | ||
|
||
// If we have read all possible elements from first segment, we can drop it. | ||
if q.first.nextRead >= q.segmentSize { | ||
q.first = q.first.nextSegment | ||
if q.first == nil { | ||
q.last = nil | ||
} | ||
} | ||
|
||
q.popped.Signal() | ||
return res, true | ||
} | ||
|
||
func (q *writeJobQueue) length() int { | ||
q.mtx.Lock() | ||
defer q.mtx.Unlock() | ||
|
||
return q.size | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment should be updated too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think comment in
queueSize()
needs to be updated too.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, thanks for catching this. Will send new PR.