Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: heartbeat initial version #502

Closed
wants to merge 11 commits into from
4 changes: 4 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,9 @@ max_line_length = 120
[*.md]
max_line_length = 0

[*.json]
indent_size = 2
indent_style = space

[COMMIT_EDITMSG]
max_line_length = 0
11 changes: 10 additions & 1 deletion .prettierrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,14 @@
"trailingComma": "es5",
"bracketSpacing": true,
"arrowParens": "always",
"endOfLine": "auto"
"endOfLine": "auto",
"overrides": [
{
"files": "package.json",
"options": {
"tabWidth": 2,
"useTabs": false
}
}
]
}
117 changes: 59 additions & 58 deletions packages/network/package.json
Original file line number Diff line number Diff line change
@@ -1,61 +1,62 @@
{
"name": "@ts-drp/network",
"version": "0.8.5",
"license": "MIT",
"repository": {
"type": "git",
"url": "git+https://github.com/drp-tech/ts-drp.git"
},
"type": "module",
"types": "./dist/src/index.d.ts",
"files": [
"src",
"dist",
"!dist/test",
"!**/*.tsbuildinfo"
],
"exports": {
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
"name": "@ts-drp/network",
"version": "0.8.5",
"license": "MIT",
"repository": {
"type": "git",
"url": "git+https://github.com/drp-tech/ts-drp.git"
},
"type": "module",
"types": "./dist/src/index.d.ts",
"files": [
"src",
"dist",
"!dist/test",
"!**/*.tsbuildinfo"
],
"exports": {
".": {
"types": "./dist/src/index.d.ts",
"import": "./dist/src/index.js"
}
},
"scripts": {
"build": "tsc -b",
"clean": "rm -rf dist/ node_modules/",
"prepack": "tsc -b",
"test": "vitest",
"watch": "tsc -b -w"
},
"devDependencies": {
"race-event": "^1.3.0"
},
"dependencies": {
"@bufbuild/protobuf": "^2.0.0",
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@chainsafe/libp2p-noise": "^16.0.0",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@libp2p/autonat": "^2.0.6",
"@libp2p/bootstrap": "^11.0.6",
"@libp2p/circuit-relay-v2": "^3.1.6",
"@libp2p/crypto": "^5.0.5",
"@libp2p/dcutr": "^2.0.6",
"@libp2p/devtools-metrics": "^1.1.5",
"@libp2p/interface": "^2.1.3",
"@libp2p/identify": "^3.0.6",
"@libp2p/peer-id": "^5.0.13",
"@libp2p/ping": "2.0.11",
"@libp2p/pubsub-peer-discovery": "^11.0.0",
"@libp2p/webrtc": "^5.0.9",
"@libp2p/websockets": "^9.1.1",
"@libp2p/webtransport": "^5.0.9",
"@multiformats/multiaddr": "^12.3.1",
"@multiformats/multiaddr-matcher": "^1.6.0",
"@ts-drp/logger": "^0.8.5",
"@ts-drp/types": "^0.8.5",
"it-length-prefixed": "^10.0.0",
"it-map": "^3.1.1",
"it-pipe": "^3.0.1",
"libp2p": "^2.1.6",
"uint8arrays": "^5.1.0"
}
},
"scripts": {
"build": "tsc -b",
"clean": "rm -rf dist/ node_modules/",
"prepack": "tsc -b",
"test": "vitest",
"watch": "tsc -b -w"
},
"devDependencies": {
"race-event": "^1.3.0"
},
"dependencies": {
"@bufbuild/protobuf": "^2.0.0",
"@chainsafe/libp2p-gossipsub": "^14.1.0",
"@chainsafe/libp2p-noise": "^16.0.0",
"@chainsafe/libp2p-yamux": "^7.0.1",
"@libp2p/autonat": "^2.0.6",
"@libp2p/bootstrap": "^11.0.6",
"@libp2p/circuit-relay-v2": "^3.1.6",
"@libp2p/crypto": "^5.0.5",
"@libp2p/dcutr": "^2.0.6",
"@libp2p/devtools-metrics": "^1.1.5",
"@libp2p/interface": "^2.1.3",
"@libp2p/identify": "^3.0.6",
"@libp2p/ping": "2.0.11",
"@libp2p/pubsub-peer-discovery": "^11.0.0",
"@libp2p/webrtc": "^5.0.9",
"@libp2p/websockets": "^9.1.1",
"@libp2p/webtransport": "^5.0.9",
"@multiformats/multiaddr": "^12.3.1",
"@multiformats/multiaddr-matcher": "^1.6.0",
"@ts-drp/logger": "^0.8.5",
"@ts-drp/types": "^0.8.5",
"it-length-prefixed": "^10.0.0",
"it-map": "^3.1.1",
"it-pipe": "^3.0.1",
"libp2p": "^2.1.6",
"uint8arrays": "^5.1.0"
}
}
24 changes: 19 additions & 5 deletions packages/network/src/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import type {
Address,
EventCallback,
PeerDiscovery,
PeerId,
Stream,
StreamHandler,
} from "@libp2p/interface";
import { peerIdFromString } from "@libp2p/peer-id";
import { ping } from "@libp2p/ping";
import {
type PubSubPeerDiscoveryComponents,
Expand All @@ -32,12 +34,14 @@ import { type MultiaddrInput, multiaddr } from "@multiformats/multiaddr";
import { WebRTC } from "@multiformats/multiaddr-matcher";
import { Logger, type LoggerOptions } from "@ts-drp/logger";
import { Message } from "@ts-drp/types";
import { DRP_HEARTBEAT_TOPIC } from "@ts-drp/types";
import { type Libp2p, type ServiceFactoryMap, createLibp2p } from "libp2p";

import { uint8ArrayToStream } from "./stream.js";

export * from "./stream.js";

export const DRP_DISCOVERY_TOPIC = "drp::discovery";
export const DRP_MESSAGE_PROTOCOL = "/drp/message/0.0.1";
export const BOOTSTRAP_NODES = [
"/dns4/bootstrap1.topology.gg/tcp/443/wss/p2p/12D3KooWBu1pZ3v2u6tXSmkN35kiMLENpv3bEXcyT1GJTVhipAkG",
Expand Down Expand Up @@ -88,7 +92,7 @@ export class DRPNetworkNode {

const _peerDiscovery: Array<PeerDiscoveryFunction> = [
pubsubPeerDiscovery({
topics: ["drp::discovery"],
topics: [DRP_DISCOVERY_TOPIC],
interval: this._config?.pubsub?.peer_discovery_interval || 5000,
}),
];
Expand Down Expand Up @@ -231,7 +235,8 @@ export class DRPNetworkNode {
);

// needded as I've disabled the pubsubPeerDiscovery
this._pubsub?.subscribe("drp::discovery");
this._pubsub?.subscribe(DRP_DISCOVERY_TOPIC);
this._pubsub?.subscribe(DRP_HEARTBEAT_TOPIC);
}

async stop() {
Expand Down Expand Up @@ -317,16 +322,17 @@ export class DRPNetworkNode {
}
}

async connect(addr: MultiaddrInput) {
async connect(addr: MultiaddrInput | MultiaddrInput[]): Promise<void> {
try {
await this._node?.dial([multiaddr(addr)]);
const multiaddrs = Array.isArray(addr) ? addr.map(multiaddr) : [multiaddr(addr)];
await this._node?.dial(multiaddrs);
log.info("::connect: Successfuly dialed", addr);
} catch (e) {
log.error("::connect:", e);
}
}

async disconnect(peerId: string) {
async disconnect(peerId: string): Promise<void> {
try {
await this._node?.hangUp(multiaddr(`/p2p/${peerId}`));
log.info("::disconnect: Successfuly disconnected", peerId);
Expand Down Expand Up @@ -355,6 +361,14 @@ export class DRPNetworkNode {
return peers.map((peer) => peer.toString());
}

async getPeerMultiaddrs(peerId: PeerId | string): Promise<Address[]> {
const peerIdObj: PeerId = typeof peerId === "string" ? peerIdFromString(peerId) : peerId;

const peer = await this._node?.peerStore.get(peerIdObj);
if (!peer) return [];
return peer.addresses;
}

async broadcastMessage(topic: string, message: Message) {
try {
const messageBuffer = Message.encode(message).finish();
Expand Down
33 changes: 33 additions & 0 deletions packages/node/src/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
} from "@ts-drp/types";
import { fromString as uint8ArrayFromString } from "uint8arrays/from-string";

import { DRPIDHeartbeat } from "./heartbeat/heartbeat.js";
import { type DRPNode, log } from "./index.js";
import { deserializeStateMessage, serializeStateMessage } from "./utils.js";

Expand Down Expand Up @@ -71,12 +72,44 @@ export async function drpMessagesHandler(node: DRPNode, stream?: Stream, data?:
case MessageType.MESSAGE_TYPE_ATTESTATION_UPDATE:
await attestationUpdateHandler(node, message.sender, message.data);
break;
case MessageType.MESSAGE_TYPE_ID_HEARTBEAT_RESPONSE:
await node.heartbeat?.onReceiveHeartbeatResponse(message.sender, message.data);
break;
default:
log.error("::messageHandler: Invalid operation");
break;
}
}

export async function heartbeatHandler(node: DRPNode, stream?: Stream, receivedData?: Uint8Array) {
let message: Message;
try {
if (stream) {
const byteArray = await streamToUint8Array(stream);
message = Message.decode(byteArray);
} else if (receivedData) {
message = Message.decode(receivedData);
} else {
log.error("::heartbeatHandler: Stream and data are undefined");
return;
}
} catch (err) {
log.error("::heartbeatHandler: Error decoding message", err);
return;
}

const { sender, data } = message;

switch (message.type) {
case MessageType.MESSAGE_TYPE_ID_HEARTBEAT:
await DRPIDHeartbeat.onReceiveHeartbeat(sender, data, node.networkNode);
break;
default:
log.error("::heartbeatHandler: Invalid operation");
break;
}
}

function fetchStateHandler(node: DRPNode, sender: string, data: Uint8Array) {
const fetchState = FetchState.decode(data);
const drpObject = node.objectStore.get(fetchState.objectId);
Expand Down
Loading
Loading