Skip to content

Commit

Permalink
svs: add suffix after nodeID as demuxer
Browse files Browse the repository at this point in the history
  • Loading branch information
zjkmxy committed Dec 8, 2024
1 parent 3a44f5e commit 986fe73
Show file tree
Hide file tree
Showing 9 changed files with 164 additions and 193 deletions.
3 changes: 2 additions & 1 deletion deno.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
"build.ts"
],
"rules": {
"tags": ["recommended"],
"include": [
"deprecated"
"no-deprecated-deno-api"
]
}
},
Expand Down
4 changes: 2 additions & 2 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@ucla-irl/ndnts-aux",
"version": "4.0.1",
"version": "4.0.2",
"description": "NDNts Auxiliary Package for Web and Deno",
"scripts": {
"test": "deno test --no-check",
Expand All @@ -25,8 +25,8 @@
"eventemitter3": "^5.0.1",
"jose": "^5.9.6",
"tslib": "^2.8.1",
"type-fest": "^4.27.0",
"uuid": "^10.0.0",
"type-fest": "^4.30.0",
"uuid": "^11.0.3",
"wait-your-turn": "^1.0.1",
"y-protocols": "^1.0.6",
"yjs": "^13.6.20"
Expand Down
271 changes: 120 additions & 151 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/namespace/leaf-node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ export class LeafNode extends ExpressingPoint {
public readonly onSaveStorage = new EventChain<LeafNodeEvents['saveStorage']>();

constructor(
public readonly config: LeafNodeOpts,
public override readonly config: LeafNodeOpts,
describe?: string,
) {
super(config, describe);
Expand Down
24 changes: 12 additions & 12 deletions src/sync-agent/deliveries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -221,14 +221,14 @@ export abstract class SyncDelivery implements AsyncDisposable {
// Note: storage is not necessarily a real storage.
export class AtLeastOnceDelivery extends SyncDelivery {
constructor(
readonly nodeId: Name,
readonly fw: Forwarder,
readonly syncPrefix: Name,
readonly signer: Signer,
readonly verifier: Verifier,
override readonly nodeId: Name,
override readonly fw: Forwarder,
override readonly syncPrefix: Name,
override readonly signer: Signer,
override readonly verifier: Verifier,
readonly storage: Storage,
onUpdatePromise: Promise<UpdateEvent>,
protected state?: StateVector,
protected override state?: StateVector,
) {
super(nodeId, fw, syncPrefix, signer, verifier, onUpdatePromise, state);
}
Expand Down Expand Up @@ -408,15 +408,15 @@ export class AtLeastOnceDelivery extends SyncDelivery {
// This delivery does not persists anything.
export class LatestOnlyDelivery extends SyncDelivery {
constructor(
readonly nodeId: Name,
readonly fw: Forwarder,
readonly syncPrefix: Name,
readonly signer: Signer,
readonly verifier: Verifier,
override readonly nodeId: Name,
override readonly fw: Forwarder,
override readonly syncPrefix: Name,
override readonly signer: Signer,
override readonly verifier: Verifier,
readonly pktStorage: Storage,
readonly stateStorage: Storage,
readonly onUpdatePromise: Promise<UpdateEvent>,
protected state?: StateVector,
protected override state?: StateVector,
) {
super(nodeId, fw, syncPrefix, signer, verifier, onUpdatePromise, state);
}
Expand Down
32 changes: 16 additions & 16 deletions src/sync-agent/delivery-alo.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class DeliveryTester implements AsyncDisposable {
this.stores = Array.from({ length: svsCount }, (_, i) => {
const store = new InMemoryStorage();
this.#closers.use(store);
const responder = new Responder(name`/test/32=node/${i}`, this.fwAB, store);
const responder = new Responder(name`/test/32=node/${i}/t=0`, this.fwAB, store);
this.#closers.use(responder);
return store;
});
Expand All @@ -43,7 +43,7 @@ class DeliveryTester implements AsyncDisposable {
async start(timeoutMs: number, signer: Signer = digestSigning, verifier: Verifier = digestSigning) {
for (let i = 0; this.svsCount > i; i++) {
const alo = await AtLeastOnceDelivery.create(
name`/test/32=node/${i}`,
name`/test/32=node/${i}/t=0`,
this.fwAB,
this.syncPrefix,
signer,
Expand All @@ -65,7 +65,7 @@ class DeliveryTester implements AsyncDisposable {

async onUpdate(content: Uint8Array, name: Name, instance: SyncDelivery) {
const receiver = this.alos.findIndex((v) => v === instance);
const origin = name.at(name.length - 1).as(GenericNumber);
const origin = name.at(name.length - 2).as(GenericNumber);
const evt = { content, origin, receiver };
this.events.push(evt);
await this.updateEvent?.(evt, this);
Expand All @@ -85,7 +85,7 @@ class DeliveryTester implements AsyncDisposable {

async dispositData(id: number, seq: number, content: Uint8Array) {
const data = new Data(
name`/test/32=node/${id}/seq=${seq}`,
name`/test/32=node/${id}/t=0/seq=${seq}`,
Data.FreshnessPeriod(60000),
content,
);
Expand Down Expand Up @@ -170,7 +170,7 @@ Deno.test('Alo.2 No missing due to out-of-order', async () => {

await stopSignal1;
// For now, the state must not be set
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 0);
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}/t=0`), 0);
// But the data should be delivered
assert.assertEquals(tester.events.length, 2);

Expand All @@ -184,7 +184,7 @@ Deno.test('Alo.2 No missing due to out-of-order', async () => {
eventSet = tester.events;

// At last, the state should be updated
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 4);
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}/t=0`), 4);
}

// Since it is unordered, we have to sort
Expand Down Expand Up @@ -242,15 +242,15 @@ Deno.test('Alo.2.1 Concurrent onUpdates causing gap in the middle', async () =>

// Call onUpdate for 7-8 and 1-2
await tester.alos[0].handleSyncUpdate(
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}`), 7, 8),
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}/t=0`), 7, 8),
);
await tester.alos[0].handleSyncUpdate(
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}`), 1, 2),
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}/t=0`), 1, 2),
);

await stopSignal1;
// For now, the state must be in the middle
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 2);
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}/t=0`), 2);
// But the data should be delivered
assert.assertEquals(tester.events.length, 4);

Expand All @@ -259,32 +259,32 @@ Deno.test('Alo.2.1 Concurrent onUpdates causing gap in the middle', async () =>
await tester.dispositData(1, 5, new TextEncoder().encode('E'));
// Call onUpdate on each of them
await tester.alos[0].handleSyncUpdate(
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}`), 3, 3),
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}/t=0`), 3, 3),
);
await tester.alos[0].handleSyncUpdate(
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}`), 5, 5),
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}/t=0`), 5, 5),
);

await stopSignal2;
// For now, the state must move by 1
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 3);
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}/t=0`), 3);

