Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: improved queue processing #562

Merged
merged 24 commits into from
Jan 11, 2023

Conversation

uhliksk
Copy link
Contributor

@uhliksk uhliksk commented Jan 1, 2023

Description

  • Solve issues caused by parallel processing when bot state is modified while current queued job is still running.
  • Removed queue limiter (1 job per 10 seconds) to better detect spikes in market.
  • Fix partially filled orders to be processed when in volatile market order can be cancelled before fully filled.
  • Enqueueing the whole jobs instead of just trailing trade to prevent unnecessary interference of parallel processing.

Related Issue

#456, #487, #560, #574, #575, #576

Motivation and Context

It will prevent the bot to be stuck in unexpected states causing unwanted loss in bear market because when bot is stuck it's not even able to sell.

How Has This Been Tested?

I tested with tight buy and sell triggers to make as much events as possible to test queue hold.

Screenshots (if appropriate):

@chrisleekr chrisleekr added the bug Something isn't working label Jan 2, 2023
@chrisleekr chrisleekr linked an issue Jan 2, 2023 that may be closed by this pull request
@habibalkhabbaz
Copy link
Contributor

Thanks @uhliksk
You solved an issue that we may face and didn't notice it!
Your solution is good 💯
However I am thinking if we can just add any job for a symbol inside its queue instead of pausing and resuming.
Otherwise your solution will be better 👌🏼

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 2, 2023

However I am thinking if we can just add any job for a symbol inside its queue instead of pausing and resuming.

Pausing and resuming is neccessary to be sure there is no running job for current symbol while we are processing it. Without it another already queued job can start before we made neccessary changes in database which can cause interference again.

