Skip to content

Commit

Permalink
Merge pull request #94 from UCLA-IRL/sync
Browse files Browse the repository at this point in the history
Adding sync into Repo
  • Loading branch information
tianyuan129 authored Jul 20, 2024
2 parents 2dcd229 + e38ef0c commit 276aa7e
Show file tree
Hide file tree
Showing 21 changed files with 795 additions and 32 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,8 @@ venv.bak/
.leveldb/
.pytest_cache/
repo.db
NDN_Repo.egg-info/
NDN_Repo.egg-info/

# tianyuan's helper
feature.*
testrepo.*
4 changes: 2 additions & 2 deletions docs/src/specification/specification.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ Specification
Delete <delete>
Check <check>
TCP bulk insert <tcp_bulk>
Security <security>
Joining SVS Sync (Draft) <sync>
Sync Group Join <sync_join>
Sync Group Leave <sync_leave>
23 changes: 23 additions & 0 deletions docs/src/specification/sync_join.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
Sync Join
=========

The sync join protocol is used to command the repo to join a state vector sync group.

1. The repo subscribes to the topic ``/<repo_name>/sync/join``.

2. The client publishes a message to the topic ``/<repo_name>/sync/join``. The message payload is of type
``RepoCommandParam``, containing one or more ``SyncParam`` with the following fields:

* ``sync_prefix``: The name prefix of the sync group to join.
* ``register_prefix``: (Optional) The name prefix for the repo to register with the forwarder. This prefix must not
be the same as ``sync_prefix``.
* ``data_name_dedupe``: (Optional) If true, the repo will deduplicate data names in the sync group.
* ``reset``: (Optional) If true, rebuild state vectors from the stored state vectors on the repo disk. This is useful
if interests are sent for permanently unavailable data from an old vector.

3. The repo joins the sync group, saving sync information to disk.

* The repo will listen for state vector interests for the sync group. Then, to fetch any data, the repo will send
interests following the SVSPS data naming convention. For more information, see the
`specification page <https://named-data.github.io/StateVectorSync/Specification.html>`_ for State Vector
Synchronization.
17 changes: 17 additions & 0 deletions docs/src/specification/sync_leave.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
Sync Leave
==========

The sync leave protocol is used to command the repo to leave the sync group. This command also removes any information
about the sync group from repo storage.

1. The repo subscribes to the topic ``/<repo_name>/sync/leave``.

2. The client publishes a message to the topic ``/<repo_name>/sync/leave``. The message payload is of type
``RepoCommandParam``, containing one or more ``SyncParam`` with the following field:

* ``sync_prefix``: The name prefix of the sync group to leave.

3. The repo leaves the sync group, removing sync information from disk. The repo no longer listens to the originally
specified register prefix.

* Note that any already-stored data packets that were received prior to leaving the sync group are *not* deleted.
41 changes: 41 additions & 0 deletions examples/leavesync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python3
import argparse
import logging
from ndn.app import NDNApp
from ndn.encoding import Name
from ndn.security import KeychainDigest
from ndn_python_repo.clients import SyncClient
import uuid

async def run_leave_sync_client(app: NDNApp, **kwargs):
client = SyncClient(app=app, prefix=kwargs['client_prefix'], repo_name=kwargs['repo_name'])
await client.leave_sync(sync_prefix=kwargs['sync_prefix'])
app.shutdown()


def main():
parser = argparse.ArgumentParser(description='leavesync')
parser.add_argument('-r', '--repo_name',
required=True, help='Name of repo')
parser.add_argument('--client_prefix', required=True,
help='prefix of this client')
parser.add_argument('--sync_prefix', required=True,
help='The sync prefix repo should leave')
args = parser.parse_args()