// Finally make up all missing data.
await tester.dispositData(1, 4, new TextEncoder().encode('D'));
await tester.dispositData(1, 6, new TextEncoder().encode('F'));
// Call onUpdate on each of them
await tester.alos[0].handleSyncUpdate(
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}`), 4, 4),
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}/t=0`), 4, 4),
);
await tester.alos[0].handleSyncUpdate(
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}`), 6, 6),
new SyncUpdate<Name>(tester.alos[0].syncInst!.get(name`/test/32=node/${1}/t=0`), 6, 6),
);

await stopSignal3;
eventSet = tester.events;

// At last, the state should be updated
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}`), 8);
assert.assertEquals(tester.alos[0].syncState.get(name`/test/32=node/${1}/t=0`), 8);
}

// Since it is unordered, we have to sort
Expand Down Expand Up @@ -379,7 +379,7 @@ Deno.test('Alo.3 Recover after shutdown', async () => {

// Restart alo 0. It is supposed to deliver 'C' again.
tester.alos[0] = await AtLeastOnceDelivery.create(
name`/test/32=node/${0}`,
name`/test/32=node/${0}/t=0`,
tester.fwAB,
tester.syncPrefix,
digestSigning,
Expand Down
10 changes: 5 additions & 5 deletions src/sync-agent/namespace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,28 +70,28 @@ function createDefaultNamespace(): SyncAgentNamespace {
const ret = {
/**
* Extract the node ID from the key name of the signer
* @param signerName the signer's name used in KeyLocator. e.g. /ndn-app/alice/KEY/1/NA/XXX
* @param signerName the signer's name used in KeyLocator. e.g. /ndn-app/alice/t=1/KEY/1/NA/XXX
*/
nodeIdFromSigner(signerName: Name): Name {
return signerName.getPrefix(signerName.length - 4);
},
/**
* Extract the application prefix from the key name of the signer
* @param signerName the signer's name used in KeyLocator. e.g. /ndn-app/alice/KEY/1/NA/XXX
* @param signerName the signer's name used in KeyLocator. e.g. /ndn-app/alice/t=1/KEY/1/NA/XXX
*/
appPrefixFromSigner(signerName: Name): Name {
return signerName.getPrefix(signerName.length - 5);
return signerName.getPrefix(signerName.length - 6);
},
appPrefixFromNodeId(nodeId: Name): Name {
return nodeId.getPrefix(nodeId.length - 1);
return nodeId.getPrefix(nodeId.length - 2);
},
latestOnlyKey(pktName: Name): string {
// Remove the sequence number
return pktName.getPrefix(pktName.length - 1).toString();
},
baseName(nodeId: Name, syncPrefix: Name): Name {
// append is side-effect free
const groupPrefix = syncPrefix.slice(nodeId.length - 1); // nodeId.length - 1 is appPrefix
const groupPrefix = syncPrefix.slice(nodeId.length - 2); // nodeId.length - 2 is appPrefix
return nodeId.append(...groupPrefix.comps);
},
syncStateKey(baseName: Name): string {
Expand Down
5 changes: 3 additions & 2 deletions src/sync-agent/sync-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,9 @@ export class SyncAgent implements AsyncDisposable {
snapshotTopic?: string,
) {
const tempStorage = new InMemoryStorage();
// Note: we need the signer name to be /[appPrefix]/<nodeId>/KEY/<keyID>
// TODO: In future we plan to have each device of user named as /[appPrefix]/<nodeId>/<keyID>
// Note: we need the signer name to be /[appPrefix]/<nodeId/demuxer>/KEY/<keyID>
// <demuxer> is a timestamp or device ID which is used to distinguish different instances of the same user.
// It is part of the nodeID.
const appPrefix = getNamespace().appPrefixFromNodeId(nodeId);
// const nodeId = getNamespace().nodeIdFromSigner(signer.name)
const aloSyncPrefix = appPrefix.append(getNamespace().syncKeyword, getNamespace().atLeastOnceKeyword);
Expand Down

0 comments on commit 986fe73

Please sign in to comment.