Use pipeline to fetch sync publication
zjkmxy committed Jan 23, 2024
1 parent 96f7b38 commit b017893
Showing 2 changed files with 32 additions and 31 deletions.
package.json (2 changes: 1 addition & 1 deletion)
@@ -1,6 +1,6 @@
 {
   "name": "@ucla-irl/ndnts-aux",
-  "version": "1.0.7",
+  "version": "1.0.8-beta1",
   "description": "NDNts Auxiliary Package for Web and Deno",
   "scripts": {
     "test": "deno test",
src/sync-agent/deliveries.ts (61 changes: 31 additions & 30 deletions)
@@ -1,8 +1,9 @@
 import { type Endpoint } from '@ndn/endpoint';
 import { SvStateVector, SvSync, type SyncNode, type SyncUpdate } from '@ndn/sync';
-import { Data, digestSigning, Interest, Name, Signer, type Verifier } from '@ndn/packet';
+import { Data, digestSigning, Name, Signer, type Verifier } from '@ndn/packet';
 import { SequenceNum } from '@ndn/naming-convention2';
 import { Decoder, Encoder } from '@ndn/tlv';
+import { fetch as fetchSegments, TcpCubic } from '@ndn/segmented-object';
 import { getNamespace } from './namespace.ts';
 import { Storage } from '../storage/mod.ts';
 import { panic } from '../utils/panic.ts';
@@ -220,21 +221,20 @@ export class AtLeastOnceDelivery extends SyncDelivery {
     const prefix = getNamespace().baseName(update.id, this.syncPrefix);
     let lastHandled = update.loSeqNum - 1;
     // Modify NDNts's segmented object fetching pipeline to fetch sequences.
-    // See: https://github.com/zjkmxy/ndn-cc-v3/blob/2128cd8614d8116f7d9dfb17f86f71fbac258739/src/lib/backend/main.ts#L88-L106
-    for (let i = update.loSeqNum; i <= update.hiSeqNum; i++) {
-      const name = prefix.append(SequenceNum.create(i));
-      try {
-        const data = await this.endpoint.consume(
-          new Interest(
-            name,
-            Interest.MustBeFresh,
-            Interest.Lifetime(5000),
-          ),
-          {
-            verifier: this.verifier,
-            retx: 5,
-          },
-        );
+    // fetchSegments is not supposed to be working with sequence numbers, but I can abuse the convention
+    const continuation = fetchSegments(prefix, {
+      segmentNumConvention: SequenceNum,
+      retxLimit: 25,
+      lifetimeAfterRto: 1000, // The true timeout timer is the RTO
+      ca: new TcpCubic(),
+      verifier: this.verifier,
+    });
+    try {
+      for await (const data of continuation) {
+        const i = data.name.get(data.name.length - 1)?.as(SequenceNum);
+        if (i !== lastHandled + 1) {
+          throw new Error(`[FATAL] sync update error: seq=${i} is processed before seq=${lastHandled + 1}`);
+        }

         // Put into storage
         // Note: even though endpoint.consume does not give me the raw Data packet,
@@ -248,23 +248,24 @@ export class AtLeastOnceDelivery extends SyncDelivery {

         // Mark as persist
         lastHandled = i;
-      } catch (error) {
-        // TODO: Find a better way to handle this
-        console.error(`Unable to fetch or verify ${name.toString()} due to: `, error);
-        console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered');
-
-        this._syncInst?.close();
-        this._syncNode = undefined;
-
-        if (this._onReset) {
-          this._onReset();
-        } else {
-          this.reset();
-        }
-        return;
-      }
-    }
+      }
+    } catch (error) {
+      // TODO: Find a better way to handle this
+      console.error(`Unable to fetch or verify ${name.toString()} due to: `, error);
+      console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered');
+
+      this._syncInst?.close();
+      this._syncNode = undefined;
+
+      if (this._onReset) {
+        this._onReset();
+      } else {
+        this.reset();
+      }
+
+      return;
+    }

     // Putting this out of the loop makes it not exactly once:
     // If the application is down before all messages in the update is handled,
     // some may be redelivered the next time the application starts.
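For context, the core of the deliveries.ts change is that the handler no longer issues one Interest per sequence number through endpoint.consume; it drives the @ndn/segmented-object fetch pipeline and tells it to interpret the last name component with the SequenceNum convention, so retransmission and congestion control (TcpCubic) come from the pipeline. The following is a minimal sketch of that pattern, not the committed code: fetchSequence, its parameters, and the error message are hypothetical, and the fetch range is left unbounded; the option names (segmentNumConvention, retxLimit, lifetimeAfterRto, ca, verifier) are the ones used in the diff above.

import type { Name, Verifier } from '@ndn/packet';
import { SequenceNum } from '@ndn/naming-convention2';
import { fetch as fetchSegments, TcpCubic } from '@ndn/segmented-object';

// Hypothetical helper for illustration: fetch sequence-numbered publications under
// `prefix` and enforce in-order handling starting at `loSeqNum`.
// Bounding the fetch range and wiring in a concrete forwarder are omitted.
async function fetchSequence(prefix: Name, verifier: Verifier, loSeqNum: number): Promise<void> {
  let lastHandled = loSeqNum - 1;
  const continuation = fetchSegments(prefix, {
    segmentNumConvention: SequenceNum, // reuse the segment fetcher for sequence numbers
    retxLimit: 25,
    lifetimeAfterRto: 1000, // the RTO acts as the effective timeout
    ca: new TcpCubic(), // CUBIC congestion avoidance instead of a fixed retx count
    verifier,
  });
  for await (const data of continuation) {
    // Recover the sequence number from the last name component.
    const seq = data.name.get(data.name.length - 1)?.as(SequenceNum);
    if (seq === undefined || seq !== lastHandled + 1) {
      throw new Error(`out-of-order delivery: got ${seq}, expected ${lastHandled + 1}`);
    }
    lastHandled = seq;
    // ... hand the verified Data off to storage / the application here ...
  }
}

Compared with the removed loop, a fixed retx of 5 and a 5-second Interest lifetime are traded for RTO-driven retransmissions and CUBIC congestion avoidance.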
