Skip to content

Commit

Permalink
Move serverState cache processing to each process function
Browse files Browse the repository at this point in the history
This cleans up the pickNextTask function and also removes redundant
error handling for each "process" function.
  • Loading branch information
samholmes committed Jul 31, 2024
1 parent efd805d commit 6242154
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 68 deletions.
2 changes: 1 addition & 1 deletion src/common/utxobased/engine/ServerStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import { pushUpdate, removeIdFromQueue } from '../network/socketQueue'
import { MAX_CONNECTIONS, NEW_CONNECTIONS } from './constants'
import { UtxoInitOptions } from './types'

interface ServerState {
export interface ServerState {
blockbook: Blockbook
blockSubscriptionStatus: 'unsubscribed' | 'subscribing' | 'subscribed'
blockHeight: number
Expand Down
109 changes: 42 additions & 67 deletions src/common/utxobased/engine/UtxoEngineProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ import Deferred from '../network/Deferred'
import { WsTask } from '../network/Socket'
import AwaitLock from './await-lock'
import { CACHE_THROTTLE } from './constants'
import { makeServerStates, ServerStates } from './ServerStates'
import { makeServerStates, ServerState, ServerStates } from './ServerStates'
import {
getFormatSupportedBranches,
getScriptTypeFromPurposeType,
Expand Down Expand Up @@ -855,11 +855,10 @@ export const pickNextTask = async (
}
cacheItem.processing = true
removeItem(blockbookUtxoCache, utxoId)
const wsTask = await processBlockbookUtxo(common, {
return await processBlockbookUtxo(common, {
serverUri,
cacheItem
})
return wsTask
}
}

Expand All @@ -875,21 +874,12 @@ export const pickNextTask = async (
removeItem(addressForUtxosCache, address)

// Fetch and process address for UTXOs
const wsTask = await processAddressForUtxos(common, {
return await processAddressForUtxos(common, {
address,
cacheItem,
serverState,
serverUri
})
wsTask.deferred.promise
.then(() => {
serverState.addresses.add(address)
})
.catch(err => {
addressForUtxosCache[address] = cacheItem
console.error(err)
common.log('error while processing address for UTXOs:', err)
})
return wsTask
}
}

Expand Down Expand Up @@ -952,24 +942,11 @@ export const pickNextTask = async (
hasProcessedAtLeastOnce = true
cacheItem.processing = true
removeItem(transactionSpecificUpdateCache, txId)
const updateTransactionSpecificTask = processTransactionsSpecificUpdate(
common,
{ serverUri, txId }
)
// once resolved, add the txid to the server cache
updateTransactionSpecificTask.deferred.promise
.then(() => {
serverState.txids.add(txId)
})
.catch(err => {
transactionSpecificUpdateCache[txId] = cacheItem
console.error(err)
common.log(
'error while processing transaction specific update:',
err
)
})
return updateTransactionSpecificTask
return processTransactionsSpecificUpdate(common, {
serverState,
serverUri,
txId
})
}
}
// This condition prevents infinite loops
Expand All @@ -987,21 +964,11 @@ export const pickNextTask = async (
hasProcessedAtLeastOnce = true
cacheItem.processing = true
removeItem(transactionUpdateCache, txId)
const updateTransactionTask = processTransactionUpdate(common, {
return processTransactionUpdate(common, {
serverState,
serverUri,
txId
})
// once resolved, add the txid to the server cache
updateTransactionTask.deferred.promise
.then(() => {
serverState.txids.add(txId)
})
.catch(err => {
transactionUpdateCache[txId] = cacheItem
console.error(err)
common.log('error while processing transaction update:', err)
})
return updateTransactionTask
}
}
// This condition prevents infinite loops
Expand All @@ -1021,21 +988,12 @@ export const pickNextTask = async (
removeItem(addressForTransactionsCache, address)

// Fetch and process address UTXOs
const wsTask = await processAddressForTransactions(common, {
return await processAddressForTransactions(common, {
address,
cacheItem,
serverState,
serverUri
})
wsTask.deferred.promise
.then(() => {
serverState.addresses.add(address)
})
.catch(err => {
addressForTransactionsCache[address] = cacheItem
console.error(err)
common.log('error while processing address for transactions:', err)
})
return wsTask
}
}

Expand All @@ -1050,9 +1008,13 @@ export const pickNextTask = async (
*/
const processTransactionsSpecificUpdate = (
common: CommonParams,
args: { serverUri: string; txId: string }
args: {
serverState: ServerState
serverUri: string
txId: string
}
): WsTask<unknown> => {
const { serverUri, txId } = args
const { serverState, serverUri, txId } = args
const deferred = new Deferred<unknown>()
deferred.promise
.then(async (txResponse: unknown) => {
Expand All @@ -1079,6 +1041,9 @@ const processTransactionsSpecificUpdate = (
pluginInfo: common.pluginInfo,
tx: processedTx
})

// Add the txid to the server cache
serverState.txids.add(txId)
})
.catch(err => {
console.error(err)
Expand All @@ -1102,11 +1067,12 @@ const processTransactionsSpecificUpdate = (
const processTransactionUpdate = (
common: CommonParams,
args: {
serverState: ServerState
serverUri: string
txId: string
}
): WsTask<TransactionResponse> => {
const { serverUri, txId } = args
const { serverState, serverUri, txId } = args
const deferred = new Deferred<TransactionResponse>()
deferred.promise
.then(async (txResponse: TransactionResponse) => {
Expand Down Expand Up @@ -1150,6 +1116,9 @@ const processTransactionUpdate = (
processing: false
}
}

// Add the txid to the server cache
serverState.txids.add(txId)
})
.catch(err => {
console.error(err)
Expand All @@ -1169,10 +1138,11 @@ const processAddressForTransactions = async (
args: {
address: string
cacheItem: AddressForTransactionsCache[string]
serverState: ServerState
serverUri: string
}
): Promise<WsTask<AddressResponse>> => {
const { address, cacheItem, serverUri } = args
const { address, cacheItem, serverState, serverUri } = args
const { page = 1, blockHeight } = cacheItem
const addressForTransactionsCache =
common.taskCache.addressForTransactionsCache
Expand Down Expand Up @@ -1241,12 +1211,16 @@ const processAddressForTransactions = async (

// Call setLookAhead to update the lookahead
await setLookAhead(common)

// Add the address to the server cache
serverState.addresses.add(address)
})
.catch(err => {
// Log the error for debugging purposes without crashing the engine
// This will cause frozen wallet syncs
console.error(err)
addressForTransactionsCache[address] = cacheItem
common.log('error while processing address for transactions:', err)
addressForTransactionsCache[address] = { ...cacheItem, processing: false }
})

return common.serverStates.addressQueryTask(
Expand Down Expand Up @@ -1332,10 +1306,11 @@ const processAddressForUtxos = async (
args: {
address: string
cacheItem: AddressForUtxosCache[string]
serverState: ServerState
serverUri: string
}
): Promise<WsTask<AddressUtxosResponse>> => {
const { address, cacheItem, serverUri } = args
const { address, cacheItem, serverState, serverUri } = args
const {
addressForUtxosCache,
blockbookUtxoCache,
Expand Down Expand Up @@ -1371,13 +1346,13 @@ const processAddressForUtxos = async (
address: { ...addressData, path: addressData.path }
}
}

serverState.addresses.add(address)
})
.catch(() => {
cacheItem.processing = false
addressForUtxosCache[address] = {
processing: cacheItem.processing,
path: cacheItem.path
}
.catch((err: unknown) => {
console.error(err)
common.log('error while processing address for UTXOs:', err)
addressForUtxosCache[address] = { ...cacheItem, processing: false }
})

return common.serverStates.utxoListQueryTask(serverUri, address, deferred)
Expand Down

0 comments on commit 6242154

Please sign in to comment.