@@ -11,6 +11,8 @@ const handleSymbolSettingDelete = async (logger, ws, payload) => {

const { symbol } = symbolInfo;

await queue.hold(logger, symbol);
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@uhliksk

I think we can refactor this pattern

  • queue.hold
  • some action
  • queue.executeFor

Something like this
image

If you agree, I will refactor the code.

@habibalkhabbaz
Copy link
Contributor

habibalkhabbaz commented Jan 3, 2023

Pausing and resuming is neccessary to be sure there is no running job for current symbol while we are processing it. Without it another already queued job can start before we made neccessary changes in database which can cause interference again.

Hello @uhliksk

What I mean is that we can add every critical job to the queue of the symbol and not only the executeTrailingTrade
By doing that, the queue will do its job and nothing will interfere and without pausing and resuming.

Let's say that we have the following:

BTCUSDT Queue

  1. Executing trailing trade job...
  2. after a few seconds, the bot 🤖 received an order update while executing trailing trade steps. (outside the queue)
  3. ⚠️ Updated order is done. (outside the queue)
  4. Executed trailing trade job.

This is an issue based on your investigation and it is happened because the order is updated before finishing the trailing trade steps.

So my proposed solution like this:

  1. Executing trailing trade job...
  2. after a few seconds, the bot 🤖 received an order update while executing trailing trade steps.
  3. ✅ Add the order update to the queue.
  4. Executed trailing trade job.
  5. ✅ Order update is done.

The above scenarios based only on the order update but we can do the same for the rest jobs that interfere with executeTrailingTrade

Sample code:

queues['BTCUSDT'].process('trailingTrade', executeTrailingTrade);
queues['BTCUSDT'].process('orders', processOrderUpdate);
queues['BTCUSDT'].process('whatever', processWhatever);

These called named jobs based on the documentation. And it will not affect the queue mechanics to process jobs. The queue will still process them as FIFO

https://optimalbits.github.io/bull/

Let me know if you agree with me. If you agree I can help and try to implement it.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 3, 2023

Hello @habibalkhabbaz,

yes, this is cleaner solution but much harder to do as all processes currently between hold() and executeFor() have to be moved to separate functions which can be called by queue processor. I can try rework one, for example buy order, this way as a proof of concept and I'll check if there are any issues in real world with that.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 4, 2023

@habibalkhabbaz I found if we put everything into queue, then trailing trade will be running after every single operation. With current holdAndExecute solution the trailing trade is launched only single time.

Currently when events A, B, C and D are received in short period of time it will:

  1. Process A
  2. Process B
  3. Process C
  4. Process D
  5. Execute trailing trade

If we put everything into queue then it will run:

  1. Process A
  2. Execute trailing trade
  3. Process B
  4. Execute trailing trade
  5. Process C
  6. Execute trailing trade
  7. Process D
  8. Execute trailing trade

Thus I think current holdAndExecute() solution is more effective.

@habibalkhabbaz
Copy link
Contributor

Hello @uhliksk

all processes currently between hold() and executeFor() have to be moved to separate functions

I can see multiple manual actions only using override action. So it can be combined in one process function.

I found if we put everything into queue, then trailing trade will be running after every single operation. With current holdAndExecute solution the trailing trade is launched only single time.

Yes exactly but I think this is what we expect, the trailing trade to run after every change or action. Specially in manual actions.

@chrisleekr may have opinion on this too.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 5, 2023

Hello @habibalkhabbaz

I found if we put everything into queue, then trailing trade will be running after every single operation. With current holdAndExecute solution the trailing trade is launched only single time.

Yes exactly but I think this is what we expect, the trailing trade to run after every change or action. Specially in manual actions.

If we implement this then queue is not needed anymore as there will be no more than 1 job in queue everytime. Then we just need pause/resume mechanism I already made which will allow running only single job at once in sequence. We can completely remove bull queue and just put executeTrailingTrade() instead of queues[].add() directly. I think counting paused[] and resumed[] will have much less cpu overhead than using Bull queue for those single job queues.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 5, 2023

Yes exactly but I think this is what we expect, the trailing trade to run after every change or action. Specially in manual actions.

I removed bull queue, renamed paused to started, resumed to finished and it work as expected, jobs running in sequence waiting for each other.

queue.js

const config = require('config');
const _ = require('lodash');
const { executeTrailingTrade } = require('../index');

let started = {};
let finished = {};

const REDIS_URL = `redis://:${config.get('redis.password')}@${config.get(
  'redis.host'
)}:${config.get('redis.port')}/${config.get('redis.db')}`;

const pause = async (funcLogger, symbol) => {
  const logger = funcLogger.child({ helper: 'queue' });

  // eslint-disable-next-line no-plusplus
  const pos = started[symbol]++;
  if (pos > finished[symbol]) {
    logger.info({ symbol }, `Queue ${symbol} job #${pos} waiting`);
    while (pos > finished[symbol]) {
      // eslint-disable-next-line no-await-in-loop, no-promise-executor-return
      await new Promise(r => setTimeout(r, 10));
    }
  }

  logger.info({ symbol }, `Queue ${symbol} job #${pos} started`);
};

const resume = async (funcLogger, symbol) => {
  const logger = funcLogger.child({ helper: 'queue' });

  // eslint-disable-next-line no-plusplus
  const pos = finished[symbol]++;

  if (started[symbol] === finished[symbol]) {
    // eslint-disable-next-line no-multi-assign
    started[symbol] = finished[symbol] = 0;
  }


  logger.info({ symbol }, `Queue ${symbol} job #${pos} finished`);

  if (started[symbol] === 0) {
    logger.info({ symbol }, `Queue ${symbol} finished last`);
  }
};

const hold = async (funcLogger, symbol) => {
  const logger = funcLogger.child({ helper: 'queue' });

  if (!(symbol in started)) {
    logger.info({ symbol }, `No queue created for ${symbol}`);
    return;
  }

  await pause(funcLogger, symbol);
};

const init = async (funcLogger, symbols) => {
  started = {};
  finished = {};

  await Promise.all(
    _.map(symbols, async symbol => {
      started[symbol] = 0;
      finished[symbol] = 0;
    })
  );
};

/**
 * Add executeTrailingTrade job to the queue of a symbol
 *
 * @param {*} funcLogger
 * @param {*} symbol
 */
const executeFor = async (funcLogger, symbol, jobData = {}) => {
  const logger = funcLogger.child({ helper: 'queue' });

  if (!(symbol in started)) {
    logger.error({ symbol }, `No queue created for ${symbol}`);
    return;
  }

  await executeTrailingTrade(
    logger,
    symbol,
    _.get(jobData.data, 'correlationId')
  );

  await resume(funcLogger, symbol);
};

module.exports = {
  init,
  hold,
  executeFor
};

For example see ADABUSD jobs:
image

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 5, 2023

The queue as @habibalkhabbaz suggested is very stable, running executeTrailingTrade sequentially after every operation. If you @chrisleekr haven't done holdAndExecuteFor yet then wait and I'll rework this PR with current state of queue.js and make a new commit. If you're working on it, I'll wait and make neccesarry changes after you make your commit.

But more importantly I finally found the root cause of all random issues with orders here:

const updateGridTradeLastOrder = async (logger, symbol, side, newOrder) => {
await saveGridTradeOrder(
logger,
`${symbol}-grid-trade-last-${side}-order`,
newOrder
);
logger.info(`Updated grid trade last ${side} order to cache`);
};

When binance will send order event out of order then updateGridTradeLastOrder is trying to update already non-existent order. As it is calling upsertOne it will create new one and this will then cause bot stuck at There is a last gird trade buy order. Wait.. Since I added a check if order already exist I haven't single issue with buy/sell orders stuck.

const updateGridTradeLastOrder = async (logger, symbol, side, newOrder) => {
  const lastOrder = await getGridTradeLastOrder(
          logger,
          symbol,
          side.toLowerCase()
        );

  if (_.isEmpty(lastOrder) === false) {
    await saveGridTradeOrder(
      logger,
      `${symbol}-grid-trade-last-${side}-order`,
      newOrder
    );

    logger.info(`Updated grid trade last ${side} order to cache`);
  } else {
    logger.error(`Grid trade last ${side} order doesn't exist`);
  }
};

@chrisleekr
Copy link
Owner

@uhliksk

I haven't made any changes, so if you have changes go ahead.

Sorry for not being helpful.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 5, 2023

@chrisleekr

You don't have to be sorry. Your idea is good and I'll implement it into new solution.

@habibalkhabbaz
Copy link
Contributor

Good catch! @uhliksk

@habibalkhabbaz
Copy link
Contributor

habibalkhabbaz commented Jan 7, 2023

Hello @uhliksk

I just checked that updateGridTradeLastOrder has two usages.
First one in app/binance/orders.js and the second one in app/binance/user.js.
The one in app/binance/user.js already checking for order existence except app/binance/orders.js which is calling that function only once on bot restart to sync the orders.
Can you confirm?

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 7, 2023

Hello @uhliksk

I just checked that updateGridTradeLastOrder has two usages. First one in app/binance/orders.js and the second one in app/binance/user.js. The one in app/binance/user.js already checking for order existence except app/binance/orders.js which is calling that function only once on bot restart to sync the orders. Can you confirm?

Hello @habibalkhabbaz

Thank you for your suggestion. Yes, I'm aware of both usages, but thanks to logging of last trade order mismatch in updateGridTradeLastOrder() I found the problem was we were reading getGridTradeLastOrder() in user.js before queue was processed and this was actually main cause of problems if too many events come in short period of time. I fixed a problem and now this check in updateGridTradeLastOrder is no longer necessary and I reverted it back.

Now I have rock solid version tested for last 24 hours with thousands of orders made by tight trigger values without a single issue (just a little bit of financial loss because of trading fees were higher than profit 🙂). I'm reworking lint tests now and will commit updated code today or tomorrow for review.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 8, 2023

@habibalkhabbaz @chrisleekr
I can't figure why lint test is not working for manual-trade.test.js. I made changes to run everything in queue.

I have this code in queue.js:

 // Preprocess
  let canExecuteTrailingTrade;
  if (parameters.preprocessFn) {
    canExecuteTrailingTrade = await parameters.preprocessFn();

    logger.info({ symbol }, `Queue ${symbol} job preprocessed`);
  }

  // Execute (if preprocessed)
  if (parameters.execute !== undefined) {
    canExecuteTrailingTrade = parameters.execute;
  }
  if (canExecuteTrailingTrade) {
    await executeTrailingTrade(logger, symbol, jobData);
  }

In manual-trade.js I have this code:

  const saveOverrideActionFn = async () => {
    await saveOverrideAction(
      logger,
      symbol,
      {
        action: 'manual-trade',
        order,
        actionAt: moment().toISOString(),
        triggeredBy: 'user'
      },
      'The manual order received by the bot. Wait for placing the order.'
    );
  };

  queue.execute(
    logger,
    symbol,
    {
      start: true,
      preprocessFn: saveOverrideActionFn,
      execute: true,
      finish: true
    },
    {
      correlationId: _.get(logger, 'fields.correlationId', '')
    }
  );

In manual-trade.test.js I have this code:

    mockSaveOverrideAction = jest.fn().mockResolvedValue(true);

    jest.mock('../../../../cronjob/trailingTradeHelper/common', () => ({
      saveOverrideAction: mockSaveOverrideAction
    }));

    mockExecuteTrailingTrade = jest.fn().mockResolvedValue(true);

    jest.mock('../../../../cronjob', async () => ({
      executeTrailingTrade: mockExecuteTrailingTrade
    }));

...

 it('triggers saveOverrideAction', () => {
    expect(mockSaveOverrideAction).toHaveBeenCalledWith(
      loggerMock,
      'BTCUSDT',
      {
        action: 'manual-trade',
        order: {
          some: 'value'
        },
        actionAt: expect.any(String),
        triggeredBy: 'user'
      },
      'The manual order received by the bot. Wait for placing the order.'
    );
  });

  it('triggers executeTrailingTrade', () => {
    expect(mockExecuteTrailingTrade).toHaveBeenCalledWith(
      loggerMock,
      'BTCUSDT',
      {
        correlationId: 'correlationId'
      }
    );
  });

For me it looks OK, but when I try npm run test it will result in executeTrailingTrade is not detected.

image

Any idea what's wrong? The code is fully working in production, I just have issues with tests. Can I force commit without lint tests so you can see the whole thing?

@habibalkhabbaz
Copy link
Contributor

habibalkhabbaz commented Jan 8, 2023

Hi @uhliksk

You are doing it right. Only in the test you have to mock queue.execute to have been called instead of executeTrailingTrade

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 8, 2023

Hi @uhliksk

You are doing it right. Only in the test you have to mock queue.execute to have been called instead of executeTrailingTrade

I tried but then const saveOverrideActionFn line is marked as uncovered.

@chrisleekr
Copy link
Owner

chrisleekr commented Jan 9, 2023

@uhliksk let me try.

Oh, can you commit the changes?

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 9, 2023

@chrisleekr I finally achieved all tests passed. Should I squash all those commits into single one or can I keep this way? PR is ready to review and merge if you don't find any issues.

@chrisleekr
Copy link
Owner

@uhliksk wow, nice work!!!

I will review tonight and get back to you!

}

