Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Big Refactor of the combiner #10513

Merged
merged 19 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions packages/phone-number-privacy/combiner/src/common/action.ts

This file was deleted.

306 changes: 161 additions & 145 deletions packages/phone-number-privacy/combiner/src/common/combine.ts
Original file line number Diff line number Diff line change
@@ -1,175 +1,191 @@
import {
ErrorMessage,
KeyVersionInfo,
OdisRequest,
OdisResponse,
responseHasExpectedKeyVersion,
SignerEndpoint,
WarningMessage,
} from '@celo/phone-number-privacy-common'
import { Response as FetchResponse } from 'node-fetch'
import Logger from 'bunyan'
import { Request } from 'express'
import * as t from 'io-ts'
import { PerformanceObserver } from 'perf_hooks'
import { OdisConfig } from '../config'
import { Action } from './action'
import { IO } from './io'
import { Session } from './session'
import { fetchSignerResponseWithFallback, SignerResponse } from './io'

/**
 * Address information for a single signer service that requests are forwarded to.
 */
export interface Signer {
// Primary address of the signer; also used to identify the signer in log entries
// and in the collected signer responses.
url: string
// Optional alternative address. NOTE(review): presumably tried by
// fetchSignerResponseWithFallback when the primary url fails — confirm in ./io.
fallbackUrl?: string
}