logging.basicConfig(format='[%(asctime)s]%(levelname)s:%(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG)

app = NDNApp(face=None, keychain=KeychainDigest())
try:
app.run_forever(
after_start=run_leave_sync_client(app, repo_name=Name.from_str(args.repo_name),
client_prefix=Name.from_str(args.client_prefix),
sync_prefix=Name.from_str(args.sync_prefix)))
except FileNotFoundError:
print('Error: could not connect to NFD.')


if __name__ == '__main__':
main()
56 changes: 56 additions & 0 deletions examples/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python3
import argparse
import logging
from ndn.app import NDNApp
from ndn.encoding import Name
from ndn.security import KeychainDigest
from ndn_python_repo.clients import SyncClient
import uuid


async def run_sync_client(app: NDNApp, **kwargs):
client = SyncClient(app=app, prefix=kwargs['client_prefix'], repo_name=kwargs['repo_name'])
await client.join_sync(sync_prefix=kwargs['sync_prefix'], register_prefix=kwargs['register_prefix'],
data_name_dedupe=kwargs['data_name_dedupe'],
reset=kwargs['reset'])
app.shutdown()


def main():
parser = argparse.ArgumentParser(description='sync')
parser.add_argument('-r', '--repo_name',
required=True, help='Name of repo')
parser.add_argument('--client_prefix', required=True,
help='prefix of this client')
parser.add_argument('--sync_prefix', required=True,
help='The sync prefix repo should join')
parser.add_argument('--register_prefix', required=False,
help='The prefix repo should register')
parser.add_argument('--data_name_dedupe', required=False, default=False,
help='whether repo should dedupe the sync group in data naming')
parser.add_argument('--reset', required=False, default=False,
help='whether repo should reset the sync group')
args = parser.parse_args()

logging.basicConfig(format='[%(asctime)s]%(levelname)s:%(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
level=logging.DEBUG)

app = NDNApp(face=None, keychain=KeychainDigest())
register_prefix = None
if args.register_prefix:
register_prefix = Name.from_str(args.register_prefix)
try:
app.run_forever(
after_start=run_sync_client(app, repo_name=Name.from_str(args.repo_name),
client_prefix=Name.from_str(args.client_prefix),
sync_prefix=Name.from_str(args.sync_prefix),
register_prefix=register_prefix,
data_name_dedupe=args.data_name_dedupe,
reset=args.reset))
except FileNotFoundError:
print('Error: could not connect to NFD.')


if __name__ == '__main__':
main()
1 change: 1 addition & 0 deletions ndn_python_repo/clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .getfile import GetfileClient
from .putfile import PutfileClient
from .delete import DeleteClient
from .sync import SyncClient
from .command_checker import CommandChecker
85 changes: 85 additions & 0 deletions ndn_python_repo/clients/sync.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# -----------------------------------------------------------------------------
# NDN Repo putfile client.
#
# @Author [email protected]
# [email protected]
# @Date 2019-10-18
# -----------------------------------------------------------------------------

import os
import sys
sys.path.insert(1, os.path.join(sys.path[0], '..'))

import asyncio as aio
from .command_checker import CommandChecker
from ..command import RepoCommandParam, SyncParam, EmbName, RepoStatCode
from ..utils import PubSub
import logging
import multiprocessing
from ndn.app import NDNApp
from ndn.encoding import Name, NonStrictName, Component, Links
import os
import platform
from hashlib import sha256
from typing import Optional, List

class SyncClient(object):

def __init__(self, app: NDNApp, prefix: NonStrictName, repo_name: NonStrictName):
"""
A client to sync the repo.
:param app: NDNApp.
:param prefix: NonStrictName. The name of this client
:param repo_name: NonStrictName. Routable name to remote repo.
"""
self.app = app
self.prefix = prefix
self.repo_name = Name.normalize(repo_name)
self.encoded_packets = {}
self.pb = PubSub(self.app, self.prefix)
self.pb.base_prefix = self.prefix

# https://bugs.python.org/issue35219
if platform.system() == 'Darwin':
os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES'

async def join_sync(self, sync_prefix: NonStrictName, register_prefix: NonStrictName = None,
data_name_dedupe: bool = False, reset: bool = False) -> bytes:

# construct insert cmd msg
cmd_param = RepoCommandParam()
cmd_sync = SyncParam()
cmd_sync.sync_prefix = sync_prefix
cmd_sync.register_prefix = register_prefix
cmd_sync.data_name_dedupe = data_name_dedupe
cmd_sync.reset = reset

cmd_param.sync_groups = [cmd_sync]
cmd_param_bytes = bytes(cmd_param.encode())

# publish msg to repo's join topic
await self.pb.wait_for_ready()
is_success = await self.pb.publish(self.repo_name + Name.from_str('sync/join'), cmd_param_bytes)
if is_success:
logging.info('Published an join msg and was acknowledged by a subscriber')
else:
logging.info('Published an join msg but was not acknowledged by a subscriber')
return sha256(cmd_param_bytes).digest()

async def leave_sync(self, sync_prefix: NonStrictName) -> bytes:
# construct insert cmd msg
cmd_param = RepoCommandParam()
cmd_sync = SyncParam()
cmd_sync.sync_prefix = sync_prefix
cmd_param.sync_groups = [cmd_sync]
cmd_param_bytes = bytes(cmd_param.encode())

# publish msg to repo's leave topic
await self.pb.wait_for_ready()
is_success = await self.pb.publish(self.repo_name + Name.from_str('sync/leave'), cmd_param_bytes)
if is_success:
logging.info('Published an leave msg and was acknowledged by a subscriber')
else:
logging.info('Published an leave msg but was not acknowledged by a subscriber')
return sha256(cmd_param_bytes).digest()
3 changes: 2 additions & 1 deletion ndn_python_repo/cmd/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,11 @@ async def async_main(app: NDNApp, config):
pb = PubSub(app)
read_handle = ReadHandle(app, storage, config)
write_handle = WriteCommandHandle(app, storage, pb, read_handle, config)
sync_handle = SyncCommandHandle(app, storage, pb, read_handle, config)
delete_handle = DeleteCommandHandle(app, storage, pb, read_handle, config)
tcp_bulk_insert_handle = TcpBulkInsertHandle(storage, read_handle, config)

repo = Repo(app, storage, read_handle, write_handle, delete_handle, tcp_bulk_insert_handle, config)
repo = Repo(app, storage, read_handle, write_handle, delete_handle, sync_handle, tcp_bulk_insert_handle, config)
await repo.listen()


Expand Down
21 changes: 19 additions & 2 deletions ndn_python_repo/command/repo_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"""
import ndn.encoding as enc

__all__ = ['RepoTypeNumber', 'EmbName', 'ObjParam', 'RepoCommandParam', 'ObjStatus', 'RepoCommandRes',
__all__ = ['RepoTypeNumber', 'EmbName', 'ObjParam', 'SyncParam', 'SyncStatus', 'RepoCommandParam', 'ObjStatus', 'RepoCommandRes',
'RepeatedNames', 'RepoStatCode', 'RepoStatQuery']


Expand All @@ -22,7 +22,10 @@ class RepoTypeNumber:
CHECK_PREFIX = 213
OBJECT_PARAM = 301
OBJECT_RESULT = 302

SYNC_PARAM = 401
SYNC_RESULT = 402
SYNC_DATA_NAME_DEDUPE = 403
SYNC_RESET = 404

class RepoStatCode:
# 100 has not been used by previous code, but defined and documented.
Expand Down Expand Up @@ -51,9 +54,17 @@ class ObjParam(enc.TlvModel):
end_block_id = enc.UintField(RepoTypeNumber.END_BLOCK_ID)
register_prefix = enc.ModelField(RepoTypeNumber.REGISTER_PREFIX, EmbName)

class SyncParam(enc.TlvModel):
sync_prefix = enc.NameField()
register_prefix = enc.NameField()
data_name_dedupe = enc.BoolField(RepoTypeNumber.SYNC_DATA_NAME_DEDUPE)
reset = enc.BoolField(RepoTypeNumber.SYNC_RESET)
# forwarding_hint = enc.ModelField(RepoTypeNumber.FORWARDING_HINT, enc.Links)
# sync_prefix = enc.ModelField(RepoTypeNumber.REGISTER_PREFIX, EmbName)

class RepoCommandParam(enc.TlvModel):
objs = enc.RepeatedField(enc.ModelField(RepoTypeNumber.OBJECT_PARAM, ObjParam))
sync_groups = enc.RepeatedField(enc.ModelField(RepoTypeNumber.SYNC_PARAM, SyncParam))


class RepoStatQuery(enc.TlvModel):
Expand All @@ -67,9 +78,15 @@ class ObjStatus(enc.TlvModel):
delete_num = enc.UintField(RepoTypeNumber.DELETE_NUM)


class SyncStatus(enc.TlvModel):
name = enc.NameField()
status_code = enc.UintField(RepoTypeNumber.STATUS_CODE)
insert_num = enc.UintField(RepoTypeNumber.INSERT_NUM)

class RepoCommandRes(enc.TlvModel):
status_code = enc.UintField(RepoTypeNumber.STATUS_CODE)
objs = enc.RepeatedField(enc.ModelField(RepoTypeNumber.OBJECT_RESULT, ObjStatus))
sync_groups = enc.RepeatedField(enc.ModelField(RepoTypeNumber.SYNC_RESULT, ObjStatus))


class RepeatedNames(enc.TlvModel):
Expand Down
1 change: 1 addition & 0 deletions ndn_python_repo/handle/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
from .command_handle_base import CommandHandle
from .write_command_handle import WriteCommandHandle
from .delete_command_handle import DeleteCommandHandle
from .sync_command_handle import SyncCommandHandle
from .tcp_bulk_insert_handle import TcpBulkInsertHandle
Loading

0 comments on commit 276aa7e

Please sign in to comment.