Skip to content

Commit

Permalink
reviewer suggestions implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
fforbeck committed Oct 29, 2024
1 parent 232c5fc commit 67cb972
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 54 deletions.
71 changes: 23 additions & 48 deletions src/handlers/egress-tracker.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,13 @@ import { Accounting } from '../services/accounting.js'
*
* @type {Middleware<EgressTrackerContext, EgressTrackerContext, Environment>}
*/
export function withEgressHandler (handler) {
export function withEgressHandler(handler) {
return async (req, env, ctx) => {
if (env.FF_EGRESS_TRACKER_ENABLED !== 'true') {
return handler(req, env, ctx)
}

let response
try {
response = await handler(req, env, ctx)
} catch (error) {
console.error('Error in egress tracker handler:', error)
throw error
}

const response = await handler(req, env, ctx)
if (!response.ok || !response.body) {
return response
}
Expand All @@ -38,17 +31,18 @@ export function withEgressHandler (handler) {
serviceURL: env.ACCOUNTING_SERVICE_URL
})

const { readable, writable } = createEgressPassThroughStream(ctx, accounting, dataCid)

try {
ctx.waitUntil(response.body.pipeTo(writable))
} catch (error) {
console.error('Error in egress tracker handler:', error)
// Original response in case of an error to avoid breaking the chain and serve the content
return response
}
const responseBody = response.body.pipeThrough(
createByteCountStream((totalBytesServed) => {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
ctx.waitUntil(
accounting.record(dataCid, totalBytesServed, new Date().toISOString())
);
}
})
);

return new Response(readable, {
return new Response(responseBody, {
status: response.status,
statusText: response.statusText,
headers: response.headers
Expand All @@ -60,58 +54,39 @@ export function withEgressHandler (handler) {
* Creates a TransformStream to count bytes served to the client.
* It records egress when the stream is finalized without an error.
*
* @param {import('@web3-storage/gateway-lib/middleware').Context} ctx - The context object.
* @param {AccountingService} accounting - The accounting service instance to record egress.
* @param {import('@web3-storage/gateway-lib/handlers').CID} dataCid - The CID of the served content.
* @returns {TransformStream} - The created TransformStream.
* @param {(totalBytesServed: number) => void} onClose
* @template {Uint8Array} T
* @returns {TransformStream<T, T>} - The created TransformStream.
*/
function createEgressPassThroughStream (ctx, accounting, dataCid) {
function createByteCountStream(onClose) {
let totalBytesServed = 0

return new TransformStream({
/**
* The start function is called when the stream is being initialized.
* It resets the total bytes served to 0.
*/
start () {
totalBytesServed = 0
},
/**
* The transform function is called for each chunk of the response body.
* It enqueues the chunk and updates the total bytes served.
* If an error occurs, it signals an error to the controller and logs it.
* The bytes are not counted in case of enqueuing an error.
* @param {Uint8Array} chunk
* @param {TransformStreamDefaultController} controller
*/
async transform (chunk, controller) {
async transform(chunk, controller) {
try {
controller.enqueue(chunk)
totalBytesServed += chunk.byteLength
} catch (error) {
console.error('Error while counting egress bytes:', error)
console.error('Error while counting bytes:', error)
controller.error(error)
}
},

/**
* The flush function is called when the stream is being finalized,
* which is when the response is being sent to the client.
* So before the response is sent, we record the egress.
* It is called only once and it triggers a non-blocking call to the accounting service.
* So before the response is sent, we record the egress using the callback.
* If an error occurs, the egress is not recorded.
* NOTE: The flush function is NOT called in case of an stream error.
* NOTE: The flush function is NOT called in case of a stream error.
*/
async flush (controller) {
try {
// Non-blocking call to the accounting service to record egress
if (totalBytesServed > 0) {
ctx.waitUntil(accounting.record(dataCid, totalBytesServed, new Date().toISOString()))
}
} catch (error) {
console.error('Error while recording egress:', error)
controller.error(error)
}
async flush() {
onClose(totalBytesServed)
}
})
}
2 changes: 1 addition & 1 deletion test/fixtures/worker-fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const __dirname = path.dirname(__filename)
*/
const wranglerEnv = process.env.WRANGLER_ENV || 'integration'

const DEBUG = process.env.DEBUG === 'true' || false
const DEBUG = process.env.DEBUG === 'true'

/**
* Worker information object
Expand Down
10 changes: 5 additions & 5 deletions test/unit/middlewares/egress-tracker.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ const env =
})

const accountingRecordMethodStub = sinon.stub()
// @ts-expect-error
.returns(async (cid, bytes, servedAt) => {
console.log(`[mock] record called with cid: ${cid}, bytes: ${bytes}, servedAt: ${servedAt}`)
return Promise.resolve()
})
.returns(
/** @type {import('../../../src/bindings.js').AccountingService['record']} */
async (cid, bytes, servedAt) => {
console.log(`[mock] record called with cid: ${cid}, bytes: ${bytes}, servedAt: ${servedAt}`)
})

/**
* Mock implementation of the AccountingService.
Expand Down

0 comments on commit 67cb972

Please sign in to comment.