Skip to content

Commit

Permalink
Convert pickNextTask to an AsyncGenerator
Browse files Browse the repository at this point in the history
  • Loading branch information
samholmes committed Aug 9, 2024
1 parent 6242154 commit d3642c4
Show file tree
Hide file tree
Showing 7 changed files with 527 additions and 457 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## Unreleased

- fixed: Ensure server scoring is done on all server tasks
- changed: Refactor pickNextTask to implement generators to replace Deferred promises

## 3.2.1 (2024-07-19)

Expand Down
164 changes: 95 additions & 69 deletions src/common/utxobased/engine/ServerStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import {
} from '../network/blockbookApi'
import { makeBlockbookElectrum } from '../network/BlockbookElectrum'
import Deferred from '../network/Deferred'
import { WsTask } from '../network/Socket'
import { WsTask, WsTaskGenerator } from '../network/Socket'
import { SocketEmitter, SocketEvent } from '../network/SocketEmitter'
import { pushUpdate, removeIdFromQueue } from '../network/socketQueue'
import { MAX_CONNECTIONS, NEW_CONNECTIONS } from './constants'
Expand All @@ -43,7 +43,7 @@ interface ServerStateConfig {

export interface ServerStates {
setPickNextTaskCB: (
callback: (uri: string) => Promise<WsTask<any> | boolean>
callback: (uri: string) => AsyncGenerator<WsTask<unknown> | boolean>
) => void
stop: () => void
serverCanGetTx: (uri: string, txid: string) => boolean
Expand All @@ -69,27 +69,23 @@ export interface ServerStates {
addressQueryTask: (
serverUri: string,
address: string,
params: { lastQueriedBlockHeight: number; page: number },
deferred: Deferred<AddressResponse>
) => WsTask<AddressResponse>
params: { lastQueriedBlockHeight: number; page: number }
) => WsTaskGenerator<AddressResponse>

transactionQueryTask: (
serverUri: string,
txId: string,
deferred: Deferred<BlockbookTransaction>
) => WsTask<BlockbookTransaction>
txId: string
) => WsTaskGenerator<BlockbookTransaction>

transactionSpecialQueryTask: (
serverUri: string,
txId: string,
deferred: Deferred<unknown>
) => WsTask<unknown>
txId: string
) => WsTaskGenerator<unknown>

utxoListQueryTask: (
serverUri: string,
address: string,
deferred: Deferred<AddressUtxosResponse>
) => WsTask<AddressUtxosResponse>
address: string
) => WsTaskGenerator<AddressUtxosResponse>
}

interface ServerStatesCache {
Expand Down Expand Up @@ -165,7 +161,9 @@ export function makeServerStates(config: ServerStateConfig): ServerStates {
}
)

let pickNextTaskCB: (uri: string) => Promise<WsTask<any> | boolean>
let pickNextTaskCB: (
uri: string
) => AsyncGenerator<WsTask<unknown> | boolean, boolean>