// Postprocess
if (modifiers.postprocessFn) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

postprocessFn seems not to be used anywhere. Can remove it?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've removed it, but let me know if it is supposed to be there.

@chrisleekr
Copy link
Owner

chrisleekr commented Jan 10, 2023

@uhliksk while I was testing in TestNet, I found something weird.
I suspect my change could cause it?

Let me line up what I found first.

There was an order which is filled for BNBBTC. But it executed three times in a row. And ended up wrong quantity.

This is a log file.

BNBBTC-2023-01-10-12-01-40.zip

And here is the CSV file extracted from the log file using log-analyser.js

result.csv

The log shows that while job one is running, job two is also running. Can the job run simultaneously?

image

This is a screenshot of the log showing the last buy price calculation multiple times.

image

As you can see, the final balance becomes 0.03, which should be 0.01.

image

Is this happening to you as well? Or it's caused by my change - 9e7ec09

If you don't have the same issue, I will revert and test again.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 10, 2023

@chrisleekr I have no issues with my code and I'm little bit lost in your refactored one. I think you have that issue because you changed the original logic as preprocessFn should be running before executeTrailingTrade and in your code executeTrailingTrade is running before and also after preprocessFn if modifiers.queue and modifiers.preprocessFn is true or undefined at once. I think it'll be better to revert it.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 10, 2023