export abstract class CombineAction<R extends OdisRequest> implements Action<R> {
readonly signers: Signer[]
public constructor(readonly config: OdisConfig, readonly io: IO<R>) {
this.signers = JSON.parse(config.odisServices.signers)
}
/**
 * Settings consumed by thresholdCallToSigners when fanning a request out to the signers.
 */
export interface ThresholdCallToSignersOptions<R extends OdisRequest> {
// Signers to contact; all of them are called concurrently.
signers: Signer[]
// Signer endpoint handed to fetchSignerResponseWithFallback for every signer.
endpoint: SignerEndpoint
// Time budget in milliseconds used to build the timeout abort signal shared by all signer calls.
requestTimeoutMS: number
// When true, a signer response that fails responseHasExpectedKeyVersion is rejected
// with ErrorMessage.INVALID_KEY_VERSION_RESPONSE.
shouldCheckKeyVersion: boolean
// Supplies the key version sent to the signers and the threshold used for the fail-fast check.
keyVersionInfo: KeyVersionInfo
// Incoming express request that is forwarded to each signer.
request: Request<{}, {}, R>
// io-ts type used to validate the JSON body of every 'OK' signer response.
responseSchema: t.Type<OdisResponse<R>, OdisResponse<R>, unknown>
}

export async function thresholdCallToSigners<R extends OdisRequest>(
logger: Logger,
options: ThresholdCallToSignersOptions<R>,
processResult: (res: OdisResponse<R>) => Promise<boolean> = (_) => Promise.resolve(false)
): Promise<{ signerResponses: Array<SignerResponse<R>>; maxErrorCode?: number }> {
const obs = new PerformanceObserver((list) => {
// Possible race condition here: if multiple signers take exactly the same
// amount of time, the PerformanceObserver callback may be called twice with
// both entries present. Node 12 doesn't allow for entries to be deleted by name,
// and eliminating the race condition requires a more significant redesign of
// the measurement code.
// This is only used for monitoring purposes, so a rare
// duplicate latency measure for the signer should have minimal impact.
list.getEntries().forEach((entry) => {
logger.info({ latency: entry, signer: entry.name }, 'Signer response latency measured')
})
})
obs.observe({ entryTypes: ['measure'], buffered: false })

abstract combine(session: Session<R>): void
const {
signers,
endpoint,
requestTimeoutMS,
shouldCheckKeyVersion,
keyVersionInfo,
request,
responseSchema,
} = options

async perform(session: Session<R>) {
await this.distribute(session)
this.combine(session)
}
const manualAbort = new AbortController()
const timeoutSignal = AbortSignal.timeout(requestTimeoutMS)
const abortSignal = (AbortSignal as any).any([manualAbort.signal, timeoutSignal]) as AbortSignal

let errorCount = 0
const errorCodes: Map<number, number> = new Map<number, number>()

async distribute(session: Session<R>): Promise<void> {
const obs = new PerformanceObserver((list) => {
// Possible race condition here: if multiple signers take exactly the same
// amount of time, the PerformanceObserver callback may be called twice with
// both entries present. Node 12 doesn't allow for entries to be deleted by name,
// and eliminating the race condition requires a more significant redesign of
// the measurement code.
// This is only used for monitoring purposes, so a rare
// duplicate latency measure for the signer should have minimal impact.
list.getEntries().forEach((entry) => {
session.logger.info(
{ latency: entry, signer: entry.name },
'Signer response latency measured'
const requiredThreshold = keyVersionInfo.threshold

const responses: Array<SignerResponse<R>> = []
// Forward request to signers
// An unexpected error in handling the result for one signer should not
// block a threshold of correct responses, but should be logged.
await Promise.all(
signers.map(async (signer) => {
try {
const signerFetchResult = await fetchSignerResponseWithFallback(
signer,
endpoint,
keyVersionInfo.keyVersion,
request,
logger,
abortSignal
)
})
})
obs.observe({ entryTypes: ['measure'], buffered: false })

const timeout = setTimeout(() => {
session.timedOut = true
session.abort.abort()
}, this.config.odisServices.timeoutMilliSeconds)

// Forward request to signers
// An unexpected error in handling the result for one signer should not
// block a threshold of correct responses, but should be logged.
await Promise.all(
this.signers.map(async (signer) => {
try {
await this.forwardToSigner(signer, session)
} catch (err) {
session.logger.error({
signer: signer.url,
message: 'Unexpected error caught while distributing request to signer',
err,
})

if (!signerFetchResult.ok) {
errorCount++
errorCodes.set(
signerFetchResult.status,
(errorCodes.get(signerFetchResult.status) ?? 0) + 1
)

if (signers.length - errorCount < requiredThreshold) {
logger.warn('Not possible to reach a threshold of signer responses. Failing fast')
manualAbort.abort()
}
return
}
})
)
// TODO Resolve race condition where a session can both receive a successful
// response in time and be aborted

clearTimeout(timeout)
// DO NOT call performance.clearMarks() as this also deletes marks used to
// measure e2e combiner latency.
obs.disconnect()
}
if (
shouldCheckKeyVersion &&
!responseHasExpectedKeyVersion(signerFetchResult, keyVersionInfo.keyVersion, logger)
) {
throw new Error(ErrorMessage.INVALID_KEY_VERSION_RESPONSE)
}

protected async forwardToSigner(signer: Signer, session: Session<R>): Promise<void> {
let signerFetchResult: FetchResponse | undefined
try {
signerFetchResult = await this.io.fetchSignerResponseWithFallback(signer, session)
session.logger.info({
message: 'Received signerFetchResult',
signer: signer.url,
status: signerFetchResult.status,
})
} catch (err) {
session.logger.debug({ err, signer: signer.url, message: 'signer request failure' })
if (err instanceof Error && err.name === 'AbortError' && session.abort.signal.aborted) {
if (session.timedOut) {
session.logger.error({ signer }, ErrorMessage.TIMEOUT_FROM_SIGNER)
} else {
session.logger.info({ signer }, WarningMessage.CANCELLED_REQUEST_TO_SIGNER)
const data: any = await signerFetchResult.json()
logger.info(
{ signer, res: data, status: signerFetchResult.status },
`received 'OK' response from signer`
)

const odisResponse: OdisResponse<R> = parseSchema(responseSchema, data, logger)
if (!odisResponse.success) {
logger.error(
{ err: odisResponse.error, signer: signer.url },
`Signer request to failed with 'OK' status`
)
throw new Error(ErrorMessage.SIGNER_RESPONSE_FAILED_WITH_OK_STATUS)
}
} else {
// Logging the err & message simultaneously fails to log the message in some cases
session.logger.error({ signer }, ErrorMessage.SIGNER_REQUEST_ERROR)
session.logger.error({ signer, err })
}
}
return this.handleFetchResult(signer, session, signerFetchResult)
}

protected async handleFetchResult(
signer: Signer,
session: Session<R>,
signerFetchResult?: FetchResponse
): Promise<void> {
if (signerFetchResult?.ok) {
try {
// Throws if response is not actually successful
await this.receiveSuccess(signerFetchResult, signer.url, session)
return
responses.push({ res: odisResponse, url: signer.url })

if (await processResult(odisResponse)) {
// we already have enough responses
manualAbort.abort()
}
} catch (err) {
session.logger.error(err)
if (isTimeoutError(err)) {
logger.error({ signer }, ErrorMessage.TIMEOUT_FROM_SIGNER)
} else if (isAbortError(err)) {
logger.info({ signer }, WarningMessage.CANCELLED_REQUEST_TO_SIGNER)
} else {
// Logging the err & message simultaneously fails to log the message in some cases
logger.error({ signer }, ErrorMessage.SIGNER_REQUEST_ERROR)
logger.error({ signer, err })

errorCount++
if (signers.length - errorCount < requiredThreshold) {
logger.warn('Not possible to reach a threshold of signer responses. Failing fast')
manualAbort.abort()
}
}
}
}
if (signerFetchResult) {
session.logger.info({
message: 'Received signerFetchResult on unsuccessful signer response',
res: await signerFetchResult.text(),
status: signerFetchResult.status,
signer: signer.url,
})
}
return this.addFailureToSession(signer, signerFetchResult?.status, session)
}
})
)

protected async receiveSuccess(
signerFetchResult: FetchResponse,
url: string,
session: Session<R>
): Promise<OdisResponse<R>> {
if (!signerFetchResult.ok) {
throw new Error(`Implementation Error: receiveSuccess should only receive 'OK' responses`)
}
const { status } = signerFetchResult
const data: string = await signerFetchResult.text()
session.logger.info({ signer: url, res: data, status }, `received 'OK' response from signer`)
const signerResponse: OdisResponse<R> = this.io.validateSignerResponse(
data,
url,
session.logger
// DO NOT call performance.clearMarks() as this also deletes marks used to
// measure e2e combiner latency.
obs.disconnect()

if (errorCodes.size > 1) {
logger.error(
{ errorCodes: JSON.stringify([...errorCodes]) },
ErrorMessage.INCONSISTENT_SIGNER_RESPONSES
)
if (!signerResponse.success) {
session.logger.error(
{ err: signerResponse.error, signer: url, status },
`Signer request to ${url + this.io.signerEndpoint} failed with 'OK' status`
)
throw new Error(ErrorMessage.SIGNER_RESPONSE_FAILED_WITH_OK_STATUS)
}
session.logger.info({ signer: url }, `Signer request successful`)
session.responses.push({ url, res: signerResponse, status })
return signerResponse

return { signerResponses: responses, maxErrorCode: getMajorityErrorCode(errorCodes) }
} else {
return { signerResponses: responses }
}
}

private addFailureToSession(signer: Signer, errorCode: number | undefined, session: Session<R>) {
// Tracking failed request count via signer url prevents
// double counting the same failed request by mistake
session.failedSigners.add(signer.url)
session.logger.warn(
`Received failure from ${session.failedSigners.size}/${this.signers.length} signers`
)
if (errorCode) {
session.incrementErrorCodeCount(errorCode)
}
const { threshold } = session.keyVersionInfo
if (this.signers.length - session.failedSigners.size < threshold) {
session.logger.warn('Not possible to reach a threshold of signer responses. Failing fast')
session.abort.abort()
}
/**
 * Validates an untyped payload against an io-ts type and returns it narrowed to `T`.
 *
 * @param schema - io-ts type whose `is` guard decides whether `data` is acceptable
 * @param data - payload to validate (here: the parsed JSON body of a signer response)
 * @param logger - receives an error entry containing the payload when validation fails
 * @returns `data` itself, unchanged, once the guard has accepted it
 * @throws Error with message `ErrorMessage.INVALID_SIGNER_RESPONSE` when the guard rejects `data`
 */
function parseSchema<T>(schema: t.Type<T, T, unknown>, data: unknown, logger: Logger): T {
  if (schema.is(data)) {
    return data
  }
  // Log the offending payload before failing so the malformed response can be inspected.
  logger.error({ data }, `Malformed schema`)
  throw new Error(ErrorMessage.INVALID_SIGNER_RESPONSE)
}

/**
 * Tells whether a caught value is an `Error` instance whose `name` is `'TimeoutError'`.
 *
 * @param err - any caught value
 * @returns true only for `Error` instances named `'TimeoutError'`; false for everything else
 */
function isTimeoutError(err: unknown) {
  if (!(err instanceof Error)) {
    // Non-Error throwables (strings, plain objects, undefined, ...) never qualify.
    return false
  }
  return err.name === 'TimeoutError'
}

/**
 * Tells whether a caught value is an `Error` instance whose `name` is `'AbortError'`.
 *
 * @param err - any caught value
 * @returns true only for `Error` instances named `'AbortError'`; false for everything else
 */
function isAbortError(err: unknown) {
  // Only genuine Error instances contribute a name; anything else compares as undefined.
  const errorName = err instanceof Error ? err.name : undefined
  return errorName === 'AbortError'
}

/**
 * Picks the error status code reported most often.
 *
 * @param errorCodes - map from status code to the number of times it was observed
 * @returns the code with the highest count; on a tie the numerically lower code wins;
 *          `-1` when the map is empty
 */
function getMajorityErrorCode(errorCodes: Map<number, number>): number {
  let leader = { code: -1, hits: -1 }
  for (const [code, hits] of errorCodes) {
    const isMoreFrequent = hits > leader.hits
    // Ties go to the lower status code because 400s are more helpful
    // than 500s for user feedback.
    const winsTie = hits === leader.hits && code < leader.code
    if (isMoreFrequent || winsTie) {
      leader = { code, hits }
    }
  }
  return leader.code
}
25 changes: 0 additions & 25 deletions packages/phone-number-privacy/combiner/src/common/controller.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export abstract class CryptoClient {
/**
 * Reports whether the number of valid signatures collected so far is enough to
 * perform a combination, i.e. has reached the threshold of the key version in use.
 */
// TODO (mcortesi) remove
public hasSufficientSignatures(): boolean {
  const gathered = this.allSignaturesLength
  const needed = this.keyVersionInfo.threshold
  return gathered >= needed
}
Expand Down

This file was deleted.

Loading