diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 5ee94c355..24d974d68 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -23,10 +23,34 @@ import * as crypto from "crypto"; import { type DRPNode } from "./index.js"; import { log } from "./logger.js"; import { deserializeStateMessage, serializeStateMessage } from "./utils.js"; -/* - Handler for all DRP messages, including pubsub messages and direct messages - You need to setup stream xor data -*/ + +interface HandleParams { + node: DRPNode; + message: Message; + stream?: Stream; +} + +interface IHandlerStrategy { + (handleParams: HandleParams): Promise<void> | void; +} + +const messageHandlers: Record<MessageType, IHandlerStrategy | undefined> = { + [MessageType.MESSAGE_TYPE_UNSPECIFIED]: undefined, + [MessageType.MESSAGE_TYPE_FETCH_STATE]: fetchStateHandler, + [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE]: fetchStateResponseHandler, + [MessageType.MESSAGE_TYPE_UPDATE]: updateHandler, + [MessageType.MESSAGE_TYPE_SYNC]: syncHandler, + [MessageType.MESSAGE_TYPE_SYNC_ACCEPT]: syncAcceptHandler, + [MessageType.MESSAGE_TYPE_SYNC_REJECT]: syncRejectHandler, + [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE]: attestationUpdateHandler, + [MessageType.MESSAGE_TYPE_CUSTOM]: undefined, + [MessageType.UNRECOGNIZED]: undefined, +}; + +/** + * Handler for all DRP messages, including pubsub messages and direct messages + * You need to setup stream xor data + */ export async function drpMessagesHandler( node: DRPNode, stream?: Stream, @@ -48,43 +72,19 @@ export async function drpMessagesHandler( return; } - switch (message.type) { - case MessageType.MESSAGE_TYPE_FETCH_STATE: - fetchStateHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE: - fetchStateResponseHandler(node, message.data); - break; - case MessageType.MESSAGE_TYPE_UPDATE: - await updateHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_SYNC: - if (!stream) { - log.error("::messageHandler: Stream is undefined"); - return; - } - await syncHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_SYNC_ACCEPT: - if (!stream) { - log.error("::messageHandler: Stream is undefined"); - return; - } - await syncAcceptHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_SYNC_REJECT: - syncRejectHandler(node, message.data); - break; - case MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE: - attestationUpdateHandler(node, message.sender, message.data); - break; - default: - log.error("::messageHandler: Invalid operation"); - break; + const handler = messageHandlers[message.type]; + if (!handler) { + log.error("::messageHandler: Invalid operation"); + return; + } + const result = handler({ node, message, stream }); + if (result instanceof Promise) { + await result; } } -function fetchStateHandler(node: DRPNode, sender: string, data: Uint8Array): void { +function fetchStateHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { + const { data, sender } = message; const fetchState = FetchState.decode(data); const drpObject = node.objectStore.get(fetchState.objectId); if (!drpObject) { @@ -101,17 +101,18 @@ function fetchStateHandler(node: DRPNode, sender: string, data: Uint8Array): voi drpState: serializeStateMessage(drpState), }); - const message = Message.create({ + const messageFetchStateResponse = Message.create({ sender: node.networkNode.peerId, type: MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, data: FetchStateResponse.encode(response).finish(), }); - node.networkNode.sendMessage(sender, message).catch((e) => { + node.networkNode.sendMessage(sender, messageFetchStateResponse).catch((e) => { log.error("::fetchStateHandler: Error sending message", e); }); } -function fetchStateResponseHandler(node: DRPNode, data: Uint8Array): void { +function fetchStateResponseHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { + const { data } = message; const fetchStateResponse = FetchStateResponse.decode(data); if (!fetchStateResponse.drpState && !fetchStateResponse.aclState) { log.error("::fetchStateResponseHandler: No state found"); @@ -147,7 +148,8 @@ function fetchStateResponseHandler(node: DRPNode, data: Uint8Array): void { } } -function attestationUpdateHandler(node: DRPNode, sender: string, data: Uint8Array): void { +function attestationUpdateHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { + const { data, sender } = message; const attestationUpdate = AttestationUpdate.decode(data); const object = node.objectStore.get(attestationUpdate.objectId); if (!object) { @@ -164,12 +166,14 @@ function attestationUpdateHandler(node: DRPNode, sender: string, data: Uint8Arra data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } operations array doesn't contain the full remote operations array */ -async function updateHandler(node: DRPNode, sender: string, data: Uint8Array): Promise<boolean> { +async function updateHandler({ node, message }: HandleParams): Promise<void> { + const { sender, data } = message; + const updateMessage = Update.decode(data); const object = node.objectStore.get(updateMessage.objectId); if (!object) { log.error("::updateHandler: Object not found"); - return false; + return; } let verifiedVertices: Vertex[] = []; @@ -210,15 +214,18 @@ async function updateHandler(node: DRPNode, sender: string, data: Uint8Array): P } node.objectStore.put(object.id, object); - - return true; } /* data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } operations array contain the full remote operations array */ -async function syncHandler(node: DRPNode, sender: string, data: Uint8Array): Promise<void> { +async function syncHandler({ node, message, stream }: HandleParams): Promise<void> { + if (!stream) { + log.error("::syncHandler: Stream is undefined"); + return; + } + const { sender, data } = message; // (might send reject) <- TODO: when should we reject? const syncMessage = Sync.decode(data); const object = node.objectStore.get(syncMessage.objectId); @@ -244,7 +251,7 @@ async function syncHandler(node: DRPNode, sender: string, data: Uint8Array): Pro const attestations = getAttestations(object, [...requested]); - const message = Message.create({ + const messageSyncAccept = Message.create({ sender: node.networkNode.peerId, type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, // add data here @@ -258,7 +265,7 @@ async function syncHandler(node: DRPNode, sender: string, data: Uint8Array): Pro ).finish(), }); - node.networkNode.sendMessage(sender, message).catch((e) => { + node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { log.error("::syncHandler: Error sending message", e); }); } @@ -267,7 +274,12 @@ async function syncHandler(node: DRPNode, sender: string, data: Uint8Array): Pro data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } operations array contain the full remote operations array */ -async function syncAcceptHandler(node: DRPNode, sender: string, data: Uint8Array): Promise<void> { +async function syncAcceptHandler({ node, message, stream }: HandleParams): Promise<void> { + if (!stream) { + log.error("::syncAcceptHandler: Stream is undefined"); + return; + } + const { data, sender } = message; const syncAcceptMessage = SyncAccept.decode(data); const object = node.objectStore.get(syncAcceptMessage.objectId); if (!object) { @@ -304,7 +316,7 @@ async function syncAcceptHandler(node: DRPNode, sender: string, data: Uint8Array const attestations = getAttestations(object, requested); - const message = Message.create({ + const messageSyncAccept = Message.create({ sender: node.networkNode.peerId, type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, data: SyncAccept.encode( @@ -316,13 +328,13 @@ async function syncAcceptHandler(node: DRPNode, sender: string, data: Uint8Array }) ).finish(), }); - node.networkNode.sendMessage(sender, message).catch((e) => { + node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { log.error("::syncAcceptHandler: Error sending message", e); }); } /* data: { id: string } */ -function syncRejectHandler(_node: DRPNode, _data: Uint8Array): void { +function syncRejectHandler(_handleParams: HandleParams): ReturnType<IHandlerStrategy> { // TODO: handle reject. Possible actions: // - Retry sync // - Ask sync from another peer