From 6242154b7d9eb6e3b67a91c92c75c22e0873cbad Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Mon, 29 Jul 2024 15:47:13 -0700 Subject: [PATCH] Move serverState cache processing to each process function This cleans up the pickNextTask function and also removes redundant error handling for each "process" function. --- src/common/utxobased/engine/ServerStates.ts | 2 +- .../utxobased/engine/UtxoEngineProcessor.ts | 109 +++++++----------- 2 files changed, 43 insertions(+), 68 deletions(-) diff --git a/src/common/utxobased/engine/ServerStates.ts b/src/common/utxobased/engine/ServerStates.ts index b767b04e..04c41bca 100644 --- a/src/common/utxobased/engine/ServerStates.ts +++ b/src/common/utxobased/engine/ServerStates.ts @@ -23,7 +23,7 @@ import { pushUpdate, removeIdFromQueue } from '../network/socketQueue' import { MAX_CONNECTIONS, NEW_CONNECTIONS } from './constants' import { UtxoInitOptions } from './types' -interface ServerState { +export interface ServerState { blockbook: Blockbook blockSubscriptionStatus: 'unsubscribed' | 'subscribing' | 'subscribed' blockHeight: number diff --git a/src/common/utxobased/engine/UtxoEngineProcessor.ts b/src/common/utxobased/engine/UtxoEngineProcessor.ts index b51fa9a3..75d9bc88 100644 --- a/src/common/utxobased/engine/UtxoEngineProcessor.ts +++ b/src/common/utxobased/engine/UtxoEngineProcessor.ts @@ -42,7 +42,7 @@ import Deferred from '../network/Deferred' import { WsTask } from '../network/Socket' import AwaitLock from './await-lock' import { CACHE_THROTTLE } from './constants' -import { makeServerStates, ServerStates } from './ServerStates' +import { makeServerStates, ServerState, ServerStates } from './ServerStates' import { getFormatSupportedBranches, getScriptTypeFromPurposeType, @@ -855,11 +855,10 @@ export const pickNextTask = async ( } cacheItem.processing = true removeItem(blockbookUtxoCache, utxoId) - const wsTask = await processBlockbookUtxo(common, { + return await processBlockbookUtxo(common, { serverUri, cacheItem }) - return wsTask } } @@ -875,21 +874,12 @@ export const pickNextTask = async ( removeItem(addressForUtxosCache, address) // Fetch and process address for UTXOs - const wsTask = await processAddressForUtxos(common, { + return await processAddressForUtxos(common, { address, cacheItem, + serverState, serverUri }) - wsTask.deferred.promise - .then(() => { - serverState.addresses.add(address) - }) - .catch(err => { - addressForUtxosCache[address] = cacheItem - console.error(err) - common.log('error while processing address for UTXOs:', err) - }) - return wsTask } } @@ -952,24 +942,11 @@ export const pickNextTask = async ( hasProcessedAtLeastOnce = true cacheItem.processing = true removeItem(transactionSpecificUpdateCache, txId) - const updateTransactionSpecificTask = processTransactionsSpecificUpdate( - common, - { serverUri, txId } - ) - // once resolved, add the txid to the server cache - updateTransactionSpecificTask.deferred.promise - .then(() => { - serverState.txids.add(txId) - }) - .catch(err => { - transactionSpecificUpdateCache[txId] = cacheItem - console.error(err) - common.log( - 'error while processing transaction specific update:', - err - ) - }) - return updateTransactionSpecificTask + return processTransactionsSpecificUpdate(common, { + serverState, + serverUri, + txId + }) } } // This condition prevents infinite loops @@ -987,21 +964,11 @@ export const pickNextTask = async ( hasProcessedAtLeastOnce = true cacheItem.processing = true removeItem(transactionUpdateCache, txId) - const updateTransactionTask = processTransactionUpdate(common, { + return processTransactionUpdate(common, { + serverState, serverUri, txId }) - // once resolved, add the txid to the server cache - updateTransactionTask.deferred.promise - .then(() => { - serverState.txids.add(txId) - }) - .catch(err => { - transactionUpdateCache[txId] = cacheItem - console.error(err) - common.log('error while processing transaction update:', err) - }) - return updateTransactionTask } } // This condition prevents infinite loops @@ -1021,21 +988,12 @@ export const pickNextTask = async ( removeItem(addressForTransactionsCache, address) // Fetch and process address UTXOs - const wsTask = await processAddressForTransactions(common, { + return await processAddressForTransactions(common, { address, cacheItem, + serverState, serverUri }) - wsTask.deferred.promise - .then(() => { - serverState.addresses.add(address) - }) - .catch(err => { - addressForTransactionsCache[address] = cacheItem - console.error(err) - common.log('error while processing address for transactions:', err) - }) - return wsTask } } @@ -1050,9 +1008,13 @@ export const pickNextTask = async ( */ const processTransactionsSpecificUpdate = ( common: CommonParams, - args: { serverUri: string; txId: string } + args: { + serverState: ServerState + serverUri: string + txId: string + } ): WsTask => { - const { serverUri, txId } = args + const { serverState, serverUri, txId } = args const deferred = new Deferred() deferred.promise .then(async (txResponse: unknown) => { @@ -1079,6 +1041,9 @@ const processTransactionsSpecificUpdate = ( pluginInfo: common.pluginInfo, tx: processedTx }) + + // Add the txid to the server cache + serverState.txids.add(txId) }) .catch(err => { console.error(err) @@ -1102,11 +1067,12 @@ const processTransactionsSpecificUpdate = ( const processTransactionUpdate = ( common: CommonParams, args: { + serverState: ServerState serverUri: string txId: string } ): WsTask => { - const { serverUri, txId } = args + const { serverState, serverUri, txId } = args const deferred = new Deferred() deferred.promise .then(async (txResponse: TransactionResponse) => { @@ -1150,6 +1116,9 @@ const processTransactionUpdate = ( processing: false } } + + // Add the txid to the server cache + serverState.txids.add(txId) }) .catch(err => { console.error(err) @@ -1169,10 +1138,11 @@ const processAddressForTransactions = async ( args: { address: string cacheItem: AddressForTransactionsCache[string] + serverState: ServerState serverUri: string } ): Promise> => { - const { address, cacheItem, serverUri } = args + const { address, cacheItem, serverState, serverUri } = args const { page = 1, blockHeight } = cacheItem const addressForTransactionsCache = common.taskCache.addressForTransactionsCache @@ -1241,12 +1211,16 @@ const processAddressForTransactions = async ( // Call setLookAhead to update the lookahead await setLookAhead(common) + + // Add the address to the server cache + serverState.addresses.add(address) }) .catch(err => { // Log the error for debugging purposes without crashing the engine // This will cause frozen wallet syncs console.error(err) - addressForTransactionsCache[address] = cacheItem + common.log('error while processing address for transactions:', err) + addressForTransactionsCache[address] = { ...cacheItem, processing: false } }) return common.serverStates.addressQueryTask( @@ -1332,10 +1306,11 @@ const processAddressForUtxos = async ( args: { address: string cacheItem: AddressForUtxosCache[string] + serverState: ServerState serverUri: string } ): Promise> => { - const { address, cacheItem, serverUri } = args + const { address, cacheItem, serverState, serverUri } = args const { addressForUtxosCache, blockbookUtxoCache, @@ -1371,13 +1346,13 @@ const processAddressForUtxos = async ( address: { ...addressData, path: addressData.path } } } + + serverState.addresses.add(address) }) - .catch(() => { - cacheItem.processing = false - addressForUtxosCache[address] = { - processing: cacheItem.processing, - path: cacheItem.path - } + .catch((err: unknown) => { + console.error(err) + common.log('error while processing address for UTXOs:', err) + addressForUtxosCache[address] = { ...cacheItem, processing: false } }) return common.serverStates.utxoListQueryTask(serverUri, address, deferred)