Skip to content

Commit

Permalink
fix(move): Extend move operaiton lock automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
andris9 committed Mar 14, 2024
1 parent b7f0aa6 commit b1ba513
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
25 changes: 24 additions & 1 deletion lib/api/messages.js
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,34 @@ module.exports = (db, server, messageHandler, userHandler, storageHandler, setti
let lockKey = ['mbwr', mailbox.toString()].join(':');

let lock;
let extendLockIntervalTimer = null;

try {
lock = await server.lock.waitAcquireLock(lockKey, 60 * 60 * 1000, 1 * 60 * 1000);
const LOCK_TTL = 2 * 60 * 1000;

lock = await server.lock.waitAcquireLock(lockKey, LOCK_TTL, 1 * 60 * 1000);
if (!lock.success) {
throw new Error('Failed to get folder write lock');
}
log.verbose(
'API',
'Acquired lock for moving messages user=%s mailbox=%s message=%s moveTo=%s lock=%s',
user.toString(),
mailbox.toString(),
message,
moveTo,
lock.id
);
extendLockIntervalTimer = setInterval(() => {
server.lock
.extendLock(lock, LOCK_TTL)
.then(info => {
log.verbose('API', `Lock extended lock=${info.id} result=${info.success ? 'yes' : 'no'}`);
})
.catch(err => {
log.verbose('API', 'Failed to extend lock lock=%s error=%s', lock?.id, err.message);
});
}, Math.round(LOCK_TTL * 0.8));
} catch (err) {
res.status(500);
return res.json({
Expand All @@ -272,6 +294,7 @@ module.exports = (db, server, messageHandler, userHandler, storageHandler, setti
code: err.code
});
} finally {
clearInterval(extendLockIntervalTimer);
await server.lock.releaseLock(lock);
}

Expand Down
41 changes: 40 additions & 1 deletion lib/handlers/on-expunge.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ module.exports = (server, messageHandler) => (mailbox, update, session, callback
);
};

const LOCK_TTL = 2 * 60 * 1000;
let lockKey = ['mbwr', mailboxData._id.toString()].join(':');
server.lock.waitAcquireLock(lockKey, 60 * 60 * 1000, 1 * 60 * 1000, (err, lock) => {
server.lock.waitAcquireLock(lockKey, LOCK_TTL, 1 * 60 * 1000, (err, lock) => {
if (err) {
return callback(err);
}
Expand All @@ -84,12 +85,48 @@ module.exports = (server, messageHandler) => (mailbox, update, session, callback
return callback(null, new Error('Failed to get folder write lock'));
}

server.logger.debug(
{
tnx: 'MOVE'
},
'Acquired lock for deleting messages user=%s mailbox=%s message=%s lock=%s',
session.user.id.toString(),
mailbox.toString(),
mailboxData._id.toString(),
lock.id
);

let extendLockIntervalTimer = setInterval(() => {
server.lock
.extendLock(lock, LOCK_TTL)
.then(info => {
server.logger.debug(
{
tnx: 'MOVE'
},
`Lock extended lock=${info.id} result=${info.success ? 'yes' : 'no'}`
);
})
.catch(err => {
server.logger.debug(
{
tnx: 'MOVE',
err
},
'Failed to extend lock lock=%s error=%s',
lock?.id,
err.message
);
});
}, Math.round(LOCK_TTL * 0.8));

// fetch entire messages as these need to be copied to the archive
let cursor = db.database.collection('messages').find(query).sort({ uid: 1 }).maxTimeMS(consts.DB_MAX_TIME_MESSAGES);

let processNext = () => {
cursor.next((err, messageData) => {
if (err) {
clearInterval(extendLockIntervalTimer);
return server.lock.releaseLock(lock, () => {
updateQuota(() => callback(err));
});
Expand All @@ -111,6 +148,7 @@ module.exports = (server, messageHandler) => (mailbox, update, session, callback
]
});
}
clearInterval(extendLockIntervalTimer);
return server.lock.releaseLock(lock, () => {
updateQuota(() => callback(null, true));
});
Expand Down Expand Up @@ -140,6 +178,7 @@ module.exports = (server, messageHandler) => (mailbox, update, session, callback
logdata._code = err.code;
logdata._response = err.response;
server.loggelf(logdata);
clearInterval(extendLockIntervalTimer);
return cursor.close(() => server.lock.releaseLock(lock, () => updateQuota(() => callback(err))));
}

Expand Down

0 comments on commit b1ba513

Please sign in to comment.