@chrisleekr Maybe I misunderstood the logic behind your code as it is confusing for me you created two parts with executeTrailingTrade call, but still if there is modifiers.queue truthy then you are not doing preprocessing at all. This is wrong.

logger.info({ symbol }, `Queue ${symbol} job #${startPos} started`);
await executeTrailingTrade(
funcLogger,
symbol,
_.get(jobData, 'correlationId')
);
const finishPos = (finishedJobs[symbol] += 1) - 1;

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 10, 2023

@chrisleekr If you want I can refactor my code a little bit to make it cleaner to understand a logic behind it as I think you missed a point of how canExecuteTrailingTrade logic works.

@chrisleekr
Copy link
Owner

chrisleekr commented Jan 10, 2023

@uhliksk yes please. 👍

Just note it means ‘queue’ test is not covering all scenarios.
After refactoring the code if the test is passed, then I assume refactoring is ok.

Anyway, I am happy you revert my change and refactor the code!

Thank you so much for your work!
It’s really good code.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 10, 2023

@uhliksk yes please. 👍

Just note it means ‘queue’ test is not covering all scenarios. After refactoring the code if the test is passed, then I assume refactoring is ok.

That's true if you don't remove any previous test, if you know what I mean. ;) But still you're right. I should cover all possible scenarios in tests and not just what I think is enough to test the logic.

