Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improved queue processing #562

Merged
merged 24 commits into from
Jan 11, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
803870d
fix: improved queue processing
uhliksk Jan 1, 2023
8d80a0e
fix: process everything at once
uhliksk Jan 1, 2023
2d37ee2
Merge remote-tracking branch 'upstream/master' into fix/improved-queu…
chrisleekr Jan 2, 2023
a9a25e8
Merge remote-tracking branch 'upstream/master' into fix/improved-queu…
chrisleekr Jan 3, 2023
631a973
refactor: reduce eslint disable
chrisleekr Jan 3, 2023
70d8977
refactor: change queue.hold()/executeFor() to execute()
uhliksk Jan 9, 2023
ff4139a
refactor: manual trade tests
uhliksk Jan 9, 2023
897ed30
refactor: manual trade tests
uhliksk Jan 9, 2023
5dd24b5
refactor: symbol tests
uhliksk Jan 9, 2023
6b4c3ed
refactor: binance tests
uhliksk Jan 9, 2023
ce7b5b8
refactor: cancel order test
uhliksk Jan 9, 2023
85880f8
refactor: handle open orders test
uhliksk Jan 9, 2023
5458e9d
refactor: binance tickers tests
uhliksk Jan 9, 2023
648c4d9
refactor: queue tests
uhliksk Jan 9, 2023
59b3cf9
refactor: ensure grid trade order executed tests
uhliksk Jan 9, 2023
c425292
refactor: symbol tests cleanup
uhliksk Jan 9, 2023
a3d84cb
refactor: updated vscode spell words
uhliksk Jan 9, 2023
9e7ec09
refactor: used chain of responsibilities pattern for queue
chrisleekr Jan 10, 2023
3f22882
test: updated queue test
chrisleekr Jan 10, 2023
4b657a1
chore: remove bull packages
chrisleekr Jan 10, 2023
b1e4ad3
refactor: simplify queue.execute logic
uhliksk Jan 11, 2023
c3fd12a
Fixed error ALLOW_CONFIG_MUTATIONS
chrisleekr Jan 11, 2023
bff3868
refactor: corrected promise.all
chrisleekr Jan 11, 2023
8d2be45
docs: updated CHANGELOG.md
chrisleekr Jan 11, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/__tests__/server-binance.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ describe('server-binance', () => {
};
mockQueue = {
init: jest.fn().mockResolvedValue(true),
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};
mockSlack = {
sendMessage: jest.fn().mockResolvedValue(true)
Expand Down
3 changes: 2 additions & 1 deletion app/binance/__tests__/candles.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ describe('candles.js', () => {
});

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
3 changes: 2 additions & 1 deletion app/binance/__tests__/tickers.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ describe('tickers.js', () => {
cacheMock = cache;

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
3 changes: 2 additions & 1 deletion app/binance/__tests__/user.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ describe('user.js', () => {
loggerMock = logger;

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
2 changes: 2 additions & 0 deletions app/binance/candles.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const syncCandles = async (logger, symbols) => {
}
}));

await queue.hold(logger, symbol);

await mongo.bulkWrite(logger, 'trailing-trade-candles', operations);

queue.executeFor(logger, symbol, { correlationId: uuidv4() });
Expand Down
2 changes: 2 additions & 0 deletions app/binance/tickers.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ const setupTickersWebsocket = async (logger, symbols) => {
);

if (canExecuteTrailingTrade) {
await queue.hold(symbolLogger, monitoringSymbol);

queue.executeFor(symbolLogger, monitoringSymbol, { correlationId });
}
});
Expand Down
4 changes: 4 additions & 0 deletions app/binance/user.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ const setupUserWebsocket = async logger => {
transactTime
};

await queue.hold(symbolLogger, symbol);

await updateGridTradeLastOrder(
symbolLogger,
symbol,
Expand All @@ -123,6 +125,8 @@ const setupUserWebsocket = async logger => {
const manualOrder = await getManualOrder(symbolLogger, symbol, orderId);

if (_.isEmpty(manualOrder) === false) {
await queue.hold(symbolLogger, symbol);

await saveManualOrder(symbolLogger, symbol, orderId, {
...manualOrder,
status: orderStatus,
Expand Down
84 changes: 83 additions & 1 deletion app/cronjob/trailingTradeHelper/__tests__/queue.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ describe('queue', () => {
let mockQueueProcess;
let mockQueueObliterate;
let mockQueueAdd;
let mockQueuePause;
let mockQueueResume;
let mockQueueGetActiveCount;
let mockQueueGetWaitingCount;
let mockQueueIsPaused;
let mockQueue;

let mockExecuteTrailingTrade;
Expand All @@ -25,12 +30,22 @@ describe('queue', () => {

mockQueueObliterate = jest.fn().mockResolvedValue(true);
mockQueueAdd = jest.fn().mockResolvedValue(true);
mockQueuePause = jest.fn().mockResolvedValue(true);
mockQueueResume = jest.fn().mockResolvedValue(true);
mockQueueGetActiveCount = jest.fn().mockResolvedValue(0);
mockQueueGetWaitingCount = jest.fn().mockResolvedValue(0);
mockQueueIsPaused = jest.fn().mockResolvedValue(false);
mockExecuteTrailingTrade = jest.fn().mockResolvedValue(true);
mockSetBullBoardQueues = jest.fn().mockResolvedValue(true);

mockQueue = jest.fn().mockImplementation((_queueName, _redisUrl) => ({
process: mockQueueProcess,
add: mockQueueAdd,
pause: mockQueuePause,
resume: mockQueueResume,
getActiveCount: mockQueueGetActiveCount,
getWaitingCount: mockQueueGetWaitingCount,
isPaused: mockQueueIsPaused,
obliterate: mockQueueObliterate
}));

Expand All @@ -45,6 +60,73 @@ describe('queue', () => {
}));
});

describe('hold', () => {
describe('when symbol does not exist in the queue', () => {
beforeEach(async () => {
queue = require('../queue');

await queue.init(logger, ['BTCUSDT']);
await queue.hold(logger, 'ETHUSDT');
});

it('does not trigger queue.pause for ETHUSDT', () => {
expect(mockQueuePause).not.toHaveBeenCalled();
});
});

describe('when symbol does exist in the queue', () => {
describe('paused one time', () => {
beforeEach(async () => {
queue = require('../queue');

await queue.init(logger, ['BTCUSDT']);
await queue.hold(logger, 'BTCUSDT');
mockQueueIsPaused.mockReturnValueOnce(true);
await queue.executeFor(logger, 'BTCUSDT');
});

it('does trigger queue.pause once for BTCUSDT', () => {
expect(mockQueuePause).toHaveBeenCalledTimes(1);
});
});

describe('paused two times with active job', () => {
beforeEach(async () => {
mockQueueGetActiveCount.mockReturnValueOnce(1);

queue = require('../queue');

await queue.init(logger, ['BTCUSDT']);
await queue.hold(logger, 'BTCUSDT');
mockQueueIsPaused.mockReturnValueOnce(true);
queue.hold(logger, 'BTCUSDT');
await queue.executeFor(logger, 'BTCUSDT');
});

it('does trigger queue.pause once for BTCUSDT', () => {
expect(mockQueuePause).toHaveBeenCalledTimes(1);
});
});

describe('paused two times with waiting job', () => {
beforeEach(async () => {
mockQueueGetWaitingCount.mockReturnValueOnce(1);
queue = require('../queue');

await queue.init(logger, ['BTCUSDT']);
await queue.hold(logger, 'BTCUSDT');
mockQueueIsPaused.mockReturnValueOnce(true);
queue.hold(logger, 'BTCUSDT');
await queue.executeFor(logger, 'BTCUSDT');
});

it('does trigger queue.pause once for BTCUSDT', () => {
expect(mockQueuePause).toHaveBeenCalledTimes(1);
});
});
});
});

describe('init', () => {
describe('called one time', () => {
beforeEach(async () => {
Expand All @@ -57,7 +139,7 @@ describe('queue', () => {
expect(mockQueue).toHaveBeenCalledWith('BTCUSDT', expect.any(String), {
prefix: `bull`,
limiter: {
max: 1,
max: 100,
duration: 10000,
bounceBack: true
}
Expand Down
93 changes: 89 additions & 4 deletions app/cronjob/trailingTradeHelper/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,88 @@ const { executeTrailingTrade } = require('../index');
const { setBullBoardQueues } = require('../../frontend/bull-board/configure');

let queues = {};
let paused = {};
let resumed = {};

const REDIS_URL = `redis://:${config.get('redis.password')}@${config.get(
'redis.host'
)}:${config.get('redis.port')}/${config.get('redis.db')}`;

const pause = async (funcLogger, symbol) => {
const logger = funcLogger.child({ helper: 'queue' });

// eslint-disable-next-line no-plusplus
const pos = paused[symbol]++;
if (pos > resumed[symbol]) {
logger.info({ symbol }, `Queue ${symbol} pause #${pos} queued`);
while (pos > resumed[symbol]) {
// eslint-disable-next-line no-await-in-loop, no-promise-executor-return
await new Promise(r => setTimeout(r, 10));
}
}

await queues[symbol].pause();

logger.info({ symbol }, `Queue ${symbol} paused #${pos}`);
};

const resume = async (funcLogger, symbol) => {
const logger = funcLogger.child({ helper: 'queue' });

// eslint-disable-next-line no-plusplus
const pos = resumed[symbol]++;

if (paused[symbol] === resumed[symbol]) {
// eslint-disable-next-line no-multi-assign
paused[symbol] = resumed[symbol] = 0;

await queues[symbol].resume();
}

logger.info({ symbol }, `Queue ${symbol} resumed #${pos}`);

if (paused[symbol] === 0) {
logger.info({ symbol }, `Queue ${symbol} resumed last`);
}
};

const waitForJob = async (funcLogger, symbol) => {
const logger = funcLogger.child({ helper: 'queue' });

let active = await queues[symbol].getActiveCount();
if (active > 0) {
logger.info({ symbol }, `Queue ${symbol} wait for ${active} job(s)`);

while (active > 0) {
// eslint-disable-next-line no-await-in-loop, no-promise-executor-return
await new Promise(r => setTimeout(r, 10));
// eslint-disable-next-line no-await-in-loop
active = await queues[symbol].getActiveCount();
}
}

logger.info({ symbol }, `Queue ${symbol} inactive`);
};

const hold = async (funcLogger, symbol) => {
const logger = funcLogger.child({ helper: 'queue' });

if (!(symbol in queues)) {
logger.error({ symbol }, `No queue created for ${symbol} pause`);
return;
}

await pause(funcLogger, symbol);
await waitForJob(funcLogger, symbol);
};

const create = (funcLogger, symbol) => {
const logger = funcLogger.child({ helper: 'queue' });

const queue = new Queue(symbol, REDIS_URL, {
prefix: `bull`,
limiter: {
max: 1,
max: 100,
duration: 10000, // 10 seconds
// bounceBack: When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue
bounceBack: true
Expand All @@ -40,10 +110,14 @@ const init = async (funcLogger, symbols) => {
// Completely remove all queues with their data
await Promise.all(_.map(queues, queue => queue.obliterate({ force: true })));
queues = {};
paused = {};
resumed = {};

await Promise.all(
_.map(symbols, async symbol => {
queues[symbol] = create(funcLogger, symbol);
paused[symbol] = 0;
resumed[symbol] = 0;
})
);

Expand All @@ -65,12 +139,23 @@ const executeFor = async (funcLogger, symbol, jobData = {}) => {
return;
}

await queues[symbol].add(jobData, {
removeOnComplete: 100 // number specified the amount of jobs to keep.
});
const waiting = await queues[symbol].getWaitingCount();
if (waiting > 0) {
logger.info({ symbol }, `Already waiting ${waiting} job(s) in ${symbol}`);
} else {
logger.info({ symbol }, `Added job for ${symbol}`);
await queues[symbol].add(jobData, {
removeOnComplete: 100 // number specified the amount of jobs to keep.
});
}

if (await queues[symbol].isPaused()) {
await resume(funcLogger, symbol);
}
};

module.exports = {
init,
hold,
executeFor
};
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ describe('cancel-order.js', () => {
}));

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,8 @@ describe('manual-trade-all-symbols.js', () => {
}));

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ describe('manual-trade.js', () => {
}));

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ describe('symbol-enable-action.test.js', () => {
};

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ describe('symbol-grid-trade-delete.test.js', () => {
};

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ describe('symbol-setting-delete.test.js', () => {
};

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ describe('symbol-setting-update.test.js', () => {
};

mockQueue = {
executeFor: jest.fn().mockResolvedValue(true)
executeFor: jest.fn().mockResolvedValue(true),
hold: jest.fn().mockResolvedValue(true)
};

jest.mock('../../../../cronjob/trailingTradeHelper/queue', () => mockQueue);
Expand Down
Loading