From d3642c4598b9d13eb2b5d644d867ab9dee7a20aa Mon Sep 17 00:00:00 2001 From: Sam Holmes Date: Tue, 30 Jul 2024 17:21:28 -0700 Subject: [PATCH] Convert pickNextTask to an AsyncGenerator --- CHANGELOG.md | 1 + src/common/utxobased/engine/ServerStates.ts | 164 +++--- .../utxobased/engine/UtxoEngineProcessor.ts | 488 +++++++++--------- src/common/utxobased/network/Blockbook.ts | 82 +-- .../utxobased/network/BlockbookElectrum.ts | 154 +++--- src/common/utxobased/network/Socket.ts | 83 +-- .../utxobased/network/Blockbook.spec.ts | 12 +- 7 files changed, 527 insertions(+), 457 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d09ba9a4..516163a1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ## Unreleased - fixed: Ensure server scoring is done on all server tasks +- changed: Refactor pickNextTask to implement generators to replace Deferred promises ## 3.2.1 (2024-07-19) diff --git a/src/common/utxobased/engine/ServerStates.ts b/src/common/utxobased/engine/ServerStates.ts index 04c41bca..70fcc8d3 100644 --- a/src/common/utxobased/engine/ServerStates.ts +++ b/src/common/utxobased/engine/ServerStates.ts @@ -17,7 +17,7 @@ import { } from '../network/blockbookApi' import { makeBlockbookElectrum } from '../network/BlockbookElectrum' import Deferred from '../network/Deferred' -import { WsTask } from '../network/Socket' +import { WsTask, WsTaskGenerator } from '../network/Socket' import { SocketEmitter, SocketEvent } from '../network/SocketEmitter' import { pushUpdate, removeIdFromQueue } from '../network/socketQueue' import { MAX_CONNECTIONS, NEW_CONNECTIONS } from './constants' @@ -43,7 +43,7 @@ interface ServerStateConfig { export interface ServerStates { setPickNextTaskCB: ( - callback: (uri: string) => Promise | boolean> + callback: (uri: string) => AsyncGenerator | boolean> ) => void stop: () => void serverCanGetTx: (uri: string, txid: string) => boolean @@ -69,27 +69,23 @@ export interface ServerStates { addressQueryTask: ( serverUri: string, address: string, - params: { lastQueriedBlockHeight: number; page: number }, - deferred: Deferred - ) => WsTask + params: { lastQueriedBlockHeight: number; page: number } + ) => WsTaskGenerator transactionQueryTask: ( serverUri: string, - txId: string, - deferred: Deferred - ) => WsTask + txId: string + ) => WsTaskGenerator transactionSpecialQueryTask: ( serverUri: string, - txId: string, - deferred: Deferred - ) => WsTask + txId: string + ) => WsTaskGenerator utxoListQueryTask: ( serverUri: string, - address: string, - deferred: Deferred - ) => WsTask + address: string + ) => WsTaskGenerator } interface ServerStatesCache { @@ -165,7 +161,9 @@ export function makeServerStates(config: ServerStateConfig): ServerStates { } ) - let pickNextTaskCB: (uri: string) => Promise | boolean> + let pickNextTaskCB: ( + uri: string + ) => AsyncGenerator | boolean, boolean> const makeServerStatesCacheEntry = (blockbook: Blockbook): ServerState => ({ blockbook, @@ -225,18 +223,35 @@ export function makeServerStates(config: ServerStateConfig): ServerStates { let blockbook: Blockbook // Queue space callback - const onQueueSpace = async (): Promise | boolean> => { + async function* taskGeneratorFn(): AsyncGenerator< + WsTask | boolean, + boolean + > { // Exit if the connection is no longer active - if (!(uri in serverStatesCache)) return false - - const task = await pickNextTaskCB(uri) - if (typeof task !== 'boolean') { - const taskMessage = `${task.method} params: ${JSON.stringify( - task.params - )}` - log(`${uri} nextTask: ${taskMessage}`) + if (uri == null || !(uri in serverStatesCache)) return false + + const generator = pickNextTaskCB(uri) + let nextValue: unknown + while (true) { + const result: IteratorResult< + WsTask | boolean, + boolean + > = await generator.next(nextValue) + + if (result?.done === true) { + return result.value + } + + const task = result.value + + if (typeof task !== 'boolean') { + const taskMessage = `${task.method} params: ${JSON.stringify( + task.params + )}` + log(`${uri} nextTask: ${taskMessage}`) + } + nextValue = yield task } - return task } // Create a new blockbook instance based on the URI scheme @@ -247,7 +262,7 @@ export function makeServerStates(config: ServerStateConfig): ServerStates { connectionUri: uri, engineEmitter, log, - onQueueSpace, + taskGeneratorFn, pluginInfo, socketEmitter, walletId: walletInfo.id @@ -259,7 +274,7 @@ export function makeServerStates(config: ServerStateConfig): ServerStates { connectionUri: uri, engineEmitter, log, - onQueueSpace, + taskGeneratorFn, socketEmitter, walletId: walletInfo.id }) @@ -317,6 +332,26 @@ export function makeServerStates(config: ServerStateConfig): ServerStates { return deferredWithScoring } + function* withServerScoring( + serverUri: string, + generator: Generator + ): Generator { + const serverState = serverStatesCache[serverUri] + if (serverState == null) + throw new Error(`No blockbook connection with ${serverUri}`) + + const queryTime = Date.now() + let result: R + try { + result = yield* generator + pluginState.serverScoreUp(serverUri, Date.now() - queryTime) + } catch (error: unknown) { + pluginState.serverScoreDown(serverUri) + throw error + } + return result + } + const instance: ServerStates = { async broadcastTx(transaction: EdgeTransaction): Promise { return await new Promise((resolve, reject) => { @@ -460,9 +495,7 @@ export function makeServerStates(config: ServerStateConfig): ServerStates { }) }, - setPickNextTaskCB( - callback: (uri: string) => Promise | boolean> - ): void { + setPickNextTaskCB(callback): void { pickNextTaskCB = callback }, @@ -569,71 +602,64 @@ export function makeServerStates(config: ServerStateConfig): ServerStates { // Task Methods: // - addressQueryTask( + addressQueryTask: function* ( serverUri: string, address: string, - params: { lastQueriedBlockHeight: number; page: number }, - deferred: Deferred - ): WsTask { + params: { lastQueriedBlockHeight: number; page: number } + ) { const serverState = serverStatesCache[serverUri] if (serverState == null) throw new Error(`No blockbook connection with ${serverUri}`) - return serverState.blockbook.addressQueryTask( - address, - { - asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress, - lastQueriedBlockHeight: params.lastQueriedBlockHeight, - page: params.page - }, - deferredWithServerScoring(serverUri, deferred) + return yield* withServerScoring( + serverUri, + (function* () { + return yield* serverState.blockbook.addressQueryTask(address, { + asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress, + lastQueriedBlockHeight: params.lastQueriedBlockHeight, + page: params.page + }) + })() ) }, - transactionQueryTask( - serverUri: string, - txId: string, - deferred: Deferred - ): WsTask { + transactionQueryTask: function* (serverUri: string, txId: string) { const serverState = serverStatesCache[serverUri] if (serverState == null) throw new Error(`No blockbook connection with ${serverUri}`) - return serverState.blockbook.transactionQueryTask( - txId, - deferredWithServerScoring(serverUri, deferred) + return yield* withServerScoring( + serverUri, + (function* () { + return yield* serverState.blockbook.transactionQueryTask(txId) + })() ) }, - transactionSpecialQueryTask( - serverUri: string, - txId: string, - deferred: Deferred - ): WsTask { + transactionSpecialQueryTask: function* (serverUri: string, txId: string) { const serverState = serverStatesCache[serverUri] if (serverState == null) throw new Error(`No blockbook connection with ${serverUri}`) - return serverState.blockbook.transactionSpecialQueryTask( - txId, - deferredWithServerScoring(serverUri, deferred) + return yield* withServerScoring( + serverUri, + (function* () { + return serverState.blockbook.transactionSpecialQueryTask(txId) + })() ) }, - utxoListQueryTask( - serverUri: string, - address: string, - deferred: Deferred - ): WsTask { + utxoListQueryTask: function* (serverUri: string, address: string) { const serverState = serverStatesCache[serverUri] if (serverState == null) throw new Error(`No blockbook connection with ${serverUri}`) - return serverState.blockbook.utxoListQueryTask( - address, - { - asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress - }, - deferredWithServerScoring(serverUri, deferred) + return yield* withServerScoring( + serverUri, + (function* () { + return yield* serverState.blockbook.utxoListQueryTask(address, { + asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress + }) + })() ) } } diff --git a/src/common/utxobased/engine/UtxoEngineProcessor.ts b/src/common/utxobased/engine/UtxoEngineProcessor.ts index 75d9bc88..c8bc4363 100644 --- a/src/common/utxobased/engine/UtxoEngineProcessor.ts +++ b/src/common/utxobased/engine/UtxoEngineProcessor.ts @@ -38,8 +38,7 @@ import { SubscribeAddressResponse, TransactionResponse } from '../network/blockbookApi' -import Deferred from '../network/Deferred' -import { WsTask } from '../network/Socket' +import { WsTask, WsTaskAsyncGenerator } from '../network/Socket' import AwaitLock from './await-lock' import { CACHE_THROTTLE } from './constants' import { makeServerStates, ServerState, ServerStates } from './ServerStates' @@ -196,8 +195,8 @@ export function makeUtxoEngineProcessor( lock } - serverStates.setPickNextTaskCB(async serverUri => { - return await pickNextTask(common, serverUri) + serverStates.setPickNextTaskCB(serverUri => { + return pickNextTask(common, serverUri) }) let running = false @@ -794,10 +793,22 @@ const needsTxSpecific = (common: CommonParams): boolean => { return common.pluginInfo.engineInfo.txSpecificHandling != null } -export const pickNextTask = async ( +/** + * The pickNextTask generator is responsible for selecting the next WsTask + * to be done by the network layer. It will yield WsTasks or booleans for early + * exit, and it will return a boolean when it's done. + */ +type PickNextTaskGenerator = AsyncGenerator< + boolean | WsTask, + boolean, + // Expect only these responses from yielding: + AddressUtxosResponse & AddressResponse & TransactionResponse +> + +export async function* pickNextTask( common: CommonParams, serverUri: string -): Promise | boolean> => { +): PickNextTaskGenerator { const { addressForTransactionsCache, addressForUtxosCache, @@ -855,7 +866,7 @@ export const pickNextTask = async ( } cacheItem.processing = true removeItem(blockbookUtxoCache, utxoId) - return await processBlockbookUtxo(common, { + yield* processBlockbookUtxo(common, { serverUri, cacheItem }) @@ -874,7 +885,7 @@ export const pickNextTask = async ( removeItem(addressForUtxosCache, address) // Fetch and process address for UTXOs - return await processAddressForUtxos(common, { + yield* processAddressForUtxos(common, { address, cacheItem, serverState, @@ -942,7 +953,7 @@ export const pickNextTask = async ( hasProcessedAtLeastOnce = true cacheItem.processing = true removeItem(transactionSpecificUpdateCache, txId) - return processTransactionsSpecificUpdate(common, { + yield* processTransactionsSpecificUpdate(common, { serverState, serverUri, txId @@ -964,7 +975,7 @@ export const pickNextTask = async ( hasProcessedAtLeastOnce = true cacheItem.processing = true removeItem(transactionUpdateCache, txId) - return processTransactionUpdate(common, { + yield* processTransactionUpdate(common, { serverState, serverUri, txId @@ -988,7 +999,7 @@ export const pickNextTask = async ( removeItem(addressForTransactionsCache, address) // Fetch and process address UTXOs - return await processAddressForTransactions(common, { + yield* processAddressForTransactions(common, { address, cacheItem, serverState, @@ -1006,134 +1017,133 @@ export const pickNextTask = async ( * by querying the network for the transaction data using the specific handling * query and processing the data into the DataLayer. */ -const processTransactionsSpecificUpdate = ( +async function* processTransactionsSpecificUpdate( common: CommonParams, args: { serverState: ServerState serverUri: string txId: string } -): WsTask => { +): WsTaskAsyncGenerator { const { serverState, serverUri, txId } = args - const deferred = new Deferred() - deferred.promise - .then(async (txResponse: unknown) => { - // Grab tx to update it - const txs = await common.dataLayer.fetchTransactions({ txId }) - let tx = txs[0] - if (tx == null) return - - if (common.pluginInfo.engineInfo.txSpecificHandling != null) { - // Do coin-specific things to it - tx = common.pluginInfo.engineInfo.txSpecificHandling(tx, txResponse) - } - - // Process and save new tx - const processedTx = await common.dataLayer.saveTransaction({ - tx - }) - - await transactionChanged({ - walletId: common.walletInfo.id, - emitter: common.emitter, - walletTools: common.walletTools, - dataLayer: common.dataLayer, - pluginInfo: common.pluginInfo, - tx: processedTx - }) + try { + const txResponse: unknown = yield* common.serverStates.transactionSpecialQueryTask( + serverUri, + txId + ) + // Grab tx to update it + const txs = await common.dataLayer.fetchTransactions({ txId }) + let tx = txs[0] + if (tx == null) return true + + if (common.pluginInfo.engineInfo.txSpecificHandling != null) { + // Do coin-specific things to it + tx = common.pluginInfo.engineInfo.txSpecificHandling(tx, txResponse) + } - // Add the txid to the server cache - serverState.txids.add(txId) + // Process and save new tx + const processedTx = await common.dataLayer.saveTransaction({ + tx }) - .catch(err => { - console.error(err) - common.log('error while processing transaction specific update:', err) - common.taskCache.transactionSpecificUpdateCache[txId] = { - processing: false - } + + await transactionChanged({ + walletId: common.walletInfo.id, + emitter: common.emitter, + walletTools: common.walletTools, + dataLayer: common.dataLayer, + pluginInfo: common.pluginInfo, + tx: processedTx }) - return common.serverStates.transactionSpecialQueryTask( - serverUri, - txId, - deferred - ) + // Add the txid to the server cache + serverState.txids.add(txId) + + return true + } catch (err) { + console.error(err) + common.log('error while processing transaction specific update:', err) + common.taskCache.transactionSpecificUpdateCache[txId] = { + processing: false + } + return false + } } /** * Processes a transaction update from the TransactionUpdateCache by querying * the network for the transaction data and processing it into the DataLayer. */ -const processTransactionUpdate = ( +async function* processTransactionUpdate( common: CommonParams, args: { serverState: ServerState serverUri: string txId: string } -): WsTask => { +): WsTaskAsyncGenerator { const { serverState, serverUri, txId } = args - const deferred = new Deferred() - deferred.promise - .then(async (txResponse: TransactionResponse) => { - // check if raw tx is still not confirmed, if so, don't change anything - if (txResponse.blockHeight < 1) return - // Create new tx from raw tx - const tx = processTransactionResponse(common, { txResponse }) - // Remove any existing input utxos from the dataLayer - for (const input of tx.inputs) { - await common.dataLayer.removeUtxos([ - `${input.txId}_${input.outputIndex}` - ]) - } - // Update output utxos's blockHeight any existing input utxos from the dataLayer - const utxoIds = tx.outputs.map(output => `${tx.txid}_${output.n}`) - const utxos = await common.dataLayer.fetchUtxos({ - utxoIds - }) - for (const utxo of utxos) { - if (utxo == null) continue - utxo.blockHeight = tx.blockHeight - await common.dataLayer.saveUtxo(utxo) - } - // Process and save new tx - const processedTx = await common.dataLayer.saveTransaction({ - tx - }) - await transactionChanged({ - walletId: common.walletInfo.id, - emitter: common.emitter, - walletTools: common.walletTools, - dataLayer: common.dataLayer, - pluginInfo: common.pluginInfo, - tx: processedTx - }) + try { + const txResponse: TransactionResponse = yield* common.serverStates.transactionQueryTask( + serverUri, + txId + ) + // check if blockbook tx is still not confirmed, if so, don't change anything + if (txResponse.blockHeight < 1) return false + // Create new tx from raw tx + const tx = processTransactionResponse(common, { txResponse }) + // Remove any existing input utxos from the dataLayer + for (const input of tx.inputs) { + await common.dataLayer.removeUtxos([`${input.txId}_${input.outputIndex}`]) + } + // Update output utxos's blockHeight any existing input utxos from the dataLayer + const utxoIds = tx.outputs.map(output => `${tx.txid}_${output.n}`) + const utxos = await common.dataLayer.fetchUtxos({ + utxoIds + }) + for (const utxo of utxos) { + if (utxo == null) continue + utxo.blockHeight = tx.blockHeight + await common.dataLayer.saveUtxo(utxo) + } + // Process and save new tx + const processedTx = await common.dataLayer.saveTransaction({ + tx + }) - if (needsTxSpecific(common)) { - // Add task to grab transactionSpecific payload - common.taskCache.transactionSpecificUpdateCache[txId] = { - processing: false - } + await transactionChanged({ + walletId: common.walletInfo.id, + emitter: common.emitter, + walletTools: common.walletTools, + dataLayer: common.dataLayer, + pluginInfo: common.pluginInfo, + tx: processedTx + }) + + if (needsTxSpecific(common)) { + // Add task to grab transactionSpecific payload + common.taskCache.transactionSpecificUpdateCache[txId] = { + processing: false } + } - // Add the txid to the server cache - serverState.txids.add(txId) - }) - .catch(err => { - console.error(err) - common.log('error while processing transaction update:', err) - common.taskCache.transactionUpdateCache[txId] = { processing: false } - }) + // Add the txid to the server cache + serverState.txids.add(txId) - return common.serverStates.transactionQueryTask(serverUri, txId, deferred) + return true + } catch (err) { + console.error(err) + common.log('error while processing transaction update:', err) + common.taskCache.transactionUpdateCache[txId] = { processing: false } + return false + } } /** * Processes an address for transactions by querying the network for the * transaction data. */ -const processAddressForTransactions = async ( +async function* processAddressForTransactions( common: CommonParams, args: { address: string @@ -1141,7 +1151,7 @@ const processAddressForTransactions = async ( serverState: ServerState serverUri: string } -): Promise> => { +): WsTaskAsyncGenerator { const { address, cacheItem, serverState, serverUri } = args const { page = 1, blockHeight } = cacheItem const addressForTransactionsCache = @@ -1153,85 +1163,88 @@ const processAddressForTransactions = async ( throw new Error(`could not find address with script pubkey ${scriptPubkey}`) } - const deferred = new Deferred() - deferred.promise - .then(async (value: AddressResponse) => { - const { transactions = [], txs, unconfirmedTxs, totalPages } = value - - // If address is used and previously not marked as used, mark as used. - const used = txs > 0 || unconfirmedTxs > 0 - if (!addressData.used && used && page === 1) { - addressData.used = true + try { + const addressResponse = yield* common.serverStates.addressQueryTask( + serverUri, + address, + { + lastQueriedBlockHeight: addressData.lastQueriedBlockHeight, + page } + ) + const { + transactions = [], + txs, + unconfirmedTxs, + totalPages + } = addressResponse + + // If address is used and previously not marked as used, mark as used. + const used = txs > 0 || unconfirmedTxs > 0 + if (!addressData.used && used && page === 1) { + addressData.used = true + } - // Process and save the address's transactions - for (const txResponse of transactions) { - const tx = processTransactionResponse(common, { txResponse }) - const processedTx = await common.dataLayer.saveTransaction({ - tx, - scriptPubkeys: [scriptPubkey] - }) - await transactionChanged({ - walletId: common.walletInfo.id, - emitter: common.emitter, - walletTools: common.walletTools, - dataLayer: common.dataLayer, - pluginInfo: common.pluginInfo, - tx: processedTx - }) + // Process and save the address's transactions + for (const txResponse of transactions) { + const tx = processTransactionResponse(common, { txResponse }) + const processedTx = await common.dataLayer.saveTransaction({ + tx, + scriptPubkeys: [scriptPubkey] + }) + await transactionChanged({ + walletId: common.walletInfo.id, + emitter: common.emitter, + walletTools: common.walletTools, + dataLayer: common.dataLayer, + pluginInfo: common.pluginInfo, + tx: processedTx + }) - if (needsTxSpecific(common)) { - // Add task to grab transactionSpecific payload - common.taskCache.transactionSpecificUpdateCache[tx.txid] = { - processing: false - } + if (needsTxSpecific(common)) { + // Add task to grab transactionSpecific payload + common.taskCache.transactionSpecificUpdateCache[tx.txid] = { + processing: false } } + } - // Halt on finishing the processing of address transaction until - // we have progressed through all of the blockbook pages - if (page < totalPages) { - // Add the address back to the cache, incrementing the page - addressForTransactionsCache[address] = { - ...cacheItem, - processing: false, - page: page + 1 - } - return + // Halt on finishing the processing of address transaction until + // we have progressed through all of the blockbook pages + if (page < totalPages) { + // Add the address back to the cache, incrementing the page + addressForTransactionsCache[address] = { + ...cacheItem, + processing: false, + page: page + 1 } + return true + } - // Update the lastQueriedBlockHeight for the address - addressData.lastQueriedBlockHeight = blockHeight + // Update the lastQueriedBlockHeight for the address + addressData.lastQueriedBlockHeight = blockHeight - // Save/update the fully-processed address - await common.dataLayer.saveAddress(addressData) + // Save/update the fully-processed address + await common.dataLayer.saveAddress(addressData) - // Update the progress now that the transactions for an address have processed - await common.updateProgressRatio() + // Update the progress now that the transactions for an address have processed + await common.updateProgressRatio() - // Call setLookAhead to update the lookahead - await setLookAhead(common) + // Call setLookAhead to update the lookahead + await setLookAhead(common) - // Add the address to the server cache - serverState.addresses.add(address) - }) - .catch(err => { - // Log the error for debugging purposes without crashing the engine - // This will cause frozen wallet syncs - console.error(err) - common.log('error while processing address for transactions:', err) - addressForTransactionsCache[address] = { ...cacheItem, processing: false } - }) + // Add the address to the server cache + serverState.addresses.add(address) - return common.serverStates.addressQueryTask( - serverUri, - address, - { - lastQueriedBlockHeight: addressData.lastQueriedBlockHeight, - page - }, - deferred - ) + return true + } catch (err) { + // Log the error for debugging purposes without crashing the engine + // This will cause frozen wallet syncs + console.error(err) + common.log('error while processing address for transactions:', err) + addressForTransactionsCache[address] = { ...cacheItem, processing: false } + return false + } } /** @@ -1301,7 +1314,7 @@ const processTransactionResponse = ( * and processing it into the `blockbookUtxoCache` to later be processed by * `processBlockbookUtxo`. */ -const processAddressForUtxos = async ( +async function* processAddressForUtxos( common: CommonParams, args: { address: string @@ -1309,53 +1322,55 @@ const processAddressForUtxos = async ( serverState: ServerState serverUri: string } -): Promise> => { +): WsTaskAsyncGenerator { const { address, cacheItem, serverState, serverUri } = args const { addressForUtxosCache, blockbookUtxoCache, dataLayerUtxoCache } = common.taskCache - const deferred = new Deferred() - deferred.promise - .then(async (utxos: AddressUtxosResponse) => { - const scriptPubkey = common.walletTools.addressToScriptPubkey(address) - const addressData = await common.dataLayer.fetchAddress(scriptPubkey) - if (addressData == null || addressData.path == null) { - return - } + try { + const utxos: AddressUtxosResponse = yield* common.serverStates.utxoListQueryTask( + serverUri, + address + ) + const scriptPubkey = common.walletTools.addressToScriptPubkey(address) + const addressData = await common.dataLayer.fetchAddress(scriptPubkey) + if (addressData == null || addressData.path == null) { + return true + } - if (utxos.length === 0) { - addToDataLayerUtxoCache( - dataLayerUtxoCache, - cacheItem.path, - scriptPubkey, - 0 - ) - return - } + if (utxos.length === 0) { + addToDataLayerUtxoCache( + dataLayerUtxoCache, + cacheItem.path, + scriptPubkey, + 0 + ) + return true + } - for (const utxo of utxos) { - const utxoId = `${utxo.txid}_${utxo.vout}` - blockbookUtxoCache[utxoId] = { - blockbookUtxo: utxo, - processing: false, - requiredCount: utxos.length, - path: cacheItem.path, - // TypeScript yells otherwise - address: { ...addressData, path: addressData.path } - } + for (const utxo of utxos) { + const utxoId = `${utxo.txid}_${utxo.vout}` + blockbookUtxoCache[utxoId] = { + blockbookUtxo: utxo, + processing: false, + requiredCount: utxos.length, + path: cacheItem.path, + // TypeScript yells otherwise + address: { ...addressData, path: addressData.path } } + } - serverState.addresses.add(address) - }) - .catch((err: unknown) => { - console.error(err) - common.log('error while processing address for UTXOs:', err) - addressForUtxosCache[address] = { ...cacheItem, processing: false } - }) + serverState.addresses.add(address) - return common.serverStates.utxoListQueryTask(serverUri, address, deferred) + return true + } catch (err: unknown) { + console.error(err) + common.log('error while processing address for UTXOs:', err) + addressForUtxosCache[address] = { ...cacheItem, processing: false } + return false + } } /** @@ -1467,13 +1482,13 @@ const processDataLayerUtxos = async ( * After the network fetching, the UTXO is processed into a UtxoData object * to be later processed into the DataLayer. */ -const processBlockbookUtxo = async ( +async function* processBlockbookUtxo( common: CommonParams, args: { serverUri: string cacheItem: BlockbookUtxoCache[string] } -): Promise | boolean> => { +): WsTaskAsyncGenerator { const { serverUri, cacheItem } = args const { blockbookUtxoCache, dataLayerUtxoCache } = common.taskCache const purposeType = pathToPurposeType( @@ -1520,34 +1535,33 @@ const processBlockbookUtxo = async ( txId: cacheItem.blockbookUtxo.txid }) if (tx == null) { - const deferred = new Deferred() - deferred.promise - .then((txResponse: TransactionResponse) => { - const processedTx = processTransactionResponse(common, { - txResponse - }) - script = processedTx.hex - // Only after we have successfully fetched the tx, set our script and call done - done() - }) - .catch(err => { - // If something went wrong, add the UTXO back to the queue - common.log('error while processing Blockbook UTXO:', err) - const utxoId = `${cacheItem.blockbookUtxo.txid}_${cacheItem.blockbookUtxo.vout}` - blockbookUtxoCache[utxoId] = { - blockbookUtxo: cacheItem.blockbookUtxo, - processing: false, - path: cacheItem.path, - address: cacheItem.address, - requiredCount: cacheItem.requiredCount - } - }) + try { + const txResponse: TransactionResponse = yield* common.serverStates.transactionQueryTask( + serverUri, + cacheItem.blockbookUtxo.txid + ) - return common.serverStates.transactionQueryTask( - serverUri, - cacheItem.blockbookUtxo.txid, - deferred - ) + const processedTx = processTransactionResponse(common, { + txResponse + }) + script = processedTx.hex + // Only after we have successfully fetched the tx, set our script and call done + done() + + return true + } catch (err) { + // If something went wrong, add the UTXO back to the queue + common.log('error while processing Blockbook UTXO:', err) + const utxoId = `${cacheItem.blockbookUtxo.txid}_${cacheItem.blockbookUtxo.vout}` + blockbookUtxoCache[utxoId] = { + blockbookUtxo: cacheItem.blockbookUtxo, + processing: false, + path: cacheItem.path, + address: cacheItem.address, + requiredCount: cacheItem.requiredCount + } + return false + } } else { script = tx.hex } diff --git a/src/common/utxobased/network/Blockbook.ts b/src/common/utxobased/network/Blockbook.ts index 80bdfe92..68a141ab 100644 --- a/src/common/utxobased/network/Blockbook.ts +++ b/src/common/utxobased/network/Blockbook.ts @@ -25,7 +25,13 @@ import { TransactionResponse } from './blockbookApi' import Deferred from './Deferred' -import { makeSocket, OnQueueSpace, WsResponse, WsTask } from './Socket' +import { + makeSocket, + TaskGeneratorFn, + WsResponse, + WsTask, + WsTaskGenerator +} from './Socket' import { SocketEmitter } from './SocketEmitter' export type WatchAddressesCB = ( @@ -72,27 +78,19 @@ export interface Blockbook { asBlockbookAddress?: Cleaner | undefined lastQueriedBlockHeight: number page: number - }, - deferred: Deferred - ) => WsTask + } + ) => WsTaskGenerator - transactionQueryTask: ( - txId: string, - deferred: Deferred - ) => WsTask + transactionQueryTask: (txId: string) => WsTaskGenerator - transactionSpecialQueryTask: ( - txId: string, - deferred: Deferred - ) => WsTask + transactionSpecialQueryTask: (txId: string) => WsTaskGenerator utxoListQueryTask: ( address: string, params: { asBlockbookAddress?: Cleaner | undefined - }, - deferred: Deferred - ) => WsTask + } + ) => WsTaskGenerator } interface BlockbookConfig { @@ -101,7 +99,7 @@ interface BlockbookConfig { connectionUri: string engineEmitter: EngineEmitter log: EdgeLog - onQueueSpace: OnQueueSpace + taskGeneratorFn: TaskGeneratorFn ping?: () => Promise socketEmitter: SocketEmitter walletId: string @@ -114,7 +112,7 @@ export function makeBlockbook(config: BlockbookConfig): Blockbook { connectionUri, engineEmitter, log, - onQueueSpace, + taskGeneratorFn, socketEmitter, walletId } = config @@ -172,7 +170,22 @@ export function makeBlockbook(config: BlockbookConfig): Blockbook { async promisifyWsMessage(message: BlockbookTask): Promise { const deferred = new Deferred() - socket.submitTask({ ...message, deferred }) + const taskGeneratorFn = async function* (): AsyncGenerator< + WsTask, + false, + T + > { + const value = yield { ...message } + deferred.resolve(value) + return false + } + const generator = taskGeneratorFn() + const result = await generator.next() + // Assertion mask the type checker + if (result.done !== true) { + const task = result.value + socket.submitTask(task, generator) + } return await deferred.promise }, @@ -217,39 +230,32 @@ export function makeBlockbook(config: BlockbookConfig): Blockbook { // Task Methods: // - addressQueryTask(address, params, deferred) { - return { + addressQueryTask: function* (address, params) { + return yield { ...addressMessage(address, params.asBlockbookAddress, { details: 'txs', from: params.lastQueriedBlockHeight, pageSize: BLOCKBOOK_TXS_PER_PAGE, page: params.page - }), - deferred + }) } }, - transactionQueryTask(txId, deferred) { - return { - ...transactionMessage(txId), - deferred + transactionQueryTask: function* (txId) { + return yield { + ...transactionMessage(txId) } }, - transactionSpecialQueryTask( - txId: string, - deferred: Deferred - ): WsTask { - return { - ...transactionMessageSpecific(txId), - deferred + transactionSpecialQueryTask: function* (txId: string) { + return yield { + ...transactionMessageSpecific(txId) } }, - utxoListQueryTask(address, params, deferred) { - return { - ...addressUtxosMessage(address, params.asBlockbookAddress), - deferred + utxoListQueryTask: function* (address, params) { + return yield { + ...addressUtxosMessage(address, params.asBlockbookAddress) } } } @@ -257,7 +263,7 @@ export function makeBlockbook(config: BlockbookConfig): Blockbook { const socket = makeSocket(connectionUri, { asResponse, healthCheck: config.ping ?? ping, - onQueueSpace, + taskGeneratorFn, log, emitter: socketEmitter, walletId diff --git a/src/common/utxobased/network/BlockbookElectrum.ts b/src/common/utxobased/network/BlockbookElectrum.ts index 15286fee..9bea8e5c 100644 --- a/src/common/utxobased/network/BlockbookElectrum.ts +++ b/src/common/utxobased/network/BlockbookElectrum.ts @@ -15,7 +15,6 @@ import { PluginInfo } from '../../plugin/types' import { addressToScriptPubkey } from '../keymanager/keymanager' import { Blockbook, makeBlockbook } from './Blockbook' import { AddressResponse, BlockbookAccountUtxo } from './blockbookApi' -import Deferred from './Deferred' import { addressTransactionsMessage, AddressTransactionsResponse, @@ -23,7 +22,12 @@ import { ListUnspentResponse, pingMessage } from './electrumApi' -import { OnQueueSpace, WsResponse, WsResponseMessage, WsTask } from './Socket' +import { + TaskGeneratorFn, + WsResponse, + WsResponseMessage, + WsTaskAsyncGenerator +} from './Socket' import { SocketEmitter } from './SocketEmitter' export interface BlockbookElectrumConfig { @@ -31,7 +35,7 @@ export interface BlockbookElectrumConfig { connectionUri: string engineEmitter: EngineEmitter log: EdgeLog - onQueueSpace: OnQueueSpace + taskGeneratorFn: TaskGeneratorFn pluginInfo: PluginInfo socketEmitter: SocketEmitter walletId: string @@ -79,7 +83,7 @@ export function makeBlockbookElectrum( connectionUri, engineEmitter, log, - onQueueSpace, + taskGeneratorFn, pluginInfo, socketEmitter, walletId @@ -102,83 +106,79 @@ export function makeBlockbookElectrum( connectionUri, engineEmitter, log, - onQueueSpace: async (uri: string): Promise | boolean> => { - const task = await onQueueSpace(uri) - if (task == null || typeof task === 'boolean') return task - - // Translate getAccountUtxo to blockchain.scripthash.listunspent: - if (task.method === 'getAccountUtxo') { - const params = task.params as { descriptor: string } - const address = - asAddress != null ? asAddress(params.descriptor) : params.descriptor - const scriptHash = addressToScriptHash(address) - - const deferred = new Deferred() - deferred.promise - .then((electrumUtxos: ListUnspentResponse): void => { - const blockbookUtxos: BlockbookAccountUtxo[] = electrumUtxos.map( - utxo => ({ - txid: utxo.tx_hash, - vout: utxo.tx_pos, - value: utxo.value.toString(), - height: utxo.height - }) - ) - task.deferred.resolve(blockbookUtxos) - }) - .catch((e): void => { - task.deferred.reject(e) - }) - - const translatedTask: WsTask = { - ...listUnspentMessage(scriptHash), - deferred + taskGeneratorFn: async function* ( + uri: string + ): WsTaskAsyncGenerator { + const generator = taskGeneratorFn(uri) + let nextValue: unknown + + while (true) { + const result = await generator.next(nextValue) + const task = result.value + + // If the generator result is not a WsTask we will return it: + if (result?.done === true) return result.value + if (typeof task === 'boolean') return task + + // Translate getAccountUtxo to blockchain.scripthash.listunspent: + if (task.method === 'getAccountUtxo') { + const params = task.params as { descriptor: string } + const address = + asAddress != null ? asAddress(params.descriptor) : params.descriptor + const scriptHash = addressToScriptHash(address) + + // Safe because yielding WsTask should get + // ListUnspentResponse next. + const electrumUtxos = (yield { + ...listUnspentMessage(scriptHash) + }) as ListUnspentResponse + + const blockbookUtxos: BlockbookAccountUtxo[] = electrumUtxos.map( + utxo => ({ + txid: utxo.tx_hash, + vout: utxo.tx_pos, + value: utxo.value.toString(), + height: utxo.height + }) + ) + nextValue = blockbookUtxos + continue } - return translatedTask - } - // Get Address Transactions: - if (task.method === 'getAccountInfo') { - const params = task.params as { descriptor: string } - const address = - asAddress != null ? asAddress(params.descriptor) : params.descriptor - const scriptHash = addressToScriptHash(address) - - const deferred = new Deferred() - deferred.promise - .then((electrumUtxos: AddressTransactionsResponse): void => { - const blockbookUtxos: AddressResponse = { - address, // unused - balance: '0', // unused - totalReceived: '0', // unused - totalSent: '0', // unused - txs: electrumUtxos.length, - unconfirmedBalance: '0', // unused - unconfirmedTxs: electrumUtxos.reduce( - (sum, tx) => (sum + tx.height >= 0 ? 1 : 0), - 0 - ), - txids: [], // unused - transactions: [], // TODO: this requires an extra query per txid - page: 0, // unused - totalPages: 1, - itemsOnPage: 0 // unused - } - task.deferred.resolve(blockbookUtxos) - }) - .catch((e): void => { - task.deferred.reject(e) - }) - - const translatedTask: WsTask = { - ...addressTransactionsMessage(scriptHash), - deferred + // Get Address Transactions: + if (task.method === 'getAccountInfo') { + const params = task.params as { descriptor: string } + const address = + asAddress != null ? asAddress(params.descriptor) : params.descriptor + const scriptHash = addressToScriptHash(address) + + // Safe because yielding WsTask should get + // AddressTransactionsResponse next. + const electrumUtxos = (yield { + ...addressTransactionsMessage(scriptHash) + }) as AddressTransactionsResponse + + const blockbookUtxos: AddressResponse = { + address, // unused + balance: '0', // unused + totalReceived: '0', // unused + totalSent: '0', // unused + txs: electrumUtxos.length, + unconfirmedBalance: '0', // unused + unconfirmedTxs: electrumUtxos.reduce( + (sum, tx) => (sum + tx.height >= 0 ? 1 : 0), + 0 + ), + txids: [], // unused + transactions: [], // TODO: this requires an extra query per txid + page: 0, // unused + totalPages: 1, + itemsOnPage: 0 // unused + } + nextValue = blockbookUtxos + continue } - return translatedTask } - - // Skip unsupported task and continue with the next one: - return true }, ping: async (): Promise => { return await instance.promisifyWsMessage(pingMessage()) diff --git a/src/common/utxobased/network/Socket.ts b/src/common/utxobased/network/Socket.ts index fc78ecdf..3daa050c 100644 --- a/src/common/utxobased/network/Socket.ts +++ b/src/common/utxobased/network/Socket.ts @@ -28,10 +28,17 @@ export type OnFailHandler = (error: Error) => void export interface WsTask { method: string params: unknown - deferred: Deferred cleaner?: Cleaner } +export type WsTaskGenerator = Generator, T, T> + +export type WsTaskAsyncGenerator = AsyncGenerator< + WsTask | boolean, + boolean, + T +> + export interface WsSubscription { method: string params: unknown @@ -45,12 +52,12 @@ export interface Socket { readyState: ReadyState connect: () => Promise disconnect: () => void - submitTask: (task: WsTask) => void + submitTask: (task: WsTask, generator: WsTaskAsyncGenerator) => void subscribe: (subscription: WsSubscription) => void isConnected: () => boolean } -export type OnQueueSpace = (uri: string) => Promise | boolean> +export type TaskGeneratorFn = (uri: string) => WsTaskAsyncGenerator interface SocketConfig { asResponse?: Cleaner @@ -60,11 +67,12 @@ interface SocketConfig { emitter: SocketEmitter log: EdgeLog healthCheck: () => Promise // function for heartbeat, should submit task itself - onQueueSpace: OnQueueSpace + taskGeneratorFn: TaskGeneratorFn } interface WsRequest { task: WsTask + generator: WsTaskAsyncGenerator startTime: number } @@ -155,11 +163,9 @@ export function makeSocket(uri: string, config: SocketConfig): Socket { cancelConnect = false trackedError = undefined for (const request of Object.values(pendingRequests)) { - try { - request.task.deferred.reject(errObj) - } catch (e) { + request.generator.throw(errObj).catch(e => { log.error(e.message) - } + }) } pendingRequests = {} try { @@ -207,12 +213,14 @@ export function makeSocket(uri: string, config: SocketConfig): Socket { lastWakeUp = Date.now() if (connected && version != null) { while (Object.keys(pendingRequests).length < queueSize) { - const task = await config.onQueueSpace(uri) + const generator = await config.taskGeneratorFn(uri) + const result = await generator.next() + const task = result.value if (typeof task === 'boolean') { if (task) continue else break } - submitTask(task) + instance.submitTask(task, generator) } } } @@ -231,14 +239,6 @@ export function makeSocket(uri: string, config: SocketConfig): Socket { } } - // add any exception, since the passed in template parameter needs to be re-assigned - const submitTask = (task: WsTask): void => { - const id = (nextId++).toString() - const request = { task, startTime: Date.now() } - pendingRequests[id] = request - transmitRequest(id, request) - } - const transmitRequest = (id: string, request: WsRequest): void => { const now = Date.now() if ( @@ -273,11 +273,11 @@ export function makeSocket(uri: string, config: SocketConfig): Socket { for (const [id, request] of Object.entries(pendingRequests)) { if (request.startTime + timeout < now) { - try { - request.task.deferred.reject(new Error(`Timeout for request ${id}`)) - } catch (e) { - log.error(e.message) - } + request.generator + .throw(new Error(`Timeout for request ${id}`)) + .catch(e => { + log.error(e.message) + }) removeItem(pendingRequests, id) } } @@ -342,16 +342,29 @@ export function makeSocket(uri: string, config: SocketConfig): Socket { 'message' in error ? error.message : error.connected throw new Error(errorMessage) } + let nextValue if (request.task.cleaner != null) { - request.task.deferred.resolve( - request.task.cleaner(responseMessage.data) - ) + nextValue = request.task.cleaner(responseMessage.data) } else { - request.task.deferred.resolve(responseMessage.data) + nextValue = responseMessage.data } + request.generator + .next(nextValue) + .then(result => { + const task = result.value + if (typeof task === 'boolean') { + return + } + instance.submitTask(task, request.generator) + }) + .catch(e => { + log.error(e.message) + }) } catch (error) { console.log({ uri, error, response: responseMessage, request }) - request.task.deferred.reject(error) + request.generator.throw(error).catch(e => { + log.error(e.message) + }) } continue } @@ -370,7 +383,7 @@ export function makeSocket(uri: string, config: SocketConfig): Socket { setupTimer() // return a Socket - return { + const instance: Socket = { get readyState(): ReadyState { return socket?.readyState ?? ReadyState.CLOSED }, @@ -421,8 +434,18 @@ export function makeSocket(uri: string, config: SocketConfig): Socket { return socket?.readyState === ReadyState.OPEN }, - submitTask, + submitTask: ( + task: WsTask, + generator: WsTaskAsyncGenerator + ): void => { + const id = (nextId++).toString() + const request: WsRequest = { task, startTime: Date.now(), generator } + pendingRequests[id] = request + transmitRequest(id, request) + }, subscribe } + + return instance } diff --git a/test/common/utxobased/network/Blockbook.spec.ts b/test/common/utxobased/network/Blockbook.spec.ts index 71757975..d8ee2740 100644 --- a/test/common/utxobased/network/Blockbook.spec.ts +++ b/test/common/utxobased/network/Blockbook.spec.ts @@ -63,9 +63,9 @@ describe('Blockbook notifications tests with dummy server', function () { const log = makeFakeLog() - const onQueueSpace = async ( + async function* taskGeneratorFn( _uri: string - ): Promise | boolean> => { + ): AsyncGenerator | boolean> { return false } @@ -78,7 +78,7 @@ describe('Blockbook notifications tests with dummy server', function () { connectionUri: 'ws://localhost:8555', engineEmitter, log, - onQueueSpace, + taskGeneratorFn, socketEmitter, walletId: '' }) @@ -156,9 +156,9 @@ describe('Blockbook', function () { let blockbook: Blockbook - const onQueueSpace = async ( + async function* taskGeneratorFn( _uri: string - ): Promise | boolean> => { + ): AsyncGenerator | boolean> { return false } @@ -167,7 +167,7 @@ describe('Blockbook', function () { connectionUri: 'wss://bitcoin.atomicwallet.io/websocket', engineEmitter, log, - onQueueSpace, + taskGeneratorFn, socketEmitter, walletId: '' })