Skip to content

Commit

Permalink
task queue: support multiple client subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Dec 3, 2020
1 parent 2e05fef commit 56db39b
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 99 deletions.
80 changes: 53 additions & 27 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,15 @@ type rootResolver struct {
worker *taskQueueWorker
}

func (r *rootResolver) Tagline(ctx context.Context) string {
return "GraphQL service for Task Queue Worker dashboard"
func (r *rootResolver) Tagline(ctx context.Context) (res TaglineResolver) {
for taskClient := range clientTaskSubscribers {
res.TaskListClientSubscribers = append(res.TaskListClientSubscribers, taskClient)
}
for client := range clientJobTaskSubscribers {
res.JobListClientSubscribers = append(res.JobListClientSubscribers, client)
}
res.Tagline = "GraphQL service for Task Queue Worker dashboard"
return
}

func (r *rootResolver) AddJob(ctx context.Context, input struct {
Expand All @@ -67,7 +74,8 @@ func (r *rootResolver) StopJob(ctx context.Context, input struct {
}

job.Status = string(statusStopped)
r.worker.broadcastEvent(&job)
repo.saveJob(job)
broadcastAllToSubscribers()

return "Success stop job " + input.JobID, nil
}
Expand All @@ -87,10 +95,10 @@ func (r *rootResolver) RetryJob(ctx context.Context, input struct {
job.Retries = 0
}
job.Status = string(statusQueueing)
repo.saveJob(job)
queue.PushJob(&job)
registerJobToWorker(&job, task.workerIndex)
r.worker.broadcastRefreshClientSubscriber(&job)
repo.saveJob(job)
broadcastAllToSubscribers()
}(job)

return "Success retry job " + input.JobID, nil
Expand All @@ -101,57 +109,75 @@ func (r *rootResolver) CleanJob(ctx context.Context, input struct {
}) (string, error) {

repo.cleanJob(input.TaskName)
go r.worker.broadcastRefreshClientSubscriber(&Job{TaskName: input.TaskName})
go broadcastAllToSubscribers()

return "Success clean all job in task " + input.TaskName, nil
}

func (r *rootResolver) SubscribeAllTask(ctx context.Context) <-chan []TaskResolver {
func (r *rootResolver) SubscribeAllTask(ctx context.Context) (<-chan []TaskResolver, error) {
output := make(chan []TaskResolver)

httpHeader := candishared.GetValueFromContext(ctx, candishared.ContextKeyHTTPHeader).(http.Header)
clientID := httpHeader.Get("Sec-WebSocket-Key")

if err := registerNewTaskListSubscriber(clientID, output); err != nil {
return nil, err
}

go func() {
r.worker.listenUpdatedTask(output)

broadcastTaskList()

select {
case <-ctx.Done():
removeTaskListSubscriber(clientID)
return
}
}()

return output
return output, nil
}

func (r *rootResolver) ListenTask(ctx context.Context, input struct {
TaskName string
Page, Limit int32
Search *string
Status []string
}) <-chan JobListResolver {
}) (<-chan JobListResolver, error) {

output := make(chan JobListResolver)

go func() {
httpHeader := candishared.GetValueFromContext(ctx, candishared.ContextKeyHTTPHeader).(http.Header)
clientID := httpHeader.Get("Sec-WebSocket-Key")

httpHeader := candishared.GetValueFromContext(ctx, candishared.ContextKeyHTTPHeader).(http.Header)
if input.Page <= 0 {
input.Page = 1
}
if input.Limit <= 0 || input.Limit > 10 {
input.Limit = 10
}

if input.Page <= 0 {
input.Page = 1
}
if input.Limit <= 0 || input.Limit > 10 {
input.Limit = 10
}
filter := Filter{
Page: int(input.Page), Limit: int(input.Limit), Search: input.Search, Status: input.Status, TaskName: input.TaskName,
}

filter := Filter{
Page: int(input.Page), Limit: int(input.Limit), Search: input.Search, Status: input.Status,
}
meta, jobs := repo.findAllJob(input.TaskName, filter)
if err := registerNewJobListSubscriber(input.TaskName, clientID, filter, output); err != nil {
return nil, err
}

go func() {

meta, jobs := repo.findAllJob(filter)
output <- JobListResolver{
Meta: meta, Data: jobs,
}

r.worker.registerNewSubscriber(input.TaskName, filter, output)

select {
case <-ctx.Done():
fmt.Println("close", httpHeader.Get("Sec-WebSocket-Key"))
// close(output)
removeJobListSubscriber(input.TaskName, clientID)
return
}
}()

return output
return output, nil
}
8 changes: 7 additions & 1 deletion codebase/app/task_queue_worker/graphql_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const schema = `schema {
}
type Query {
tagline(): String!
tagline(): TaglineType!
}
type Mutation {
Expand All @@ -22,6 +22,12 @@ type Subscription {
listen_task(task_name: String!, page: Int!, limit: Int!, search: String, status: [String!]!): JobListType!
}
type TaglineType {
tagline: String!
task_list_client_subscribers: [String!]!
job_list_client_subscribers: [String!]!
}
type MetaType {
page: Int!
limit: Int!
Expand Down
1 change: 1 addition & 0 deletions codebase/app/task_queue_worker/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func AddJob(taskName string, maxRetry int, args []byte) (err error) {
queue.PushJob(&job)
registerJobToWorker(&job, workerIndex)
repo.saveJob(job)
broadcastAllToSubscribers()
}(newJob, task.workerIndex)

return nil
Expand Down
22 changes: 11 additions & 11 deletions codebase/app/task_queue_worker/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func createMongoIndex(db *mongo.Database) {
},
{
Keys: bson.M{
"arguments": 1,
"arguments": "text",
},
Options: &options.IndexOptions{},
},
Expand All @@ -59,7 +59,7 @@ type storage struct {
mongoRead, mongoWrite *mongo.Database
}

func (s *storage) findAllJob(taskName string, filter Filter) (meta Meta, jobs []Job) {
func (s *storage) findAllJob(filter Filter) (meta Meta, jobs []Job) {
ctx := context.Background()

lim := int64(filter.Limit)
Expand All @@ -71,7 +71,7 @@ func (s *storage) findAllJob(taskName string, filter Filter) (meta Meta, jobs []
}

pipeQuery := []bson.M{
{"task_name": taskName},
{"task_name": filter.TaskName},
}
if filter.Search != nil && *filter.Search != "" {
pipeQuery = append(pipeQuery, bson.M{
Expand Down Expand Up @@ -105,22 +105,22 @@ func (s *storage) findAllJob(taskName string, filter Filter) (meta Meta, jobs []
jobs = append(jobs, job)
}

meta.Detail.GiveUp = repo.countTaskJobDetail(taskName, statusFailure)
meta.Detail.Retrying = repo.countTaskJobDetail(taskName, statusRetrying)
meta.Detail.Success = repo.countTaskJobDetail(taskName, statusSuccess)
meta.Detail.Queueing = repo.countTaskJobDetail(taskName, statusQueueing)
meta.Detail.Stopped = repo.countTaskJobDetail(taskName, statusStopped)
meta.TotalRecords = s.countTaskJob(taskName, filter)
meta.Detail.GiveUp = repo.countTaskJobDetail(filter.TaskName, statusFailure)
meta.Detail.Retrying = repo.countTaskJobDetail(filter.TaskName, statusRetrying)
meta.Detail.Success = repo.countTaskJobDetail(filter.TaskName, statusSuccess)
meta.Detail.Queueing = repo.countTaskJobDetail(filter.TaskName, statusQueueing)
meta.Detail.Stopped = repo.countTaskJobDetail(filter.TaskName, statusStopped)
meta.TotalRecords = s.countTaskJob(filter)
meta.Page, meta.Limit = filter.Page, filter.Limit
meta.TotalPages = int(math.Ceil(float64(meta.TotalRecords) / float64(meta.Limit)))
return
}

func (s *storage) countTaskJob(taskName string, filter Filter) int {
func (s *storage) countTaskJob(filter Filter) int {
ctx := context.Background()

pipeQuery := []bson.M{
{"task_name": taskName},
{"task_name": filter.TaskName},
}
if filter.Search != nil && *filter.Search != "" {
pipeQuery = append(pipeQuery, bson.M{
Expand Down
84 changes: 84 additions & 0 deletions codebase/app/task_queue_worker/subscribers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package taskqueueworker

import (
"errors"
)

const maxClientSubscribers = 2

var errClientLimitExceeded = errors.New("client limit exceeded, please try again later")

// registerNewTaskListSubscriber registers a websocket client channel that will
// receive task-list broadcasts, keyed by clientID. It returns
// errClientLimitExceeded when maxClientSubscribers is already reached.
func registerNewTaskListSubscriber(clientID string, clientChannel chan []TaskResolver) error {
	mutex.Lock()
	defer mutex.Unlock()

	// The limit check must happen while holding the lock: reading len() on the
	// shared map before locking races with concurrent register/remove calls.
	if len(clientTaskSubscribers) >= maxClientSubscribers {
		return errClientLimitExceeded
	}

	clientTaskSubscribers[clientID] = clientChannel
	return nil
}

// removeTaskListSubscriber drops the task-list subscriber registered under
// clientID, freeing a slot for a new client. Deleting an unknown ID is a no-op.
func removeTaskListSubscriber(clientID string) {
	mutex.Lock()
	delete(clientTaskSubscribers, clientID)
	mutex.Unlock()
}

// registerNewJobListSubscriber registers a websocket client channel that will
// receive job-list broadcasts filtered by the given filter, keyed by clientID.
// It returns errClientLimitExceeded when maxClientSubscribers is already
// reached. taskName is currently unused but kept for caller compatibility
// (filter.TaskName carries the task name — see removeJobListSubscriber).
func registerNewJobListSubscriber(taskName, clientID string, filter Filter, clientChannel chan JobListResolver) error {
	mutex.Lock()
	defer mutex.Unlock()

	// The limit check must happen while holding the lock: reading len() on the
	// shared map before locking races with concurrent register/remove calls.
	if len(clientJobTaskSubscribers) >= maxClientSubscribers {
		return errClientLimitExceeded
	}

	clientJobTaskSubscribers[clientID] = clientJobTaskSubscriber{
		c: clientChannel, filter: filter,
	}
	return nil
}

// removeJobListSubscriber drops the job-list subscriber registered under
// clientID. taskName is unused (subscribers are keyed by clientID only) but
// retained so existing call sites keep compiling.
func removeJobListSubscriber(taskName, clientID string) {
	mutex.Lock()
	delete(clientJobTaskSubscribers, clientID)
	mutex.Unlock()
}

// broadcastAllToSubscribers fans out both the task summary and the per-client
// job lists, each in its own goroutine so the caller never blocks on slow
// subscriber channels.
func broadcastAllToSubscribers() {
	// The go statement evaluates the function value in the calling goroutine,
	// so each iteration launches the intended broadcaster.
	for _, broadcast := range []func(){broadcastTaskList, broadcastJobList} {
		go broadcast()
	}
}

// broadcastTaskList recomputes the per-task job counters (failure, retrying,
// success, queueing, stopped) from the repository and pushes the resulting
// summary to every registered task-list subscriber.
func broadcastTaskList() {
	var taskRes []TaskResolver
	for _, task := range tasks {
		tsk := TaskResolver{
			Name: task,
		}
		tsk.Detail.GiveUp = repo.countTaskJobDetail(task, statusFailure)
		tsk.Detail.Retrying = repo.countTaskJobDetail(task, statusRetrying)
		tsk.Detail.Success = repo.countTaskJobDetail(task, statusSuccess)
		tsk.Detail.Queueing = repo.countTaskJobDetail(task, statusQueueing)
		tsk.Detail.Stopped = repo.countTaskJobDetail(task, statusStopped)
		tsk.TotalJobs = tsk.Detail.GiveUp + tsk.Detail.Retrying + tsk.Detail.Success + tsk.Detail.Queueing + tsk.Detail.Stopped
		taskRes = append(taskRes, tsk)
	}

	// Snapshot the subscriber channels under the mutex: iterating the shared
	// map directly would race with concurrent register/remove calls (a
	// concurrent map read+write panics at runtime). The channel sends happen
	// outside the lock so a slow client cannot block registration.
	mutex.Lock()
	subscribers := make([]chan []TaskResolver, 0, len(clientTaskSubscribers))
	for _, subscriber := range clientTaskSubscribers {
		subscribers = append(subscribers, subscriber)
	}
	mutex.Unlock()

	for _, subscriber := range subscribers {
		subscriber <- taskRes
	}
}

func broadcastJobList() {
for _, subscriber := range clientJobTaskSubscribers {
meta, jobs := repo.findAllJob(subscriber.filter)
subscriber.c <- JobListResolver{
Meta: meta,
Data: jobs,
}
}
}
60 changes: 4 additions & 56 deletions codebase/app/task_queue_worker/task_queue_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,16 +160,15 @@ func (t *taskQueueWorker) execJob(workerIndex int) {
if r := recover(); r != nil {
trace.SetError(fmt.Errorf("%v", r))
}
repo.saveJob(job)
broadcastAllToSubscribers()
logger.LogGreen("task queue: " + tracer.GetTraceURL(ctx))
}()

job.Retries++
job.Status = string(statusRetrying)
t.broadcastEvent(&job)

defer func() {
t.broadcastEvent(&job)
}()
repo.saveJob(job)
broadcastAllToSubscribers()

job.TraceID = tracer.GetTraceID(ctx)

Expand Down Expand Up @@ -211,8 +210,6 @@ func (t *taskQueueWorker) execJob(workerIndex int) {
delay += nextJobDelay
}

fmt.Println("DELAY:", delay.Seconds())

taskIndex.activeInterval = time.NewTicker(delay)
workers[workerIndex].Chan = reflect.ValueOf(taskIndex.activeInterval.C)

Expand All @@ -225,52 +222,3 @@ func (t *taskQueueWorker) execJob(workerIndex int) {
job.Status = string(statusSuccess)
}
}

func (t *taskQueueWorker) broadcastEvent(job *Job) {
repo.saveJob(*job)
t.broadcastRefreshClientSubscriber(job)
}

func (t *taskQueueWorker) listenUpdatedTask(clientChannel chan []TaskResolver) {
taskChannel = clientChannel
t.appendTaskDataToChannel()
}

func (t *taskQueueWorker) registerNewSubscriber(taskName string, filter Filter, clientChannel chan JobListResolver) {
mutex.Lock()
defer mutex.Unlock()

clientJobTaskSubscribers[taskName] = clientJobTaskSubscriber{
c: clientChannel, filter: filter,
}
}

func (t *taskQueueWorker) broadcastRefreshClientSubscriber(job *Job) {
clientJobTaskSubscribers := clientJobTaskSubscribers[job.TaskName]
meta, jobs := repo.findAllJob(job.TaskName, clientJobTaskSubscribers.filter)
go func() {
clientJobTaskSubscribers.c <- JobListResolver{
Meta: meta,
Data: jobs,
}
}()
go t.appendTaskDataToChannel()
}

func (t *taskQueueWorker) appendTaskDataToChannel() {
var taskRes []TaskResolver
for _, task := range tasks {
var tsk = TaskResolver{
Name: task,
}
tsk.Detail.GiveUp = repo.countTaskJobDetail(task, statusFailure)
tsk.Detail.Retrying = repo.countTaskJobDetail(task, statusRetrying)
tsk.Detail.Success = repo.countTaskJobDetail(task, statusSuccess)
tsk.Detail.Queueing = repo.countTaskJobDetail(task, statusQueueing)
tsk.Detail.Stopped = repo.countTaskJobDetail(task, statusStopped)
tsk.TotalJobs = tsk.Detail.GiveUp + tsk.Detail.Retrying + tsk.Detail.Success + tsk.Detail.Queueing + tsk.Detail.Stopped
taskRes = append(taskRes, tsk)
}

taskChannel <- taskRes
}
Loading

0 comments on commit 56db39b

Please sign in to comment.