From 2d94f74ea3990e3a17265be3fd64f4095d86efab Mon Sep 17 00:00:00 2001 From: anhnd350309 <anhnd350309@gmail.com> Date: Thu, 6 Mar 2025 14:51:20 +0700 Subject: [PATCH 1/8] init run handler strategy --- packages/node/src/handlers.ts | 320 ++++++++++++++++++++++++++++++---- 1 file changed, 287 insertions(+), 33 deletions(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 5ee94c355..b548dd6f4 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -23,6 +23,285 @@ import * as crypto from "crypto"; import { type DRPNode } from "./index.js"; import { log } from "./logger.js"; import { deserializeStateMessage, serializeStateMessage } from "./utils.js"; + +interface HandleParams { + node: DRPNode; + message: Message; + stream?: Stream; +} + +interface IHandlerStrategy { + handle: (handleParams: HandleParams) => Promise<void> | void; +} + +class FetchStateHandler implements IHandlerStrategy { + handle({ node, message }: HandleParams) { + const fetchState = FetchState.decode(message.data); + const drpObject = node.objectStore.get(fetchState.objectId); + if (!drpObject) { + log.error("::fetchStateHandler: Object not found"); + return; + } + + const aclState = drpObject.aclStates.get(fetchState.vertexHash); + const drpState = drpObject.drpStates.get(fetchState.vertexHash); + const response = FetchStateResponse.create({ + objectId: fetchState.objectId, + vertexHash: fetchState.vertexHash, + aclState: serializeStateMessage(aclState), + drpState: serializeStateMessage(drpState), + }); + + const responseMessage = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, + data: FetchStateResponse.encode(response).finish(), + }); + node.networkNode.sendMessage(message.sender, responseMessage).catch((e) => { + log.error("::fetchStateHandler: Error sending message", e); + }); + } +} + +class FetchStateResponseHandler implements IHandlerStrategy { + handle({ node, message }: HandleParams) { + const { data } = message; + const fetchStateResponse = FetchStateResponse.decode(data); + if (!fetchStateResponse.drpState && !fetchStateResponse.aclState) { + log.error("::fetchStateResponseHandler: No state found"); + } + const object = node.objectStore.get(fetchStateResponse.objectId); + if (!object) { + log.error("::fetchStateResponseHandler: Object not found"); + return; + } + if (!object.acl) { + log.error("::fetchStateResponseHandler: ACL not found"); + return; + } + + const aclState = deserializeStateMessage(fetchStateResponse.aclState); + const drpState = deserializeStateMessage(fetchStateResponse.drpState); + if (fetchStateResponse.vertexHash === HashGraph.rootHash) { + const state = aclState; + object.aclStates.set(fetchStateResponse.vertexHash, state); + for (const e of state.state) { + if (object.originalObjectACL) object.originalObjectACL[e.key] = e.value; + (object.acl as ACL)[e.key] = e.value; + } + node.objectStore.put(object.id, object); + return; + } + + if (fetchStateResponse.aclState) { + object.aclStates.set(fetchStateResponse.vertexHash, aclState as DRPState); + } + if (fetchStateResponse.drpState) { + object.drpStates.set(fetchStateResponse.vertexHash, drpState as DRPState); + } + } +} + +class UpdateHandler implements IHandlerStrategy { + async handle({ node, message }: HandleParams) { + const { sender, data } = message; + const updateMessage = Update.decode(data); + const object = node.objectStore.get(updateMessage.objectId); + if (!object) { + log.error("::updateHandler: Object not found"); + return false; + } + + let verifiedVertices: Vertex[] = []; + if ((object.acl as ACL).permissionless) { + verifiedVertices = updateMessage.vertices; + } else { + verifiedVertices = await verifyACLIncomingVertices(updateMessage.vertices); + } + + const [merged, _] = object.merge(verifiedVertices); + + if (!merged) { + await node.syncObject(updateMessage.objectId, sender); + } else { + // add their signatures + object.finalityStore.addSignatures(sender, updateMessage.attestations); + + // add my signatures + const attestations = signFinalityVertices(node, object, verifiedVertices); + + if (attestations.length !== 0) { + // broadcast the attestations + const message = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, + data: AttestationUpdate.encode( + AttestationUpdate.create({ + objectId: object.id, + attestations: attestations, + }) + ).finish(), + }); + + node.networkNode.broadcastMessage(object.id, message).catch((e) => { + log.error("::updateHandler: Error broadcasting message", e); + }); + } + } + + node.objectStore.put(object.id, object); + + return true; + } +} + +class SyncHandler implements IHandlerStrategy { + async handle({ node, message, stream }: HandleParams) { + if (!stream) { + log.error("::messageSyncHandler: Stream is undefined"); + return; + } + const { sender, data } = message; + // (might send reject) <- TODO: when should we reject? + const syncMessage = Sync.decode(data); + const object = node.objectStore.get(syncMessage.objectId); + if (!object) { + log.error("::syncHandler: Object not found"); + return; + } + + await signGeneratedVertices(node, object.vertices); + + const requested: Set<Vertex> = new Set(object.vertices); + const requesting: string[] = []; + for (const h of syncMessage.vertexHashes) { + const vertex = object.vertices.find((v) => v.hash === h); + if (vertex) { + requested.delete(vertex); + } else { + requesting.push(h); + } + } + + if (requested.size === 0 && requesting.length === 0) return; + + const attestations = getAttestations(object, [...requested]); + + const messageSyncAccept = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, + // add data here + data: SyncAccept.encode( + SyncAccept.create({ + objectId: object.id, + requested: [...requested], + attestations, + requesting, + }) + ).finish(), + }); + + node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { + log.error("::syncHandler: Error sending message", e); + }); + } +} + +class SyncAcceptHandler implements IHandlerStrategy { + async handle({ node, message, stream }: HandleParams) { + if (!stream) { + log.error("::messageSyncAcceptHandler: Stream is undefined"); + return; + } + const { data, sender } = message; + const syncAcceptMessage = SyncAccept.decode(data); + const object = node.objectStore.get(syncAcceptMessage.objectId); + if (!object) { + log.error("::syncAcceptHandler: Object not found"); + return; + } + + let verifiedVertices: Vertex[] = []; + if ((object.acl as ACL).permissionless) { + verifiedVertices = syncAcceptMessage.requested; + } else { + verifiedVertices = await verifyACLIncomingVertices(syncAcceptMessage.requested); + } + + if (verifiedVertices.length !== 0) { + object.merge(verifiedVertices); + object.finalityStore.mergeSignatures(syncAcceptMessage.attestations); + node.objectStore.put(object.id, object); + } + + await signGeneratedVertices(node, object.vertices); + signFinalityVertices(node, object, object.vertices); + + // send missing vertices + const requested: Vertex[] = []; + for (const h of syncAcceptMessage.requesting) { + const vertex = object.vertices.find((v) => v.hash === h); + if (vertex) { + requested.push(vertex); + } + } + + if (requested.length === 0) return; + + const attestations = getAttestations(object, requested); + + const messageSyncAccept = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, + data: SyncAccept.encode( + SyncAccept.create({ + objectId: object.id, + requested, + attestations, + requesting: [], + }) + ).finish(), + }); + node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { + log.error("::syncAcceptHandler: Error sending message", e); + }); + } +} + +class SyncRejectHandler implements IHandlerStrategy { + async handle({ node, message }: HandleParams) { + // TODO: handle reject. Possible actions: + // - Retry sync + // - Ask sync from another peer + // - Do nothing + } +} + +class AttestationUpdateHandler implements IHandlerStrategy { + async handle({ node, message }: HandleParams) { + const attestationUpdate = AttestationUpdate.decode(message.data); + const object = node.objectStore.get(attestationUpdate.objectId); + if (!object) { + log.error("::attestationUpdateHandler: Object not found"); + return; + } + + if ((object.acl as ACL).query_isFinalitySigner(message.sender)) { + object.finalityStore.addSignatures(message.sender, attestationUpdate.attestations); + } + } +} + +const messageHandlers: Map<MessageType, IHandlerStrategy> = new Map([ + [MessageType.MESSAGE_TYPE_FETCH_STATE, new FetchStateHandler()], + [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, new FetchStateResponseHandler()], + [MessageType.MESSAGE_TYPE_UPDATE, new UpdateHandler()], + [MessageType.MESSAGE_TYPE_SYNC, new SyncHandler()], + [MessageType.MESSAGE_TYPE_SYNC_ACCEPT, new SyncAcceptHandler()], + [MessageType.MESSAGE_TYPE_SYNC_REJECT, new SyncRejectHandler()], + [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, new AttestationUpdateHandler()], +]); + /* Handler for all DRP messages, including pubsub messages and direct messages You need to setup stream xor data @@ -48,39 +327,14 @@ export async function drpMessagesHandler( return; } - switch (message.type) { - case MessageType.MESSAGE_TYPE_FETCH_STATE: - fetchStateHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE: - fetchStateResponseHandler(node, message.data); - break; - case MessageType.MESSAGE_TYPE_UPDATE: - await updateHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_SYNC: - if (!stream) { - log.error("::messageHandler: Stream is undefined"); - return; - } - await syncHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_SYNC_ACCEPT: - if (!stream) { - log.error("::messageHandler: Stream is undefined"); - return; - } - await syncAcceptHandler(node, message.sender, message.data); - break; - case MessageType.MESSAGE_TYPE_SYNC_REJECT: - syncRejectHandler(node, message.data); - break; - case MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE: - attestationUpdateHandler(node, message.sender, message.data); - break; - default: - log.error("::messageHandler: Invalid operation"); - break; + const handler = messageHandlers.get(message.type); + if (!handler) { + log.error("::messageHandler: Invalid operation"); + return; + } + const result = handler.handle({ node, message, stream }); + if (result instanceof Promise) { + await result; } } From 181ffd1b1cf64de9fa2b059678c0e5162f64b667 Mon Sep 17 00:00:00 2001 From: anhnd350309 <anhnd350309@gmail.com> Date: Thu, 6 Mar 2025 14:53:49 +0700 Subject: [PATCH 2/8] feat: update --- packages/node/src/handlers.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index b548dd6f4..789ccba31 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -109,7 +109,7 @@ class UpdateHandler implements IHandlerStrategy { const object = node.objectStore.get(updateMessage.objectId); if (!object) { log.error("::updateHandler: Object not found"); - return false; + return; } let verifiedVertices: Vertex[] = []; @@ -151,7 +151,7 @@ class UpdateHandler implements IHandlerStrategy { node.objectStore.put(object.id, object); - return true; + return; } } From 58e5311898f1f3fd19c540c52509ba70a7c1923b Mon Sep 17 00:00:00 2001 From: anhnd350309 <anhnd350309@gmail.com> Date: Thu, 6 Mar 2025 17:03:20 +0700 Subject: [PATCH 3/8] update comment --- packages/node/src/handlers.ts | 260 ++-------------------------------- 1 file changed, 14 insertions(+), 246 deletions(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 789ccba31..59b176471 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -103,6 +103,10 @@ class FetchStateResponseHandler implements IHandlerStrategy { } class UpdateHandler implements IHandlerStrategy { + /* + data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } + operations array doesn't contain the full remote operations array + */ async handle({ node, message }: HandleParams) { const { sender, data } = message; const updateMessage = Update.decode(data); @@ -156,6 +160,10 @@ class UpdateHandler implements IHandlerStrategy { } class SyncHandler implements IHandlerStrategy { + /* + data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } + operations array contain the full remote operations array + */ async handle({ node, message, stream }: HandleParams) { if (!stream) { log.error("::messageSyncHandler: Stream is undefined"); @@ -208,6 +216,10 @@ class SyncHandler implements IHandlerStrategy { } class SyncAcceptHandler implements IHandlerStrategy { + /* + data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } + operations array contain the full remote operations array + */ async handle({ node, message, stream }: HandleParams) { if (!stream) { log.error("::messageSyncAcceptHandler: Stream is undefined"); @@ -269,7 +281,8 @@ class SyncAcceptHandler implements IHandlerStrategy { } class SyncRejectHandler implements IHandlerStrategy { - async handle({ node, message }: HandleParams) { + /* data: { id: string } */ + async handle() { // TODO: handle reject. Possible actions: // - Retry sync // - Ask sync from another peer @@ -338,251 +351,6 @@ export async function drpMessagesHandler( } } -function fetchStateHandler(node: DRPNode, sender: string, data: Uint8Array): void { - const fetchState = FetchState.decode(data); - const drpObject = node.objectStore.get(fetchState.objectId); - if (!drpObject) { - log.error("::fetchStateHandler: Object not found"); - return; - } - - const aclState = drpObject.aclStates.get(fetchState.vertexHash); - const drpState = drpObject.drpStates.get(fetchState.vertexHash); - const response = FetchStateResponse.create({ - objectId: fetchState.objectId, - vertexHash: fetchState.vertexHash, - aclState: serializeStateMessage(aclState), - drpState: serializeStateMessage(drpState), - }); - - const message = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, - data: FetchStateResponse.encode(response).finish(), - }); - node.networkNode.sendMessage(sender, message).catch((e) => { - log.error("::fetchStateHandler: Error sending message", e); - }); -} - -function fetchStateResponseHandler(node: DRPNode, data: Uint8Array): void { - const fetchStateResponse = FetchStateResponse.decode(data); - if (!fetchStateResponse.drpState && !fetchStateResponse.aclState) { - log.error("::fetchStateResponseHandler: No state found"); - } - const object = node.objectStore.get(fetchStateResponse.objectId); - if (!object) { - log.error("::fetchStateResponseHandler: Object not found"); - return; - } - if (!object.acl) { - log.error("::fetchStateResponseHandler: ACL not found"); - return; - } - - const aclState = deserializeStateMessage(fetchStateResponse.aclState); - const drpState = deserializeStateMessage(fetchStateResponse.drpState); - if (fetchStateResponse.vertexHash === HashGraph.rootHash) { - const state = aclState; - object.aclStates.set(fetchStateResponse.vertexHash, state); - for (const e of state.state) { - if (object.originalObjectACL) object.originalObjectACL[e.key] = e.value; - (object.acl as ACL)[e.key] = e.value; - } - node.objectStore.put(object.id, object); - return; - } - - if (fetchStateResponse.aclState) { - object.aclStates.set(fetchStateResponse.vertexHash, aclState as DRPState); - } - if (fetchStateResponse.drpState) { - object.drpStates.set(fetchStateResponse.vertexHash, drpState as DRPState); - } -} - -function attestationUpdateHandler(node: DRPNode, sender: string, data: Uint8Array): void { - const attestationUpdate = AttestationUpdate.decode(data); - const object = node.objectStore.get(attestationUpdate.objectId); - if (!object) { - log.error("::attestationUpdateHandler: Object not found"); - return; - } - - if ((object.acl as ACL).query_isFinalitySigner(sender)) { - object.finalityStore.addSignatures(sender, attestationUpdate.attestations); - } -} - -/* - data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } - operations array doesn't contain the full remote operations array -*/ -async function updateHandler(node: DRPNode, sender: string, data: Uint8Array): Promise<boolean> { - const updateMessage = Update.decode(data); - const object = node.objectStore.get(updateMessage.objectId); - if (!object) { - log.error("::updateHandler: Object not found"); - return false; - } - - let verifiedVertices: Vertex[] = []; - if ((object.acl as ACL).permissionless) { - verifiedVertices = updateMessage.vertices; - } else { - verifiedVertices = await verifyACLIncomingVertices(updateMessage.vertices); - } - - const [merged, _] = object.merge(verifiedVertices); - - if (!merged) { - await node.syncObject(updateMessage.objectId, sender); - } else { - // add their signatures - object.finalityStore.addSignatures(sender, updateMessage.attestations); - - // add my signatures - const attestations = signFinalityVertices(node, object, verifiedVertices); - - if (attestations.length !== 0) { - // broadcast the attestations - const message = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, - data: AttestationUpdate.encode( - AttestationUpdate.create({ - objectId: object.id, - attestations: attestations, - }) - ).finish(), - }); - - node.networkNode.broadcastMessage(object.id, message).catch((e) => { - log.error("::updateHandler: Error broadcasting message", e); - }); - } - } - - node.objectStore.put(object.id, object); - - return true; -} - -/* - data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } - operations array contain the full remote operations array -*/ -async function syncHandler(node: DRPNode, sender: string, data: Uint8Array): Promise<void> { - // (might send reject) <- TODO: when should we reject? - const syncMessage = Sync.decode(data); - const object = node.objectStore.get(syncMessage.objectId); - if (!object) { - log.error("::syncHandler: Object not found"); - return; - } - - await signGeneratedVertices(node, object.vertices); - - const requested: Set<Vertex> = new Set(object.vertices); - const requesting: string[] = []; - for (const h of syncMessage.vertexHashes) { - const vertex = object.vertices.find((v) => v.hash === h); - if (vertex) { - requested.delete(vertex); - } else { - requesting.push(h); - } - } - - if (requested.size === 0 && requesting.length === 0) return; - - const attestations = getAttestations(object, [...requested]); - - const message = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, - // add data here - data: SyncAccept.encode( - SyncAccept.create({ - objectId: object.id, - requested: [...requested], - attestations, - requesting, - }) - ).finish(), - }); - - node.networkNode.sendMessage(sender, message).catch((e) => { - log.error("::syncHandler: Error sending message", e); - }); -} - -/* - data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } - operations array contain the full remote operations array -*/ -async function syncAcceptHandler(node: DRPNode, sender: string, data: Uint8Array): Promise<void> { - const syncAcceptMessage = SyncAccept.decode(data); - const object = node.objectStore.get(syncAcceptMessage.objectId); - if (!object) { - log.error("::syncAcceptHandler: Object not found"); - return; - } - - let verifiedVertices: Vertex[] = []; - if ((object.acl as ACL).permissionless) { - verifiedVertices = syncAcceptMessage.requested; - } else { - verifiedVertices = await verifyACLIncomingVertices(syncAcceptMessage.requested); - } - - if (verifiedVertices.length !== 0) { - object.merge(verifiedVertices); - object.finalityStore.mergeSignatures(syncAcceptMessage.attestations); - node.objectStore.put(object.id, object); - } - - await signGeneratedVertices(node, object.vertices); - signFinalityVertices(node, object, object.vertices); - - // send missing vertices - const requested: Vertex[] = []; - for (const h of syncAcceptMessage.requesting) { - const vertex = object.vertices.find((v) => v.hash === h); - if (vertex) { - requested.push(vertex); - } - } - - if (requested.length === 0) return; - - const attestations = getAttestations(object, requested); - - const message = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, - data: SyncAccept.encode( - SyncAccept.create({ - objectId: object.id, - requested, - attestations, - requesting: [], - }) - ).finish(), - }); - node.networkNode.sendMessage(sender, message).catch((e) => { - log.error("::syncAcceptHandler: Error sending message", e); - }); -} - -/* data: { id: string } */ -function syncRejectHandler(_node: DRPNode, _data: Uint8Array): void { - // TODO: handle reject. Possible actions: - // - Retry sync - // - Ask sync from another peer - // - Do nothing -} - export function drpObjectChangesHandler( node: DRPNode, obj: DRPObject, From 565c534018a62a5fbf782cac6aa449f2b2c0ddd6 Mon Sep 17 00:00:00 2001 From: anhnd350309 <anhnd350309@gmail.com> Date: Thu, 6 Mar 2025 17:58:16 +0700 Subject: [PATCH 4/8] update to function --- packages/node/src/handlers.ts | 533 +++++++++++++++++----------------- 1 file changed, 260 insertions(+), 273 deletions(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 59b176471..fa47c6e90 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -31,324 +31,311 @@ interface HandleParams { } interface IHandlerStrategy { - handle: (handleParams: HandleParams) => Promise<void> | void; + (handleParams: HandleParams): Promise<void> | void; } -class FetchStateHandler implements IHandlerStrategy { - handle({ node, message }: HandleParams) { - const fetchState = FetchState.decode(message.data); - const drpObject = node.objectStore.get(fetchState.objectId); - if (!drpObject) { - log.error("::fetchStateHandler: Object not found"); +/* + Handler for all DRP messages, including pubsub messages and direct messages + You need to setup stream xor data +*/ +export async function drpMessagesHandler( + node: DRPNode, + stream?: Stream, + data?: Uint8Array +): Promise<void> { + let message: Message; + try { + if (stream) { + const byteArray = await streamToUint8Array(stream); + message = Message.decode(byteArray); + } else if (data) { + message = Message.decode(data); + } else { + log.error("::messageHandler: Stream and data are undefined"); return; } + } catch (err) { + log.error("::messageHandler: Error decoding message", err); + return; + } - const aclState = drpObject.aclStates.get(fetchState.vertexHash); - const drpState = drpObject.drpStates.get(fetchState.vertexHash); - const response = FetchStateResponse.create({ - objectId: fetchState.objectId, - vertexHash: fetchState.vertexHash, - aclState: serializeStateMessage(aclState), - drpState: serializeStateMessage(drpState), - }); - - const responseMessage = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, - data: FetchStateResponse.encode(response).finish(), - }); - node.networkNode.sendMessage(message.sender, responseMessage).catch((e) => { - log.error("::fetchStateHandler: Error sending message", e); - }); + const handler = messageHandlers.get(message.type); + if (!handler) { + log.error("::messageHandler: Invalid operation"); + return; + } + const result = handler({ node, message, stream }); + if (result instanceof Promise) { + await result; } } -class FetchStateResponseHandler implements IHandlerStrategy { - handle({ node, message }: HandleParams) { - const { data } = message; - const fetchStateResponse = FetchStateResponse.decode(data); - if (!fetchStateResponse.drpState && !fetchStateResponse.aclState) { - log.error("::fetchStateResponseHandler: No state found"); - } - const object = node.objectStore.get(fetchStateResponse.objectId); - if (!object) { - log.error("::fetchStateResponseHandler: Object not found"); - return; - } - if (!object.acl) { - log.error("::fetchStateResponseHandler: ACL not found"); - return; - } - - const aclState = deserializeStateMessage(fetchStateResponse.aclState); - const drpState = deserializeStateMessage(fetchStateResponse.drpState); - if (fetchStateResponse.vertexHash === HashGraph.rootHash) { - const state = aclState; - object.aclStates.set(fetchStateResponse.vertexHash, state); - for (const e of state.state) { - if (object.originalObjectACL) object.originalObjectACL[e.key] = e.value; - (object.acl as ACL)[e.key] = e.value; - } - node.objectStore.put(object.id, object); - return; - } +const messageHandlers: Map<MessageType, IHandlerStrategy> = new Map([ + [MessageType.MESSAGE_TYPE_FETCH_STATE, fetchStateHandler], + [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, fetchStateResponseHandler], + [MessageType.MESSAGE_TYPE_UPDATE, updateHandler], + [MessageType.MESSAGE_TYPE_SYNC, syncHandler], + [MessageType.MESSAGE_TYPE_SYNC_ACCEPT, syncAcceptHandler], + [MessageType.MESSAGE_TYPE_SYNC_REJECT, syncRejectHandler], + [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, attestationUpdateHandler], +]); - if (fetchStateResponse.aclState) { - object.aclStates.set(fetchStateResponse.vertexHash, aclState as DRPState); - } - if (fetchStateResponse.drpState) { - object.drpStates.set(fetchStateResponse.vertexHash, drpState as DRPState); - } +function fetchStateHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { + const { data, sender } = message; + const fetchState = FetchState.decode(data); + const drpObject = node.objectStore.get(fetchState.objectId); + if (!drpObject) { + log.error("::fetchStateHandler: Object not found"); + return; } -} -class UpdateHandler implements IHandlerStrategy { - /* - data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } - operations array doesn't contain the full remote operations array - */ - async handle({ node, message }: HandleParams) { - const { sender, data } = message; - const updateMessage = Update.decode(data); - const object = node.objectStore.get(updateMessage.objectId); - if (!object) { - log.error("::updateHandler: Object not found"); - return; - } - - let verifiedVertices: Vertex[] = []; - if ((object.acl as ACL).permissionless) { - verifiedVertices = updateMessage.vertices; - } else { - verifiedVertices = await verifyACLIncomingVertices(updateMessage.vertices); - } + const aclState = drpObject.aclStates.get(fetchState.vertexHash); + const drpState = drpObject.drpStates.get(fetchState.vertexHash); + const response = FetchStateResponse.create({ + objectId: fetchState.objectId, + vertexHash: fetchState.vertexHash, + aclState: serializeStateMessage(aclState), + drpState: serializeStateMessage(drpState), + }); - const [merged, _] = object.merge(verifiedVertices); + const messageFetchStateResponse = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, + data: FetchStateResponse.encode(response).finish(), + }); + node.networkNode.sendMessage(sender, messageFetchStateResponse).catch((e) => { + log.error("::fetchStateHandler: Error sending message", e); + }); +} - if (!merged) { - await node.syncObject(updateMessage.objectId, sender); - } else { - // add their signatures - object.finalityStore.addSignatures(sender, updateMessage.attestations); - - // add my signatures - const attestations = signFinalityVertices(node, object, verifiedVertices); - - if (attestations.length !== 0) { - // broadcast the attestations - const message = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, - data: AttestationUpdate.encode( - AttestationUpdate.create({ - objectId: object.id, - attestations: attestations, - }) - ).finish(), - }); +function fetchStateResponseHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { + const { data } = message; + const fetchStateResponse = FetchStateResponse.decode(data); + if (!fetchStateResponse.drpState && !fetchStateResponse.aclState) { + log.error("::fetchStateResponseHandler: No state found"); + } + const object = node.objectStore.get(fetchStateResponse.objectId); + if (!object) { + log.error("::fetchStateResponseHandler: Object not found"); + return; + } + if (!object.acl) { + log.error("::fetchStateResponseHandler: ACL not found"); + return; + } - node.networkNode.broadcastMessage(object.id, message).catch((e) => { - log.error("::updateHandler: Error broadcasting message", e); - }); - } + const aclState = deserializeStateMessage(fetchStateResponse.aclState); + const drpState = deserializeStateMessage(fetchStateResponse.drpState); + if (fetchStateResponse.vertexHash === HashGraph.rootHash) { + const state = aclState; + object.aclStates.set(fetchStateResponse.vertexHash, state); + for (const e of state.state) { + if (object.originalObjectACL) object.originalObjectACL[e.key] = e.value; + (object.acl as ACL)[e.key] = e.value; } - node.objectStore.put(object.id, object); - return; } -} - -class SyncHandler implements IHandlerStrategy { - /* - data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } - operations array contain the full remote operations array - */ - async handle({ node, message, stream }: HandleParams) { - if (!stream) { - log.error("::messageSyncHandler: Stream is undefined"); - return; - } - const { sender, data } = message; - // (might send reject) <- TODO: when should we reject? - const syncMessage = Sync.decode(data); - const object = node.objectStore.get(syncMessage.objectId); - if (!object) { - log.error("::syncHandler: Object not found"); - return; - } - - await signGeneratedVertices(node, object.vertices); - - const requested: Set<Vertex> = new Set(object.vertices); - const requesting: string[] = []; - for (const h of syncMessage.vertexHashes) { - const vertex = object.vertices.find((v) => v.hash === h); - if (vertex) { - requested.delete(vertex); - } else { - requesting.push(h); - } - } - - if (requested.size === 0 && requesting.length === 0) return; - const attestations = getAttestations(object, [...requested]); + if (fetchStateResponse.aclState) { + object.aclStates.set(fetchStateResponse.vertexHash, aclState as DRPState); + } + if (fetchStateResponse.drpState) { + object.drpStates.set(fetchStateResponse.vertexHash, drpState as DRPState); + } +} - const messageSyncAccept = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, - // add data here - data: SyncAccept.encode( - SyncAccept.create({ - objectId: object.id, - requested: [...requested], - attestations, - requesting, - }) - ).finish(), - }); +function attestationUpdateHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { + const { data, sender } = message; + const attestationUpdate = AttestationUpdate.decode(data); + const object = node.objectStore.get(attestationUpdate.objectId); + if (!object) { + log.error("::attestationUpdateHandler: Object not found"); + return; + } - node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { - log.error("::syncHandler: Error sending message", e); - }); + if ((object.acl as ACL).query_isFinalitySigner(sender)) { + object.finalityStore.addSignatures(sender, attestationUpdate.attestations); } } -class SyncAcceptHandler implements IHandlerStrategy { - /* - data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } - operations array contain the full remote operations array - */ - async handle({ node, message, stream }: HandleParams) { - if (!stream) { - log.error("::messageSyncAcceptHandler: Stream is undefined"); - return; - } - const { data, sender } = message; - const syncAcceptMessage = SyncAccept.decode(data); - const object = node.objectStore.get(syncAcceptMessage.objectId); - if (!object) { - log.error("::syncAcceptHandler: Object not found"); - return; - } - - let verifiedVertices: Vertex[] = []; - if ((object.acl as ACL).permissionless) { - verifiedVertices = syncAcceptMessage.requested; - } else { - verifiedVertices = await verifyACLIncomingVertices(syncAcceptMessage.requested); - } +/* + data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } + operations array doesn't contain the full remote operations array +*/ +async function updateHandler({ node, message }: HandleParams): Promise<void> { + const { sender, data } = message; - if (verifiedVertices.length !== 0) { - object.merge(verifiedVertices); - object.finalityStore.mergeSignatures(syncAcceptMessage.attestations); - node.objectStore.put(object.id, object); - } + const updateMessage = Update.decode(data); + const object = node.objectStore.get(updateMessage.objectId); + if (!object) { + log.error("::updateHandler: Object not found"); + return; + } - await signGeneratedVertices(node, object.vertices); - signFinalityVertices(node, object, object.vertices); + let verifiedVertices: Vertex[] = []; + if ((object.acl as ACL).permissionless) { + verifiedVertices = updateMessage.vertices; + } else { + verifiedVertices = await verifyACLIncomingVertices(updateMessage.vertices); + } - // send missing vertices - const requested: Vertex[] = []; - for (const h of syncAcceptMessage.requesting) { - const vertex = object.vertices.find((v) => v.hash === h); - if (vertex) { - requested.push(vertex); - } + const [merged, _] = object.merge(verifiedVertices); + + if (!merged) { + await node.syncObject(updateMessage.objectId, sender); + } else { + // add their signatures + object.finalityStore.addSignatures(sender, updateMessage.attestations); + + // add my signatures + const attestations = signFinalityVertices(node, object, verifiedVertices); + + if (attestations.length !== 0) { + // broadcast the attestations + const message = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, + data: AttestationUpdate.encode( + AttestationUpdate.create({ + objectId: object.id, + attestations: attestations, + }) + ).finish(), + }); + + node.networkNode.broadcastMessage(object.id, message).catch((e) => { + log.error("::updateHandler: Error broadcasting message", e); + }); } - - if (requested.length === 0) return; - - const attestations = getAttestations(object, requested); - - const messageSyncAccept = Message.create({ - sender: node.networkNode.peerId, - type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, - data: SyncAccept.encode( - SyncAccept.create({ - objectId: object.id, - requested, - attestations, - requesting: [], - }) - ).finish(), - }); - node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { - log.error("::syncAcceptHandler: Error sending message", e); - }); } + + node.objectStore.put(object.id, object); } -class SyncRejectHandler implements IHandlerStrategy { - /* data: { id: string } */ - async handle() { - // TODO: handle reject. Possible actions: - // - Retry sync - // - Ask sync from another peer - // - Do nothing +/* + data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } + operations array contain the full remote operations array +*/ +async function syncHandler({ node, message, stream }: HandleParams): Promise<void> { + if (!stream) { + log.error("::syncHandler: Stream is undefined"); + return; + } + const { sender, data } = message; + // (might send reject) <- TODO: when should we reject? + const syncMessage = Sync.decode(data); + const object = node.objectStore.get(syncMessage.objectId); + if (!object) { + log.error("::syncHandler: Object not found"); + return; } -} -class AttestationUpdateHandler implements IHandlerStrategy { - async handle({ node, message }: HandleParams) { - const attestationUpdate = AttestationUpdate.decode(message.data); - const object = node.objectStore.get(attestationUpdate.objectId); - if (!object) { - log.error("::attestationUpdateHandler: Object not found"); - return; - } + await signGeneratedVertices(node, object.vertices); - if ((object.acl as ACL).query_isFinalitySigner(message.sender)) { - object.finalityStore.addSignatures(message.sender, attestationUpdate.attestations); + const requested: Set<Vertex> = new Set(object.vertices); + const requesting: string[] = []; + for (const h of syncMessage.vertexHashes) { + const vertex = object.vertices.find((v) => v.hash === h); + if (vertex) { + requested.delete(vertex); + } else { + requesting.push(h); } } -} -const messageHandlers: Map<MessageType, IHandlerStrategy> = new Map([ - [MessageType.MESSAGE_TYPE_FETCH_STATE, new FetchStateHandler()], - [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, new FetchStateResponseHandler()], - [MessageType.MESSAGE_TYPE_UPDATE, new UpdateHandler()], - [MessageType.MESSAGE_TYPE_SYNC, new SyncHandler()], - [MessageType.MESSAGE_TYPE_SYNC_ACCEPT, new SyncAcceptHandler()], - [MessageType.MESSAGE_TYPE_SYNC_REJECT, new SyncRejectHandler()], - [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, new AttestationUpdateHandler()], -]); + if (requested.size === 0 && requesting.length === 0) return; + + const attestations = getAttestations(object, [...requested]); + + const messageSyncAccept = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, + // add data here + data: SyncAccept.encode( + SyncAccept.create({ + objectId: object.id, + requested: [...requested], + attestations, + requesting, + }) + ).finish(), + }); + + node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { + log.error("::syncHandler: Error sending message", e); + }); +} /* - Handler for all DRP messages, including pubsub messages and direct messages - You need to setup stream xor data + data: { id: string, operations: {nonce: string, fn: string, args: string[] }[] } + operations array contain the full remote operations array */ -export async function drpMessagesHandler( - node: DRPNode, - stream?: Stream, - data?: Uint8Array -): Promise<void> { - let message: Message; - try { - if (stream) { - const byteArray = await streamToUint8Array(stream); - message = Message.decode(byteArray); - } else if (data) { - message = Message.decode(data); - } else { - log.error("::messageHandler: Stream and data are undefined"); - return; - } - } catch (err) { - log.error("::messageHandler: Error decoding message", err); +async function syncAcceptHandler({ node, message, stream }: HandleParams): Promise<void> { + if (!stream) { + log.error("::syncAcceptHandler: Stream is undefined"); return; } - - const handler = messageHandlers.get(message.type); - if (!handler) { - log.error("::messageHandler: Invalid operation"); + const { data, sender } = message; + const syncAcceptMessage = SyncAccept.decode(data); + const object = node.objectStore.get(syncAcceptMessage.objectId); + if (!object) { + log.error("::syncAcceptHandler: Object not found"); return; } - const result = handler.handle({ node, message, stream }); - if (result instanceof Promise) { - await result; + + let verifiedVertices: Vertex[] = []; + if ((object.acl as ACL).permissionless) { + verifiedVertices = syncAcceptMessage.requested; + } else { + verifiedVertices = await verifyACLIncomingVertices(syncAcceptMessage.requested); + } + + if (verifiedVertices.length !== 0) { + object.merge(verifiedVertices); + object.finalityStore.mergeSignatures(syncAcceptMessage.attestations); + node.objectStore.put(object.id, object); } + + await signGeneratedVertices(node, object.vertices); + signFinalityVertices(node, object, object.vertices); + + // send missing vertices + const requested: Vertex[] = []; + for (const h of syncAcceptMessage.requesting) { + const vertex = object.vertices.find((v) => v.hash === h); + if (vertex) { + requested.push(vertex); + } + } + + if (requested.length === 0) return; + + const attestations = getAttestations(object, requested); + + const messageSyncAccept = Message.create({ + sender: node.networkNode.peerId, + type: MessageType.MESSAGE_TYPE_SYNC_ACCEPT, + data: SyncAccept.encode( + SyncAccept.create({ + objectId: object.id, + requested, + attestations, + requesting: [], + }) + ).finish(), + }); + node.networkNode.sendMessage(sender, messageSyncAccept).catch((e) => { + log.error("::syncAcceptHandler: Error sending message", e); + }); +} + +/* data: { id: string } */ +function syncRejectHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { + // TODO: handle reject. Possible actions: + // - Retry sync + // - Ask sync from another peer + // - Do nothing } export function drpObjectChangesHandler( From adb6deb171f1d766212eb9259d7894e8039a67ce Mon Sep 17 00:00:00 2001 From: anhnd350309 <anhnd350309@gmail.com> Date: Thu, 6 Mar 2025 18:07:06 +0700 Subject: [PATCH 5/8] feat: update with underscore --- packages/node/src/handlers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index fa47c6e90..51d204d0c 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -331,7 +331,7 @@ async function syncAcceptHandler({ node, message, stream }: HandleParams): Promi } /* data: { id: string } */ -function syncRejectHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { +function syncRejectHandler(_handleParams: HandleParams): ReturnType<IHandlerStrategy> { // TODO: handle reject. Possible actions: // - Retry sync // - Ask sync from another peer From 59d84b546210ed932870cd7c566b1223baf6ea28 Mon Sep 17 00:00:00 2001 From: Sacha Froment <sfroment42@gmail.com> Date: Thu, 6 Mar 2025 14:37:48 +0100 Subject: [PATCH 6/8] chore: move to top Signed-off-by: Sacha Froment <sfroment42@gmail.com> --- packages/node/src/handlers.ts | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 51d204d0c..5e9d570ef 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -34,10 +34,20 @@ interface IHandlerStrategy { (handleParams: HandleParams): Promise<void> | void; } -/* - Handler for all DRP messages, including pubsub messages and direct messages - You need to setup stream xor data -*/ +const messageHandlers: Map<MessageType, IHandlerStrategy> = new Map([ + [MessageType.MESSAGE_TYPE_FETCH_STATE, fetchStateHandler], + [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, fetchStateResponseHandler], + [MessageType.MESSAGE_TYPE_UPDATE, updateHandler], + [MessageType.MESSAGE_TYPE_SYNC, syncHandler], + [MessageType.MESSAGE_TYPE_SYNC_ACCEPT, syncAcceptHandler], + [MessageType.MESSAGE_TYPE_SYNC_REJECT, syncRejectHandler], + [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, attestationUpdateHandler], +]); + +/** + * Handler for all DRP messages, including pubsub messages and direct messages + * You need to setup stream xor data + */ export async function drpMessagesHandler( node: DRPNode, stream?: Stream, @@ -70,16 +80,6 @@ export async function drpMessagesHandler( } } -const messageHandlers: Map<MessageType, IHandlerStrategy> = new Map([ - [MessageType.MESSAGE_TYPE_FETCH_STATE, fetchStateHandler], - [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, fetchStateResponseHandler], - [MessageType.MESSAGE_TYPE_UPDATE, updateHandler], - [MessageType.MESSAGE_TYPE_SYNC, syncHandler], - [MessageType.MESSAGE_TYPE_SYNC_ACCEPT, syncAcceptHandler], - [MessageType.MESSAGE_TYPE_SYNC_REJECT, syncRejectHandler], - [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, attestationUpdateHandler], -]); - function fetchStateHandler({ node, message }: HandleParams): ReturnType<IHandlerStrategy> { const { data, sender } = message; const fetchState = FetchState.decode(data); From bb8add9fca0171aec2daaf0b2e35ed7d341ad1c0 Mon Sep 17 00:00:00 2001 From: Sacha Froment <sfroment42@gmail.com> Date: Thu, 6 Mar 2025 14:42:06 +0100 Subject: [PATCH 7/8] chore: use a record Signed-off-by: Sacha Froment <sfroment42@gmail.com> --- packages/node/src/handlers.ts | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 5e9d570ef..9151a9326 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -34,15 +34,18 @@ interface IHandlerStrategy { (handleParams: HandleParams): Promise<void> | void; } -const messageHandlers: Map<MessageType, IHandlerStrategy> = new Map([ - [MessageType.MESSAGE_TYPE_FETCH_STATE, fetchStateHandler], - [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE, fetchStateResponseHandler], - [MessageType.MESSAGE_TYPE_UPDATE, updateHandler], - [MessageType.MESSAGE_TYPE_SYNC, syncHandler], - [MessageType.MESSAGE_TYPE_SYNC_ACCEPT, syncAcceptHandler], - [MessageType.MESSAGE_TYPE_SYNC_REJECT, syncRejectHandler], - [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE, attestationUpdateHandler], -]); +const messageHandlers: Record<MessageType, IHandlerStrategy> = { + [MessageType.MESSAGE_TYPE_UNSPECIFIED]: () => {}, + [MessageType.MESSAGE_TYPE_FETCH_STATE]: fetchStateHandler, + [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE]: fetchStateResponseHandler, + [MessageType.MESSAGE_TYPE_UPDATE]: updateHandler, + [MessageType.MESSAGE_TYPE_SYNC]: syncHandler, + [MessageType.MESSAGE_TYPE_SYNC_ACCEPT]: syncAcceptHandler, + [MessageType.MESSAGE_TYPE_SYNC_REJECT]: syncRejectHandler, + [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE]: attestationUpdateHandler, + [MessageType.MESSAGE_TYPE_CUSTOM]: () => {}, + [MessageType.UNRECOGNIZED]: () => {}, +}; /** * Handler for all DRP messages, including pubsub messages and direct messages @@ -69,7 +72,7 @@ export async function drpMessagesHandler( return; } - const handler = messageHandlers.get(message.type); + const handler = messageHandlers[message.type]; if (!handler) { log.error("::messageHandler: Invalid operation"); return; From 0429d541b17f1b5b193862cbc041381dd26874dc Mon Sep 17 00:00:00 2001 From: Sacha Froment <sfroment42@gmail.com> Date: Thu, 6 Mar 2025 14:46:31 +0100 Subject: [PATCH 8/8] chore: use a record Signed-off-by: Sacha Froment <sfroment42@gmail.com> --- packages/node/src/handlers.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/node/src/handlers.ts b/packages/node/src/handlers.ts index 9151a9326..24d974d68 100644 --- a/packages/node/src/handlers.ts +++ b/packages/node/src/handlers.ts @@ -34,8 +34,8 @@ interface IHandlerStrategy { (handleParams: HandleParams): Promise<void> | void; } -const messageHandlers: Record<MessageType, IHandlerStrategy> = { - [MessageType.MESSAGE_TYPE_UNSPECIFIED]: () => {}, +const messageHandlers: Record<MessageType, IHandlerStrategy | undefined> = { + [MessageType.MESSAGE_TYPE_UNSPECIFIED]: undefined, [MessageType.MESSAGE_TYPE_FETCH_STATE]: fetchStateHandler, [MessageType.MESSAGE_TYPE_FETCH_STATE_RESPONSE]: fetchStateResponseHandler, [MessageType.MESSAGE_TYPE_UPDATE]: updateHandler, @@ -43,8 +43,8 @@ const messageHandlers: Record<MessageType, IHandlerStrategy> = { [MessageType.MESSAGE_TYPE_SYNC_ACCEPT]: syncAcceptHandler, [MessageType.MESSAGE_TYPE_SYNC_REJECT]: syncRejectHandler, [MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE]: attestationUpdateHandler, - [MessageType.MESSAGE_TYPE_CUSTOM]: () => {}, - [MessageType.UNRECOGNIZED]: () => {}, + [MessageType.MESSAGE_TYPE_CUSTOM]: undefined, + [MessageType.UNRECOGNIZED]: undefined, }; /**