Anyway, I am happy you revert my change and refactor the code!

Thank you so much for your work! It’s really good code.

Thank you. I'll implement postprocessFn also and I'll make it cleaner in execute() as I mentioned before. I was in a hurry little bit with this code as I wanted to use it in production ASAP because of some unexpected losses I had with previous queue implementation.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 11, 2023

@chrisleekr

I simplified the logic as I removed the modifier queue because it was necessary once in tickers.js and it can be solved without it.

I moved preprocessFn and postprocessFn into jobData and renamed it to jobPayload as it contain mixed content of data and functions now.

I removed await operator from queue.execute calls as all critical code is processed in this asynchronous job and thus there is no need to wait to finish the jobs anymore.

I added more tests into queue.test.js to better cover all expected scenarios.

PR is ready for review. I hope you'll like it.

@chrisleekr
Copy link
Owner

chrisleekr commented Jan 11, 2023

@uhliksk New change looks very good. I will quickly go through and merge in!

@chrisleekr chrisleekr merged commit 98b756e into chrisleekr:master Jan 11, 2023
@chrisleekr
Copy link
Owner

This is a really good fix. I've learnt a lot from your code @uhliksk

Of course, thanks to @habibalkhabbaz for your input as well. You know I always appreciate you. 💯

As I merged to master, it will build a development image; let's run it for 1-2 days, and I will release it.

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 11, 2023

@chrisleekr There is a hidden bug need to be fixed before release. I'll prepare PR.

This queue.execute have to have await as syncAll is resetting queue counters and if there is still running job it will mess with counters and start running jobs in unexpected order. It can happen for example when bot configuration is changed.

await Promise.all(
symbols.map(async symbol => queue.execute(logger, symbol))
);
});
await syncAll(logger);

@uhliksk
Copy link
Contributor Author

uhliksk commented Jan 12, 2023

@chrisleekr Don't need to add await. I fixed it in more accurate way. Please, merge #581 before release.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Buy grid stuck after last buy order is not removed from mongodb
3 participants