Skip to content
This repository has been archived by the owner on Nov 4, 2021. It is now read-only.

Commit

Permalink
Clean up Jobqueues, minor fixes for S3 Queue (#451)
Browse files Browse the repository at this point in the history
* fix s3 consumer, cleanup graphile queue

* tests

* address comments
  • Loading branch information
neilkakkar authored Jun 4, 2021
1 parent 2ba112b commit b499825
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 53 deletions.
37 changes: 4 additions & 33 deletions src/main/job-queues/concurrent/graphile-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,18 @@ import { Pool } from 'pg'
import { EnqueuedJob, JobQueue, OnJobCallback, PluginsServerConfig } from '../../../types'
import { status } from '../../../utils/status'
import { createPostgresPool } from '../../../utils/utils'
import { JobQueueBase } from '../job-queue-base'

export class GraphileQueue implements JobQueue {
export class GraphileQueue extends JobQueueBase {
serverConfig: PluginsServerConfig
started: boolean
paused: boolean
onJob: OnJobCallback | null
runner: Runner | null
consumerPool: Pool | null
producerPool: Pool | null
workerUtilsPromise: Promise<WorkerUtils> | null

constructor(serverConfig: PluginsServerConfig) {
super()
this.serverConfig = serverConfig
this.started = false
this.paused = false
this.onJob = null
this.runner = null
this.consumerPool = null
this.producerPool = null
Expand Down Expand Up @@ -68,32 +64,7 @@ export class GraphileQueue implements JobQueue {

// consumer

async startConsumer(onJob: OnJobCallback): Promise<void> {
this.started = true
this.onJob = onJob
await this.syncState()
}

async stopConsumer(): Promise<void> {
this.started = false
await this.syncState()
}

async pauseConsumer(): Promise<void> {
this.paused = true
await this.syncState()
}

isConsumerPaused(): boolean {
return this.paused
}

async resumeConsumer(): Promise<void> {
this.paused = false
await this.syncState()
}

private async syncState(): Promise<void> {
protected async syncState(): Promise<void> {
if (this.started && !this.paused) {
if (!this.runner) {
this.consumerPool = await this.createPool()
Expand Down
12 changes: 8 additions & 4 deletions src/main/job-queues/job-queue-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ export class JobQueueBase implements JobQueue {
startConsumer(onJob: OnJobCallback): void
// eslint-disable-next-line @typescript-eslint/require-await
async startConsumer(onJob: OnJobCallback): Promise<void> {
this.started = true
this.onJob = onJob
await this.syncState()
if (!this.started) {
this.started = true
await this.syncState()
}
}

stopConsumer(): void
Expand All @@ -62,8 +64,10 @@ export class JobQueueBase implements JobQueue {
resumeConsumer(): void
// eslint-disable-next-line @typescript-eslint/require-await
async resumeConsumer(): Promise<void> {
this.paused = false
await this.syncState()
if (this.paused) {
this.paused = false
await this.syncState()
}
}

// eslint-disable-next-line @typescript-eslint/require-await
Expand Down
18 changes: 2 additions & 16 deletions src/main/job-queues/redlocked/s3-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,17 @@ import { S3Wrapper } from '../../../utils/db/s3-wrapper'
import { UUIDT } from '../../../utils/utils'
import { JobQueueBase } from '../job-queue-base'

const S3_POLL_INTERVAL = 5000
const S3_POLL_INTERVAL = 5

export class S3Queue extends JobQueueBase {
serverConfig: PluginsServerConfig
s3Wrapper: S3Wrapper | null
runner: NodeJS.Timeout | null

constructor(serverConfig: PluginsServerConfig) {
super()
this.serverConfig = serverConfig
this.s3Wrapper = null
this.runner = null
this.intervalSeconds = S3_POLL_INTERVAL
}

// producer
Expand Down Expand Up @@ -50,19 +49,6 @@ export class S3Queue extends JobQueueBase {

// consumer

protected async syncState(): Promise<void> {
if (this.started && !this.paused) {
if (!this.runner) {
await this.connectS3()
this.runner = setTimeout(() => this.readState(), S3_POLL_INTERVAL)
}
} else {
if (this.runner) {
clearTimeout(this.runner)
}
}
}

async readState(): Promise<boolean> {
if (!this.s3Wrapper) {
throw new Error('S3 object not initialized')
Expand Down
63 changes: 63 additions & 0 deletions tests/jobs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,30 @@ describe('job queues', () => {
await waitForLogEntries(2)
expect(testConsole.read()).toEqual([['processEvent'], ['reply', 'runIn']])
})

test('polls for jobs in future', async () => {
const DELAY = 3000 // 3s

// return something to be picked up after a few loops (poll interval is 100ms)
const now = Date.now()

const job: EnqueuedJob = {
type: 'pluginJob',
payload: { key: 'value' },
timestamp: now + DELAY,
pluginConfigId: 2,
pluginConfigTeam: 3,
}

server.hub.jobQueueManager.enqueue(job)
const consumedJob: EnqueuedJob = await new Promise((resolve, reject) => {
server.hub.jobQueueManager.startConsumer((consumedJob) => {
resolve(consumedJob[0])
})
})

expect(consumedJob).toEqual(job)
})
})

describe('connection', () => {
Expand Down Expand Up @@ -284,5 +308,44 @@ describe('job queues', () => {
Key: `prefix/2020-01-01/20200101-123456.123Z-deadbeef.json.gz`,
})
})

test('polls for new jobs', async () => {
const DELAY = 10000 // 10s
// calls the right functions to read the enqueued job
mS3WrapperInstance.mockClear()

// return something to be picked up after a few loops (poll interval is 5s)
const now = Date.now()
const date = new Date(now + DELAY).toISOString()
const [day, time] = date.split('T')
const dayTime = `${day.split('-').join('')}-${time.split(':').join('')}`

const job: EnqueuedJob = {
type: 'pluginJob',
payload: { key: 'value' },
timestamp: now,
pluginConfigId: 2,
pluginConfigTeam: 3,
}

mS3WrapperInstance.listObjectsV2.mockReturnValue({
Contents: [{ Key: `prefix/${day}/${dayTime}-deadbeef.json.gz` }],
})
mS3WrapperInstance.getObject.mockReturnValueOnce({
Body: gzipSync(Buffer.from(JSON.stringify(job), 'utf8')),
})

const consumedJob: EnqueuedJob = await new Promise((resolve, reject) => {
hub.jobQueueManager.startConsumer((consumedJob) => {
resolve(consumedJob[0])
})
})
expect(consumedJob).toEqual(job)
await delay(10)
expect(mS3WrapperInstance.deleteObject).toBeCalledWith({
Bucket: 'bucket-name',
Key: `prefix/${day}/${dayTime}-deadbeef.json.gz`,
})
})
})
})

0 comments on commit b499825

Please sign in to comment.