diff --git a/package.json b/package.json index c9df07c..b4194b7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@ucla-irl/ndnts-aux", - "version": "1.0.7", + "version": "1.0.8-beta1", "description": "NDNts Auxiliary Package for Web and Deno", "scripts": { "test": "deno test", diff --git a/src/sync-agent/deliveries.ts b/src/sync-agent/deliveries.ts index e575e74..24d86ea 100644 --- a/src/sync-agent/deliveries.ts +++ b/src/sync-agent/deliveries.ts @@ -1,8 +1,9 @@ import { type Endpoint } from '@ndn/endpoint'; import { SvStateVector, SvSync, type SyncNode, type SyncUpdate } from '@ndn/sync'; -import { Data, digestSigning, Interest, Name, Signer, type Verifier } from '@ndn/packet'; +import { Data, digestSigning, Name, Signer, type Verifier } from '@ndn/packet'; import { SequenceNum } from '@ndn/naming-convention2'; import { Decoder, Encoder } from '@ndn/tlv'; +import { fetch as fetchSegments, TcpCubic } from '@ndn/segmented-object'; import { getNamespace } from './namespace.ts'; import { Storage } from '../storage/mod.ts'; import { panic } from '../utils/panic.ts'; @@ -220,21 +221,20 @@ export class AtLeastOnceDelivery extends SyncDelivery { const prefix = getNamespace().baseName(update.id, this.syncPrefix); let lastHandled = update.loSeqNum - 1; // Modify NDNts's segmented object fetching pipeline to fetch sequences. - // See: https://github.com/zjkmxy/ndn-cc-v3/blob/2128cd8614d8116f7d9dfb17f86f71fbac258739/src/lib/backend/main.ts#L88-L106 - for (let i = update.loSeqNum; i <= update.hiSeqNum; i++) { - const name = prefix.append(SequenceNum.create(i)); - try { - const data = await this.endpoint.consume( - new Interest( - name, - Interest.MustBeFresh, - Interest.Lifetime(5000), - ), - { - verifier: this.verifier, - retx: 5, - }, - ); + // fetchSegments is not supposed to be working with sequence numbers, but I can abuse the convention + const continuation = fetchSegments(prefix, { + segmentNumConvention: SequenceNum, + retxLimit: 25, + lifetimeAfterRto: 1000, // The true timeout timer is the RTO + ca: new TcpCubic(), + verifier: this.verifier, + }); + try { + for await (const data of continuation) { + const i = data.name.get(data.name.length - 1)?.as(SequenceNum); + if (i !== lastHandled + 1) { + throw new Error(`[FATAL] sync update error: seq=${i} is processed before seq=${lastHandled + 1}`); + } // Put into storage // Note: even though endpoint.consume does not give me the raw Data packet, @@ -248,23 +248,24 @@ export class AtLeastOnceDelivery extends SyncDelivery { // Mark as persist lastHandled = i; - } catch (error) { - // TODO: Find a better way to handle this - console.error(`Unable to fetch or verify ${name.toString()} due to: `, error); - console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered'); - - this._syncInst?.close(); - this._syncNode = undefined; - - if (this._onReset) { - this._onReset(); - } else { - this.reset(); - } + } + } catch (error) { + // TODO: Find a better way to handle this + console.error(`Unable to fetch or verify ${name.toString()} due to: `, error); + console.warn('The current SVS protocol cannot recover from this error. A reset will be triggered'); + + this._syncInst?.close(); + this._syncNode = undefined; - return; + if (this._onReset) { + this._onReset(); + } else { + this.reset(); } + + return; } + // Putting this out of the loop makes it not exactly once: // If the application is down before all messages in the update is handled, // some may be redelivered the next time the application starts.