Skip to content

Commit

Permalink
Make the Mirror Queue a queue
Browse files Browse the repository at this point in the history
Convert the Mirror syncing queue to a modules/queue instead of the old simple queue.

Signed-off-by: Andrew Thornton <[email protected]>
  • Loading branch information
zeripath committed Oct 16, 2021
1 parent 8edda8b commit 6154804
Show file tree
Hide file tree
Showing 5 changed files with 126 additions and 47 deletions.
4 changes: 2 additions & 2 deletions custom/conf/app.example.ini
Original file line number Diff line number Diff line change
Expand Up @@ -769,10 +769,10 @@ PATH =
;; Global limit of repositories per user, applied at creation time. -1 means no limit
;MAX_CREATION_LIMIT = -1
;;
;; Mirror sync queue length, increase if mirror syncing starts hanging
;; Mirror sync queue length, increase if mirror syncing starts hanging (DEPRECATED: please use [queue.mirror] LENGTH instead)
;MIRROR_QUEUE_LENGTH = 1000
;;
;; Patch test queue length, increase if pull request patch testing starts hanging
;; Patch test queue length, increase if pull request patch testing starts hanging (DEPRECATED: please use [queue.pr_patch_checker] LENGTH instead)
;PULL_REQUEST_QUEUE_LENGTH = 1000
;;
;; Preferred Licenses to place at the top of the List
Expand Down
24 changes: 21 additions & 3 deletions docs/content/doc/advanced/config-cheat-sheet.en-us.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ Values containing `#` or `;` must be quoted using `` ` `` or `"""`.
- `DEFAULT_PUSH_CREATE_PRIVATE`: **true**: Default private when creating a new repository with push-to-create.
- `MAX_CREATION_LIMIT`: **-1**: Global maximum creation limit of repositories per user,
`-1` means no limit.
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
- `PULL_REQUEST_QUEUE_LENGTH`: **1000**: Length of pull request patch test queue, make it
as large as possible. Use caution when editing this value. **DEPRECATED** use `LENGTH` in `[queue.pr_patch_checker]`.
- `MIRROR_QUEUE_LENGTH`: **1000**: Mirror sync queue length, increase if mirror
syncing starts hanging.
syncing starts hanging. **DEPRECATED** use `LENGTH` in `[queue.mirror]`.
- `PREFERRED_LICENSES`: **Apache License 2.0,MIT License**: Preferred Licenses to place at
the top of the list. Name must match file name in options/license or custom/options/license.
- `DISABLE_HTTP_GIT`: **false**: Disable the ability to interact with repositories over the
Expand Down Expand Up @@ -382,6 +382,8 @@ relation to port exhaustion.

## Queue (`queue` and `queue.*`)

Configuration at `[queue]` will set defaults for all queues with overrides for individual queues at `[queue.*]`.

- `TYPE`: **persistable-channel**: General queue type, currently support: `persistable-channel` (uses a LevelDB internally), `channel`, `level`, `redis`, `dummy`
- `DATADIR`: **queues/**: Base DataDir for storing persistent and level queues. `DATADIR` for individual queues can be set in `queue.name` sections but will default to `DATADIR/`**`common`**. (Previously each queue would default to `DATADIR/`**`name`**.)
- `LENGTH`: **20**: Maximal queue size before channel queues block
Expand All @@ -400,6 +402,22 @@ relation to port exhaustion.
- `BOOST_TIMEOUT`: **5m**: Boost workers will timeout after this long.
- `BOOST_WORKERS`: **1** (v1.14 and before: **5**): This many workers will be added to the worker pool if there is a boost.

Gitea creates the following non-unique queues:

- `code_indexer`
- `issue_indexer`
- `notification-service`
- `task`
- `mail`
- `push_update`

And the following unique queues:

- `repo_stats_update`
- `repo-archive`
- `mirror`
- `pr_patch_checker`

## Admin (`admin`)

- `DEFAULT_EMAIL_NOTIFICATIONS`: **enabled**: Default configuration for email notifications for users (user configurable). Options: enabled, onmention, disabled
Expand Down Expand Up @@ -588,7 +606,7 @@ Define allowed algorithms and their minimum key length (use -1 to disable a type
command or full path).
- `SENDMAIL_ARGS`: **_empty_**: Specify any extra sendmail arguments.
- `SENDMAIL_TIMEOUT`: **5m**: default timeout for sending email through sendmail
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue.
- `SEND_BUFFER_LEN`: **100**: Buffer length of mailing queue. **DEPRECATED** use `LENGTH` in `[queue.mailer]`

## Cache (`cache`)

Expand Down
34 changes: 25 additions & 9 deletions modules/queue/unique_queue_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"sync"

"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
)

Expand All @@ -29,7 +30,7 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration
type ChannelUniqueQueue struct {
*WorkerPool
lock sync.Mutex
table map[Data]bool
table map[string]bool
shutdownCtx context.Context
shutdownCtxCancel context.CancelFunc
terminateCtx context.Context
Expand All @@ -54,7 +55,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)

queue := &ChannelUniqueQueue{
table: map[Data]bool{},
table: map[string]bool{},
shutdownCtx: shutdownCtx,
shutdownCtxCancel: shutdownCtxCancel,
terminateCtx: terminateCtx,
Expand All @@ -65,9 +66,14 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
}
queue.WorkerPool = NewWorkerPool(func(data ...Data) {
for _, datum := range data {
queue.lock.Lock()
delete(queue.table, datum)
queue.lock.Unlock()
bs, err := json.Marshal(datum)
if err != nil {
log.Error("unable to marshal data: %v", datum)
} else {
queue.lock.Lock()
delete(queue.table, string(bs))
queue.lock.Unlock()
}
handle(datum)
}
}, config.WorkerPoolConfiguration)
Expand All @@ -94,23 +100,28 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {
if !assignableTo(data, q.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, q.exemplar, q.name)
}

bs, err := json.Marshal(data)
if err != nil {
return err
}
q.lock.Lock()
locked := true
defer func() {
if locked {
q.lock.Unlock()
}
}()
if _, ok := q.table[data]; ok {
if _, ok := q.table[string(bs)]; ok {
return ErrAlreadyInQueue
}
// FIXME: We probably need to implement some sort of limit here
// If the downstream queue blocks this table will grow without limit
q.table[data] = true
q.table[string(bs)] = true
if fn != nil {
err := fn()
if err != nil {
delete(q.table, data)
delete(q.table, string(bs))
return err
}
}
Expand All @@ -122,9 +133,14 @@ func (q *ChannelUniqueQueue) PushFunc(data Data, fn func() error) error {

// Has checks if the data is in the queue
//
// The uniqueness table is keyed by the JSON encoding of the data, so data is
// marshalled first and the resulting string is looked up under the queue
// lock. A marshalling failure is returned as the error with a false result.
func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
	bs, err := json.Marshal(data)
	if err != nil {
		return false, err
	}

	q.lock.Lock()
	defer q.lock.Unlock()
	// Only the string-keyed lookup is valid now that table is map[string]bool.
	_, has := q.table[string(bs)]
	return has, nil
}

Expand Down
12 changes: 12 additions & 0 deletions modules/setting/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,16 @@ func NewQueueService() {
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.PullRequestQueueLength))
}

// Handle the old mirror queue configuration
// Please note this will be a unique queue
section = Cfg.Section("queue.mirror")
sectionMap = map[string]bool{}
for _, key := range section.Keys() {
sectionMap[key.Name()] = true
}
if _, ok := sectionMap["LENGTH"]; !ok {
_, _ = section.NewKey("LENGTH", fmt.Sprintf("%d", Repository.MirrorQueueLength))
}

}
99 changes: 66 additions & 33 deletions services/mirror/mirror.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,43 @@ package mirror
import (
"context"
"fmt"
"strconv"
"strings"

"code.gitea.io/gitea/models"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/modules/sync"
)

// mirrorQueue holds an UniqueQueue object of the mirror
var mirrorQueue = sync.NewUniqueQueue(setting.Repository.MirrorQueueLength)
var mirrorQueue queue.UniqueQueue

// RequestType type of mirror request
// It selects which sync routine doMirror runs for a queued Request.
type RequestType int

const (
// PullRequestType for pull mirrors
// It is the first iota constant, so it is also the zero value of RequestType.
PullRequestType RequestType = iota
// PushRequestType for push mirrors
PushRequestType
)

// Request for the mirror queue
// A pointer to this struct is what gets pushed onto mirrorQueue and later
// handed to doMirror by the queue handler.
type Request struct {
// Type selects pull-mirror or push-mirror handling in doMirror.
Type RequestType
// RepoID is passed unchanged to the sync routine chosen by Type.
// NOTE(review): AddPushMirrorToQueue stores a push-mirror ID in this field,
// so despite its name it is not always a repository ID — confirm which ID
// SyncPushMirror expects.
RepoID int64
}

// doMirror dispatches a queued mirror request to the sync routine that
// matches its Type: push requests go to SyncPushMirror and pull requests to
// SyncPullMirror, each receiving ctx and the request's RepoID. The result of
// the sync call is discarded. A request with any other Type is logged as an
// error and otherwise ignored.
func doMirror(ctx context.Context, req *Request) {
	if req.Type == PushRequestType {
		_ = SyncPushMirror(ctx, req.RepoID)
		return
	}
	if req.Type == PullRequestType {
		_ = SyncPullMirror(ctx, req.RepoID)
		return
	}
	log.Error("Unknown Request type in queue: %v for RepoID[%d]", req.Type, req.RepoID)
}

// Update checks and updates mirror repositories.
func Update(ctx context.Context) error {
Expand All @@ -29,19 +54,25 @@ func Update(ctx context.Context) error {
log.Trace("Doing: Update")

handler := func(idx int, bean interface{}) error {
var item string
var item Request
if m, ok := bean.(*models.Mirror); ok {
if m.Repo == nil {
log.Error("Disconnected mirror found: %d", m.ID)
return nil
}
item = fmt.Sprintf("pull %d", m.RepoID)
item = Request{
Type: PullRequestType,
RepoID: m.RepoID,
}
} else if m, ok := bean.(*models.PushMirror); ok {
if m.Repo == nil {
log.Error("Disconnected push-mirror found: %d", m.ID)
return nil
}
item = fmt.Sprintf("push %d", m.ID)
item = Request{
Type: PushRequestType,
RepoID: m.RepoID,
}
} else {
log.Error("Unknown bean: %v", bean)
return nil
Expand All @@ -51,8 +82,7 @@ func Update(ctx context.Context) error {
case <-ctx.Done():
return fmt.Errorf("Aborted")
default:
mirrorQueue.Add(item)
return nil
return mirrorQueue.Push(&item)
}
}

Expand All @@ -68,26 +98,10 @@ func Update(ctx context.Context) error {
return nil
}

// syncMirrors checks and syncs mirrors.
// FIXME: graceful: this should be a persistable queue
func syncMirrors(ctx context.Context) {
// Start listening on new sync requests.
for {
select {
case <-ctx.Done():
mirrorQueue.Close()
return
case item := <-mirrorQueue.Queue():
id, _ := strconv.ParseInt(item[5:], 10, 64)
if strings.HasPrefix(item, "pull") {
_ = SyncPullMirror(ctx, id)
} else if strings.HasPrefix(item, "push") {
_ = SyncPushMirror(ctx, id)
} else {
log.Error("Unknown item in queue: %v", item)
}
mirrorQueue.Remove(item)
}
// queueHandle is the handler function registered for the mirror queue. Each
// datum is expected to be a *Request and is passed to doMirror together with
// the graceful manager's shutdown context.
//
// A datum that is not a non-nil *Request is logged and skipped rather than
// being allowed to panic the worker through an unchecked type assertion.
func queueHandle(data ...queue.Data) {
	for _, datum := range data {
		req, ok := datum.(*Request)
		if !ok || req == nil {
			log.Error("Unexpected data in mirror queue: %v (type %T)", datum, datum)
			continue
		}
		doMirror(graceful.GetManager().ShutdownContext(), req)
	}
}

Expand All @@ -96,21 +110,40 @@ func InitSyncMirrors() {
if !setting.Mirror.Enabled {
return
}
go graceful.GetManager().RunWithShutdownContext(syncMirrors)
mirrorQueue = queue.CreateUniqueQueue("mirror", queueHandle, new(Request))

go graceful.GetManager().RunWithShutdownFns(mirrorQueue.Run)
}

// StartToMirror adds repoID to mirror queue
//
// It queues a pull-mirror sync for the repository, matching the "pull <id>"
// item this function queued before the move to modules/queue. The push runs
// in its own goroutine so the caller is never blocked by a full queue.
// Nothing is queued when mirroring is disabled in the settings.
func StartToMirror(repoID int64) {
	if !setting.Mirror.Enabled {
		return
	}
	go func() {
		err := mirrorQueue.Push(&Request{
			Type:   PullRequestType,
			RepoID: repoID,
		})
		if err == nil {
			return
		}
		// The mirror queue is a unique queue; a request that is already
		// waiting is expected and not worth an error-level log entry.
		if err == queue.ErrAlreadyInQueue {
			log.Trace("Pull mirror request for repo[%d] is already in the queue", repoID)
			return
		}
		log.Error("Unable to push pull mirror request to the queue for repo[%d]: Error: %v", repoID, err)
	}()
}

// AddPushMirrorToQueue adds the push mirror to the queue
//
// It queues a push-mirror sync, matching the "push <id>" item this function
// queued before the move to modules/queue. The push runs in its own goroutine
// so the caller is never blocked by a full queue. Nothing is queued when
// mirroring is disabled in the settings.
func AddPushMirrorToQueue(mirrorID int64) {
	if !setting.Mirror.Enabled {
		return
	}
	go func() {
		err := mirrorQueue.Push(&Request{
			Type: PushRequestType,
			// The Request field is named RepoID, but for push requests it
			// carries the push-mirror ID supplied by the caller.
			RepoID: mirrorID,
		})
		if err == nil {
			return
		}
		// The mirror queue is a unique queue; a request that is already
		// waiting is expected and not worth an error-level log entry.
		if err == queue.ErrAlreadyInQueue {
			log.Trace("Push mirror request for mirror[%d] is already in the queue", mirrorID)
			return
		}
		log.Error("Unable to push push mirror request to the queue for mirror[%d]: Error: %v", mirrorID, err)
	}()
}

0 comments on commit 6154804

Please sign in to comment.