const makeServerStatesCacheEntry = (blockbook: Blockbook): ServerState => ({
blockbook,
Expand Down Expand Up @@ -225,18 +223,35 @@ export function makeServerStates(config: ServerStateConfig): ServerStates {
let blockbook: Blockbook

// Queue space callback
const onQueueSpace = async (): Promise<WsTask<any> | boolean> => {
async function* taskGeneratorFn(): AsyncGenerator<
WsTask<unknown> | boolean,
boolean
> {
// Exit if the connection is no longer active
if (!(uri in serverStatesCache)) return false

const task = await pickNextTaskCB(uri)
if (typeof task !== 'boolean') {
const taskMessage = `${task.method} params: ${JSON.stringify(
task.params
)}`
log(`${uri} nextTask: ${taskMessage}`)
if (uri == null || !(uri in serverStatesCache)) return false

const generator = pickNextTaskCB(uri)
let nextValue: unknown
while (true) {
const result: IteratorResult<
WsTask<unknown> | boolean,
boolean
> = await generator.next(nextValue)

if (result?.done === true) {
return result.value
}

const task = result.value

if (typeof task !== 'boolean') {
const taskMessage = `${task.method} params: ${JSON.stringify(
task.params
)}`
log(`${uri} nextTask: ${taskMessage}`)
}
nextValue = yield task
}
return task
}

// Create a new blockbook instance based on the URI scheme
Expand All @@ -247,7 +262,7 @@ export function makeServerStates(config: ServerStateConfig): ServerStates {
connectionUri: uri,
engineEmitter,
log,
onQueueSpace,
taskGeneratorFn,
pluginInfo,
socketEmitter,
walletId: walletInfo.id
Expand All @@ -259,7 +274,7 @@ export function makeServerStates(config: ServerStateConfig): ServerStates {
connectionUri: uri,
engineEmitter,
log,
onQueueSpace,
taskGeneratorFn,
socketEmitter,
walletId: walletInfo.id
})
Expand Down Expand Up @@ -317,6 +332,26 @@ export function makeServerStates(config: ServerStateConfig): ServerStates {
return deferredWithScoring
}

function* withServerScoring<T, R>(
serverUri: string,
generator: Generator<T, R>
): Generator<T, R> {
const serverState = serverStatesCache[serverUri]
if (serverState == null)
throw new Error(`No blockbook connection with ${serverUri}`)

const queryTime = Date.now()
let result: R
try {
result = yield* generator
pluginState.serverScoreUp(serverUri, Date.now() - queryTime)
} catch (error: unknown) {
pluginState.serverScoreDown(serverUri)
throw error
}
return result
}

const instance: ServerStates = {
async broadcastTx(transaction: EdgeTransaction): Promise<string> {
return await new Promise((resolve, reject) => {
Expand Down Expand Up @@ -460,9 +495,7 @@ export function makeServerStates(config: ServerStateConfig): ServerStates {
})
},

setPickNextTaskCB(
callback: (uri: string) => Promise<WsTask<any> | boolean>
): void {
setPickNextTaskCB(callback): void {
pickNextTaskCB = callback
},

Expand Down Expand Up @@ -569,71 +602,64 @@ export function makeServerStates(config: ServerStateConfig): ServerStates {
// Task Methods:
//

addressQueryTask(
addressQueryTask: function* (
serverUri: string,
address: string,
params: { lastQueriedBlockHeight: number; page: number },
deferred: Deferred<AddressResponse>
): WsTask<AddressResponse> {
params: { lastQueriedBlockHeight: number; page: number }
) {
const serverState = serverStatesCache[serverUri]
if (serverState == null)
throw new Error(`No blockbook connection with ${serverUri}`)

return serverState.blockbook.addressQueryTask(
address,
{
asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress,
lastQueriedBlockHeight: params.lastQueriedBlockHeight,
page: params.page
},
deferredWithServerScoring(serverUri, deferred)
return yield* withServerScoring(
serverUri,
(function* () {
return yield* serverState.blockbook.addressQueryTask(address, {
asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress,
lastQueriedBlockHeight: params.lastQueriedBlockHeight,
page: params.page
})
})()
)
},

transactionQueryTask(
serverUri: string,
txId: string,
deferred: Deferred<BlockbookTransaction>
): WsTask<BlockbookTransaction> {
transactionQueryTask: function* (serverUri: string, txId: string) {
const serverState = serverStatesCache[serverUri]
if (serverState == null)
throw new Error(`No blockbook connection with ${serverUri}`)

return serverState.blockbook.transactionQueryTask(
txId,
deferredWithServerScoring(serverUri, deferred)
return yield* withServerScoring(
serverUri,
(function* () {
return yield* serverState.blockbook.transactionQueryTask(txId)
})()
)
},

transactionSpecialQueryTask(
serverUri: string,
txId: string,
deferred: Deferred<unknown>
): WsTask<unknown> {
transactionSpecialQueryTask: function* (serverUri: string, txId: string) {
const serverState = serverStatesCache[serverUri]
if (serverState == null)
throw new Error(`No blockbook connection with ${serverUri}`)

return serverState.blockbook.transactionSpecialQueryTask(
txId,
deferredWithServerScoring(serverUri, deferred)
return yield* withServerScoring(
serverUri,
(function* () {
return serverState.blockbook.transactionSpecialQueryTask(txId)
})()
)
},
utxoListQueryTask(
serverUri: string,
address: string,
deferred: Deferred<AddressUtxosResponse>
): WsTask<AddressUtxosResponse> {
utxoListQueryTask: function* (serverUri: string, address: string) {
const serverState = serverStatesCache[serverUri]
if (serverState == null)
throw new Error(`No blockbook connection with ${serverUri}`)

return serverState.blockbook.utxoListQueryTask(
address,
{
asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress
},
deferredWithServerScoring(serverUri, deferred)
return yield* withServerScoring(
serverUri,
(function* () {
return yield* serverState.blockbook.utxoListQueryTask(address, {
asBlockbookAddress: pluginInfo.engineInfo.asBlockbookAddress
})
})()
)
}
}
Expand Down
Loading

0 comments on commit d3642c4

Please sign in to comment.