diff --git a/.gitignore b/.gitignore index 8cbe621..e9dea97 100644 --- a/.gitignore +++ b/.gitignore @@ -114,4 +114,8 @@ venv.bak/ .leveldb/ .pytest_cache/ repo.db -NDN_Repo.egg-info/ \ No newline at end of file +NDN_Repo.egg-info/ + +# tianyuan's helper +feature.* +testrepo.* \ No newline at end of file
diff --git a/docs/src/specification/specification.rst b/docs/src/specification/specification.rst index ac6f72b..c767b2c 100644 --- a/docs/src/specification/specification.rst +++ b/docs/src/specification/specification.rst @@ -8,5 +8,5 @@ Specification Delete Check TCP bulk insert - Security - Joining SVS Sync (Draft) + Sync Group Join + Sync Group Leave
diff --git a/docs/src/specification/sync_join.rst b/docs/src/specification/sync_join.rst new file mode 100644 index 0000000..009fcb5 --- /dev/null +++ b/docs/src/specification/sync_join.rst @@ -0,0 +1,23 @@ +Sync Join +========= + +The sync join protocol is used to command the repo to join a state vector sync group. + +1. The repo subscribes to the topic ``/<repo_name>/sync/join``. + +2. The client publishes a message to the topic ``/<repo_name>/sync/join``. The message payload is of type + ``RepoCommandParam``, containing one or more ``SyncParam`` with the following fields: + + * ``sync_prefix``: The name prefix of the sync group to join. + * ``register_prefix``: (Optional) The name prefix for the repo to register with the forwarder. This prefix must not + be the same as ``sync_prefix``. + * ``data_name_dedupe``: (Optional) If true, the repo deduplicates repeated name components when constructing data names for the sync group. + * ``reset``: (Optional) If true, the repo rebuilds its state vector from the fetch progress stored on disk. This is useful + if interests are being sent for permanently unavailable data referenced by an old state vector. + +3. The repo joins the sync group, saving sync information to disk. + + * The repo will listen for state vector interests for the sync group. Then, to fetch any data, the repo will send + interests following the SVSPS data naming convention. For more information, see the + `specification page <https://named-data.github.io/StateVectorSync/Specification.html>`_ for State Vector + Synchronization. \ No newline at end of file
diff --git a/docs/src/specification/sync_leave.rst b/docs/src/specification/sync_leave.rst new file mode 100644 index 0000000..2ff8ca5 --- /dev/null +++ b/docs/src/specification/sync_leave.rst @@ -0,0 +1,17 @@ +Sync Leave +========== + +The sync leave protocol is used to command the repo to leave a sync group. This command also removes any information +about the sync group from repo storage. + +1. The repo subscribes to the topic ``/<repo_name>/sync/leave``. + +2. The client publishes a message to the topic ``/<repo_name>/sync/leave``. The message payload is of type + ``RepoCommandParam``, containing one or more ``SyncParam`` with the following field: + + * ``sync_prefix``: The name prefix of the sync group to leave. + +3. The repo leaves the sync group, removing sync information from disk. The repo no longer listens to the originally + specified register prefix. + + * Note that any already-stored data packets that were received prior to leaving the sync group are *not* deleted.
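For reference, a minimal client-side sketch of both commands using the SyncClient added later in this patch; the repo name, client prefix, and group prefixes below are hypothetical placeholders:

    from ndn.app import NDNApp
    from ndn.encoding import Name
    from ndn.security import KeychainDigest
    from ndn_python_repo.clients import SyncClient

    app = NDNApp(face=None, keychain=KeychainDigest())

    async def join_then_leave():
        client = SyncClient(app=app, prefix=Name.from_str('/example/client'),
                            repo_name=Name.from_str('/example/repo'))
        # join: repo starts listening for state vector interests on the group prefix
        await client.join_sync(sync_prefix=Name.from_str('/example/group'),
                               register_prefix=Name.from_str('/example/group/repo'))
        # leave: repo forgets the group; already-fetched data stays in storage
        await client.leave_sync(sync_prefix=Name.from_str('/example/group'))
        app.shutdown()

    if __name__ == '__main__':
        app.run_forever(after_start=join_then_leave())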
\ No newline at end of file diff --git a/examples/leavesync.py b/examples/leavesync.py new file mode 100644 index 0000000..63afab4 --- /dev/null +++ b/examples/leavesync.py @@ -0,0 +1,41 @@ +#!/usr/bin/env python3 +import argparse +import logging +from ndn.app import NDNApp +from ndn.encoding import Name +from ndn.security import KeychainDigest +from ndn_python_repo.clients import SyncClient +import uuid + +async def run_leave_sync_client(app: NDNApp, **kwargs): + client = SyncClient(app=app, prefix=kwargs['client_prefix'], repo_name=kwargs['repo_name']) + await client.leave_sync(sync_prefix=kwargs['sync_prefix']) + app.shutdown() + + +def main(): + parser = argparse.ArgumentParser(description='leavesync') + parser.add_argument('-r', '--repo_name', + required=True, help='Name of repo') + parser.add_argument('--client_prefix', required=True, + help='prefix of this client') + parser.add_argument('--sync_prefix', required=True, + help='The sync prefix repo should leave') + args = parser.parse_args() + + logging.basicConfig(format='[%(asctime)s]%(levelname)s:%(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.DEBUG) + + app = NDNApp(face=None, keychain=KeychainDigest()) + try: + app.run_forever( + after_start=run_leave_sync_client(app, repo_name=Name.from_str(args.repo_name), + client_prefix=Name.from_str(args.client_prefix), + sync_prefix=Name.from_str(args.sync_prefix))) + except FileNotFoundError: + print('Error: could not connect to NFD.') + + +if __name__ == '__main__': + main() diff --git a/examples/sync.py b/examples/sync.py new file mode 100644 index 0000000..b509169 --- /dev/null +++ b/examples/sync.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python3 +import argparse +import logging +from ndn.app import NDNApp +from ndn.encoding import Name +from ndn.security import KeychainDigest +from ndn_python_repo.clients import SyncClient +import uuid + + +async def run_sync_client(app: NDNApp, **kwargs): + client = SyncClient(app=app, prefix=kwargs['client_prefix'], repo_name=kwargs['repo_name']) + await client.join_sync(sync_prefix=kwargs['sync_prefix'], register_prefix=kwargs['register_prefix'], + data_name_dedupe=kwargs['data_name_dedupe'], + reset=kwargs['reset']) + app.shutdown() + + +def main(): + parser = argparse.ArgumentParser(description='sync') + parser.add_argument('-r', '--repo_name', + required=True, help='Name of repo') + parser.add_argument('--client_prefix', required=True, + help='prefix of this client') + parser.add_argument('--sync_prefix', required=True, + help='The sync prefix repo should join') + parser.add_argument('--register_prefix', required=False, + help='The prefix repo should register') + parser.add_argument('--data_name_dedupe', required=False, default=False, + help='whether repo should dedupe the sync group in data naming') + parser.add_argument('--reset', required=False, default=False, + help='whether repo should reset the sync group') + args = parser.parse_args() + + logging.basicConfig(format='[%(asctime)s]%(levelname)s:%(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.DEBUG) + + app = NDNApp(face=None, keychain=KeychainDigest()) + register_prefix = None + if args.register_prefix: + register_prefix = Name.from_str(args.register_prefix) + try: + app.run_forever( + after_start=run_sync_client(app, repo_name=Name.from_str(args.repo_name), + client_prefix=Name.from_str(args.client_prefix), + sync_prefix=Name.from_str(args.sync_prefix), + register_prefix=register_prefix, + data_name_dedupe=args.data_name_dedupe, + reset=args.reset)) + except 
FileNotFoundError: + print('Error: could not connect to NFD.') + + +if __name__ == '__main__': + main() diff --git a/ndn_python_repo/clients/__init__.py b/ndn_python_repo/clients/__init__.py index 20a79c9..4078900 100644 --- a/ndn_python_repo/clients/__init__.py +++ b/ndn_python_repo/clients/__init__.py @@ -1,4 +1,5 @@ from .getfile import GetfileClient from .putfile import PutfileClient from .delete import DeleteClient +from .sync import SyncClient from .command_checker import CommandChecker \ No newline at end of file diff --git a/ndn_python_repo/clients/sync.py b/ndn_python_repo/clients/sync.py new file mode 100644 index 0000000..541af18 --- /dev/null +++ b/ndn_python_repo/clients/sync.py @@ -0,0 +1,85 @@ +# ----------------------------------------------------------------------------- +# NDN Repo putfile client. +# +# @Author jonnykong@cs.ucla.edu +# susmit@cs.colostate.edu +# @Date 2019-10-18 +# ----------------------------------------------------------------------------- + +import os +import sys +sys.path.insert(1, os.path.join(sys.path[0], '..')) + +import asyncio as aio +from .command_checker import CommandChecker +from ..command import RepoCommandParam, SyncParam, EmbName, RepoStatCode +from ..utils import PubSub +import logging +import multiprocessing +from ndn.app import NDNApp +from ndn.encoding import Name, NonStrictName, Component, Links +import os +import platform +from hashlib import sha256 +from typing import Optional, List + +class SyncClient(object): + + def __init__(self, app: NDNApp, prefix: NonStrictName, repo_name: NonStrictName): + """ + A client to sync the repo. + + :param app: NDNApp. + :param prefix: NonStrictName. The name of this client + :param repo_name: NonStrictName. Routable name to remote repo. + """ + self.app = app + self.prefix = prefix + self.repo_name = Name.normalize(repo_name) + self.encoded_packets = {} + self.pb = PubSub(self.app, self.prefix) + self.pb.base_prefix = self.prefix + + # https://bugs.python.org/issue35219 + if platform.system() == 'Darwin': + os.environ['OBJC_DISABLE_INITIALIZE_FORK_SAFETY'] = 'YES' + + async def join_sync(self, sync_prefix: NonStrictName, register_prefix: NonStrictName = None, + data_name_dedupe: bool = False, reset: bool = False) -> bytes: + + # construct insert cmd msg + cmd_param = RepoCommandParam() + cmd_sync = SyncParam() + cmd_sync.sync_prefix = sync_prefix + cmd_sync.register_prefix = register_prefix + cmd_sync.data_name_dedupe = data_name_dedupe + cmd_sync.reset = reset + + cmd_param.sync_groups = [cmd_sync] + cmd_param_bytes = bytes(cmd_param.encode()) + + # publish msg to repo's join topic + await self.pb.wait_for_ready() + is_success = await self.pb.publish(self.repo_name + Name.from_str('sync/join'), cmd_param_bytes) + if is_success: + logging.info('Published an join msg and was acknowledged by a subscriber') + else: + logging.info('Published an join msg but was not acknowledged by a subscriber') + return sha256(cmd_param_bytes).digest() + + async def leave_sync(self, sync_prefix: NonStrictName) -> bytes: + # construct insert cmd msg + cmd_param = RepoCommandParam() + cmd_sync = SyncParam() + cmd_sync.sync_prefix = sync_prefix + cmd_param.sync_groups = [cmd_sync] + cmd_param_bytes = bytes(cmd_param.encode()) + + # publish msg to repo's leave topic + await self.pb.wait_for_ready() + is_success = await self.pb.publish(self.repo_name + Name.from_str('sync/leave'), cmd_param_bytes) + if is_success: + logging.info('Published an leave msg and was acknowledged by a subscriber') + else: + 
logging.info('Published an leave msg but was not acknowledged by a subscriber') + return sha256(cmd_param_bytes).digest() diff --git a/ndn_python_repo/cmd/main.py b/ndn_python_repo/cmd/main.py index 1e2963b..e9b5b17 100644 --- a/ndn_python_repo/cmd/main.py +++ b/ndn_python_repo/cmd/main.py @@ -83,10 +83,11 @@ async def async_main(app: NDNApp, config): pb = PubSub(app) read_handle = ReadHandle(app, storage, config) write_handle = WriteCommandHandle(app, storage, pb, read_handle, config) + sync_handle = SyncCommandHandle(app, storage, pb, read_handle, config) delete_handle = DeleteCommandHandle(app, storage, pb, read_handle, config) tcp_bulk_insert_handle = TcpBulkInsertHandle(storage, read_handle, config) - repo = Repo(app, storage, read_handle, write_handle, delete_handle, tcp_bulk_insert_handle, config) + repo = Repo(app, storage, read_handle, write_handle, delete_handle, sync_handle, tcp_bulk_insert_handle, config) await repo.listen() diff --git a/ndn_python_repo/command/repo_commands.py b/ndn_python_repo/command/repo_commands.py index c0e1bd8..42300c9 100644 --- a/ndn_python_repo/command/repo_commands.py +++ b/ndn_python_repo/command/repo_commands.py @@ -6,7 +6,7 @@ """ import ndn.encoding as enc -__all__ = ['RepoTypeNumber', 'EmbName', 'ObjParam', 'RepoCommandParam', 'ObjStatus', 'RepoCommandRes', +__all__ = ['RepoTypeNumber', 'EmbName', 'ObjParam', 'SyncParam', 'SyncStatus', 'RepoCommandParam', 'ObjStatus', 'RepoCommandRes', 'RepeatedNames', 'RepoStatCode', 'RepoStatQuery'] @@ -22,7 +22,10 @@ class RepoTypeNumber: CHECK_PREFIX = 213 OBJECT_PARAM = 301 OBJECT_RESULT = 302 - + SYNC_PARAM = 401 + SYNC_RESULT = 402 + SYNC_DATA_NAME_DEDUPE = 403 + SYNC_RESET = 404 class RepoStatCode: # 100 has not been used by previous code, but defined and documented. 
@@ -51,9 +54,17 @@ class ObjParam(enc.TlvModel): end_block_id = enc.UintField(RepoTypeNumber.END_BLOCK_ID) register_prefix = enc.ModelField(RepoTypeNumber.REGISTER_PREFIX, EmbName) +class SyncParam(enc.TlvModel): + sync_prefix = enc.NameField() + register_prefix = enc.NameField() + data_name_dedupe = enc.BoolField(RepoTypeNumber.SYNC_DATA_NAME_DEDUPE) + reset = enc.BoolField(RepoTypeNumber.SYNC_RESET) + # forwarding_hint = enc.ModelField(RepoTypeNumber.FORWARDING_HINT, enc.Links) + # sync_prefix = enc.ModelField(RepoTypeNumber.REGISTER_PREFIX, EmbName) class RepoCommandParam(enc.TlvModel): objs = enc.RepeatedField(enc.ModelField(RepoTypeNumber.OBJECT_PARAM, ObjParam)) + sync_groups = enc.RepeatedField(enc.ModelField(RepoTypeNumber.SYNC_PARAM, SyncParam)) class RepoStatQuery(enc.TlvModel): @@ -67,9 +78,15 @@ class ObjStatus(enc.TlvModel): delete_num = enc.UintField(RepoTypeNumber.DELETE_NUM) +class SyncStatus(enc.TlvModel): + name = enc.NameField() + status_code = enc.UintField(RepoTypeNumber.STATUS_CODE) + insert_num = enc.UintField(RepoTypeNumber.INSERT_NUM) + class RepoCommandRes(enc.TlvModel): status_code = enc.UintField(RepoTypeNumber.STATUS_CODE) objs = enc.RepeatedField(enc.ModelField(RepoTypeNumber.OBJECT_RESULT, ObjStatus)) + sync_groups = enc.RepeatedField(enc.ModelField(RepoTypeNumber.SYNC_RESULT, SyncStatus)) class RepeatedNames(enc.TlvModel):
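As a rough sketch of how a join command payload might be assembled from these TLV models (group and register prefixes are hypothetical; the import path assumes the command package re-exports these classes, as ndn_python_repo/clients/sync.py above does):

    from ndn.encoding import Name
    from ndn_python_repo.command import RepoCommandParam, SyncParam

    cmd = RepoCommandParam()
    grp = SyncParam()
    grp.sync_prefix = Name.from_str('/example/group')             # required
    grp.register_prefix = Name.from_str('/example/group-data')    # optional; must differ from sync_prefix
    grp.data_name_dedupe = False
    grp.reset = False
    cmd.sync_groups = [grp]
    wire = bytes(cmd.encode())  # this blob is what gets published to /<repo_name>/sync/join via PubSub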
diff --git a/ndn_python_repo/handle/__init__.py b/ndn_python_repo/handle/__init__.py index 7ade72c..2e70b2f 100644 --- a/ndn_python_repo/handle/__init__.py +++ b/ndn_python_repo/handle/__init__.py @@ -2,4 +2,5 @@ from .command_handle_base import CommandHandle from .write_command_handle import WriteCommandHandle from .delete_command_handle import DeleteCommandHandle +from .sync_command_handle import SyncCommandHandle from .tcp_bulk_insert_handle import TcpBulkInsertHandle \ No newline at end of file
diff --git a/ndn_python_repo/handle/command_handle_base.py b/ndn_python_repo/handle/command_handle_base.py index b10b422..3005279 100644 --- a/ndn_python_repo/handle/command_handle_base.py +++ b/ndn_python_repo/handle/command_handle_base.py @@ -1,9 +1,10 @@ import asyncio as aio import logging +import json from ndn.app import NDNApp -from ndn.encoding import Name, NonStrictName, FormalName +from ndn.encoding import Name, NonStrictName, FormalName, Component from ndn.encoding.tlv_model import DecodeError -from typing import List +from typing import List, Dict from ..command import RepoStatQuery, RepoCommandRes, RepoStatCode, RepeatedNames from ..storage import Storage @@ -128,6 +129,23 @@ def remove_name_from_set_in_storage(set_name: str, storage: Storage, name: NonSt else: return False + # Note: this overwrites any existing value stored under dict_name + @staticmethod + def add_dict_in_storage(dict_name: str, storage: Storage, dct: Dict) -> bool: + ret = storage._get(dict_name.encode('utf-8')) + dict_bytes = json.dumps(dct).encode('utf-8') + storage._put(dict_name.encode('utf-8'), dict_bytes) + return (ret is not None) + + @staticmethod + def get_dict_in_storage(dict_name: str, storage: Storage) -> Dict: + res_bytes = storage._get(dict_name.encode('utf-8')) + return json.loads(res_bytes.decode('utf-8')) + + @staticmethod + def remove_dict_in_storage(dict_name: str, storage: Storage) -> bool: + return storage._remove(dict_name.encode('utf-8')) + + # Wrapper for registered prefixes @staticmethod def add_registered_prefix_in_storage(storage: Storage, prefix): @@ -146,3 +164,38 @@ def remove_registered_prefix_in_storage(storage: Storage, prefix): if ret: logging.getLogger(__name__).info(f'Removed existing registered prefix from storage: {Name.to_str(prefix)}') return ret + + @staticmethod + def add_sync_states_in_storage(storage: Storage, sync_group: FormalName, states: Dict): + store_key = [Component.from_str('sync_states')] + sync_group + logging.info(f'Added new sync states to storage: {Name.to_str(sync_group)}') + return CommandHandle.add_dict_in_storage(Name.to_str(store_key), storage, states) + + @staticmethod + def get_sync_states_in_storage(storage: Storage, sync_group: FormalName): + store_key = [Component.from_str('sync_states')] + sync_group + return CommandHandle.get_dict_in_storage(Name.to_str(store_key), storage) + + @staticmethod + def remove_sync_states_in_storage(storage: Storage, sync_group: FormalName): + store_key = [Component.from_str('sync_states')] + sync_group + logging.info(f'Removed sync states from storage: {Name.to_str(sync_group)}') + return CommandHandle.remove_dict_in_storage(Name.to_str(store_key), storage) + + @staticmethod + def add_sync_group_in_storage(storage: Storage, sync_group: FormalName): + ret = CommandHandle.add_name_to_set_in_storage('sync_groups', storage, sync_group) + if not ret: + logging.info(f'Added new sync group to storage: {Name.to_str(sync_group)}') + return ret + + @staticmethod + def get_sync_groups_in_storage(storage: Storage): + return CommandHandle.get_name_from_set_in_storage('sync_groups', storage) + + @staticmethod + def remove_sync_group_in_storage(storage: Storage, sync_group: FormalName): + ret = CommandHandle.remove_name_from_set_in_storage('sync_groups', storage, sync_group) + if ret: + logging.info(f'Removed existing sync group from storage: {Name.to_str(sync_group)}') + return ret \ No newline at end of file
diff --git a/ndn_python_repo/handle/read_handle.py b/ndn_python_repo/handle/read_handle.py index aea8ee9..2781e39 100644 --- a/ndn_python_repo/handle/read_handle.py +++ b/ndn_python_repo/handle/read_handle.py @@ -42,6 +42,7 @@ def _on_interest(self, int_name, int_param, _app_param): Repo should not respond to any interest with MustBeFresh flag set. """ if int_param.must_be_fresh: + logging.warning(f'Repo is configured to ignore Interests with MustBeFresh flag set: {Name.to_str(int_name)}') return data_bytes = self.storage.get_data_packet(int_name, int_param.can_be_prefix) if data_bytes == None:
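As a rough illustration of how the CommandHandle helpers above persist per-group sync state (mirroring what the sync command handle below stores; the group name and field values are hypothetical, and ``storage`` is assumed to be an already-constructed Storage instance):

    from ndn.encoding import Name
    from ndn_python_repo.handle import CommandHandle

    group = Name.from_str('/example/group')
    states = {
        'fetched_dict': {},        # node id -> highest sequence number fetched so far
        'svs_client_states': {},   # encoded PassiveSvs state (local_sv + buffered sync interests)
        'data_name_dedupe': False,
        'check_status': {},
        'register_prefix': None,
    }
    CommandHandle.add_sync_group_in_storage(storage, group)           # remember that the group exists
    CommandHandle.add_sync_states_in_storage(storage, group, states)  # JSON blob keyed under 'sync_states' + group
    restored = CommandHandle.get_sync_states_in_storage(storage, group)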
diff --git a/ndn_python_repo/handle/sync_command_handle.py b/ndn_python_repo/handle/sync_command_handle.py new file mode 100644 index 0000000..9ac7ce3 --- /dev/null +++ b/ndn_python_repo/handle/sync_command_handle.py @@ -0,0 +1,252 @@ +import asyncio as aio +import logging +from ndn.app import NDNApp +from ndn.encoding import Name, NonStrictName, DecodeError, Component, parse_data +from ndn.types import InterestNack, InterestTimeout +from . import ReadHandle, CommandHandle +from ..command import RepoCommandRes, RepoCommandParam, SyncParam, SyncStatus, RepoStatCode +from ..utils import concurrent_fetcher, PubSub, PassiveSvs, IdNamingConv +from ..storage import Storage +from typing import Optional, Tuple, List, Dict +from hashlib import sha256 +from .utils import normalize_block_ids + + +class SyncCommandHandle(CommandHandle): + """ + SyncCommandHandle processes sync join and leave commands, and fetches the corresponding data to + store into the database. + TODO: Add validator + """ + def __init__(self, app: NDNApp, storage: Storage, pb: PubSub, read_handle: ReadHandle, + config: dict): + """ + The sync handle needs to keep a reference to the read handle to register new prefixes. + + :param app: NDNApp. + :param storage: Storage. + :param pb: PubSub. + :param read_handle: ReadHandle. This param is necessary, because SyncCommandHandle needs to + call ReadHandle.listen() to register new prefixes. + :param config: dict. The repo configuration. + """ + super(SyncCommandHandle, self).__init__(app, storage, pb, config) + self.m_read_handle = read_handle + self.prefix = None + self.register_root = config['repo_config']['register_root'] + # sync specific states + self.states_on_disk = {} + # runtime states + self.running_svs = {} + self.running_fetcher = {} + + async def listen(self, prefix: NonStrictName): + """ + Register routes for command interests. + This function needs to be called explicitly after initialization. + + :param prefix: NonStrictName. The name prefix to listen on. + """ + self.prefix = Name.normalize(prefix) + + # subscribe to sync messages + self.pb.subscribe(self.prefix + Name.from_str('sync/join'), self._on_sync_msg) + + # subscribe to leave messages + self.pb.subscribe(self.prefix + Name.from_str('sync/leave'), self._on_leave_msg) + + def recover_from_states(self, states: Dict): + self.states_on_disk = states + # recover sync + for sync_group, group_states in self.states_on_disk.items(): + new_svs = PassiveSvs(sync_group, lambda svs: self.fetch_missing_data(svs)) + new_svs.decode_from_states(group_states['svs_client_states']) + logging.info(f'Recover sync for {Name.to_str(sync_group)}') + group_fetched_dict = group_states['fetched_dict'] + logging.info(f'Sync progress: {group_fetched_dict}') + new_svs.start(self.app) + self.running_svs[Name.to_str(sync_group)] = new_svs + + def _on_sync_msg(self, msg): + try: + cmd_param = RepoCommandParam.parse(msg) + request_no = sha256(bytes(msg)).digest() + if not cmd_param.sync_groups: + raise DecodeError('Missing sync groups') + for group in cmd_param.sync_groups: + if not group.sync_prefix: + raise DecodeError('Missing name for one or more sync groups') + if group.register_prefix: + if Name.to_str(group.sync_prefix) == Name.to_str(group.register_prefix): + raise DecodeError('Sync prefix and register prefix cannot be the same') + except (DecodeError, IndexError) as exc: + logging.warning(f'Parameter interest blob decoding failed w/ exception: {exc}') + return + aio.create_task(self._process_sync(cmd_param, request_no)) + + def _on_leave_msg(self, msg): + try: + cmd_param = RepoCommandParam.parse(msg) + request_no = sha256(bytes(msg)).digest() + if not cmd_param.sync_groups: + raise DecodeError('Missing sync groups') + for group in cmd_param.sync_groups: + if not group.sync_prefix: + raise DecodeError('Missing name for one or more sync groups') + except (DecodeError, IndexError) as exc: + logging.warning(f'Parameter interest blob decoding failed w/ exception: {exc}') + return + aio.create_task(self._process_leave(cmd_param, request_no)) + + async def _process_sync(self, cmd_param: RepoCommandParam, request_no: bytes): + """ + Process a sync join command. + Cache an in-progress status response for the client immediately, then start the sync process.
+ """ + groups = cmd_param.sync_groups + logging.info(f'Recved sync command: {request_no.hex()}') + + # Cached status response + # Note: no coroutine switching here, so no multithread conflicts + def _init_sync_stat(param: SyncParam) -> SyncStatus: + ret = SyncStatus() + ret.name = param.sync_prefix + ret.status_code = RepoStatCode.ROGER + ret.insert_num = 0 + return ret + + # Note: stat is hold by reference + stat = RepoCommandRes() + stat.status_code = RepoStatCode.IN_PROGRESS + stat.sync_groups = [_init_sync_stat(group) for group in groups] + self.m_processes[request_no] = stat + # start sync + for idx, group in enumerate(groups): + # check duplicate + sync_prefix = Name.to_str(group.sync_prefix) + if sync_prefix in self.states_on_disk: + # if asking for reset + if group.reset: + if sync_prefix in self.running_fetcher: + for task in self.running_fetcher.pop(sync_prefix): + task.cancel() + if sync_prefix in self.running_svs: + svs = self.running_svs[sync_prefix] + # rebuild state vectors from actual storage + group_states = self.states_on_disk[sync_prefix] + svs.local_sv = group_states['fetched_dict'] + logging.info(f'Rebuild state vectors to: {svs.local_sv}') + svs.inst_buffer = {} + group_states['svs_client_states'] = svs.encode_into_states() + CommandHandle.add_sync_states_in_storage(self.storage, group.sync_prefix, + self.states_on_disk[sync_prefix]) + logging.info(f'Reset sync for: {sync_prefix}') + continue + else: + logging.info(f'Duplicate sync for: {sync_prefix}') + continue + new_svs = PassiveSvs(group.sync_prefix, lambda svs: self.fetch_missing_data(svs)) + new_svs.start(self.app) + self.running_svs[sync_prefix] = new_svs + # write states + self.states_on_disk[Name.to_str(group.sync_prefix)] = {} + new_states = self.states_on_disk[Name.to_str(group.sync_prefix)] + new_states['fetched_dict'] = {} + new_states['svs_client_states'] = {} + new_states['data_name_dedupe'] = group.data_name_dedupe + new_states['check_status'] = {} + # Remember the prefixes to register + if group.register_prefix: + new_states['register_prefix'] = Name.to_str(group.register_prefix) + is_existing = CommandHandle.add_registered_prefix_in_storage(self.storage, group.register_prefix) + # If repo does not register root prefix, the client tells repo what to register + if not self.register_root and not is_existing: + self.m_read_handle.listen(group.register_prefix) + else: + new_states['register_prefix'] = None + CommandHandle.add_sync_group_in_storage(self.storage, group.sync_prefix) + new_states['svs_client_states'] = new_svs.encode_into_states() + CommandHandle.add_sync_states_in_storage(self.storage, group.sync_prefix, new_states) + + async def _process_leave(self, cmd_param: RepoCommandParam, request_no: bytes): + groups = cmd_param.sync_groups + logging.info(f'Recved leave command: {request_no.hex()}') + + for idx, group in enumerate(groups): + sync_prefix = Name.to_str(group.sync_prefix) + if sync_prefix in self.states_on_disk: + states = self.states_on_disk[sync_prefix] + logging.info(f'Leaving sync for: {sync_prefix}') + if sync_prefix in self.running_fetcher: + for task in self.running_fetcher.pop(sync_prefix): + task.cancel() + + if sync_prefix in self.running_svs: + svs = self.running_svs.pop(sync_prefix) + await svs.stop() + + # Unregister prefix + if states['register_prefix']: + register_prefix = Name.from_str(states['register_prefix']) + CommandHandle.remove_registered_prefix_in_storage(self.storage, Name.from_str(states['register_prefix'])) + if not self.register_root: + 
self.m_read_handle.unlisten(register_prefix) + + CommandHandle.remove_sync_states_in_storage(self.storage, group.sync_prefix) + CommandHandle.remove_sync_group_in_storage(self.storage, group.sync_prefix) + + self.states_on_disk.pop(sync_prefix) + else: + logging.info(f'Leaving sync group that does not exist: {sync_prefix}') + + def fetch_missing_data(self, svs: PassiveSvs): + if not svs.running: + return + + local_sv = svs.local_sv.copy() + for node_id, seq in local_sv.items(): + task = aio.create_task(self.node_fetcher(svs, node_id, seq)) + self.running_fetcher[Name.to_str(svs.base_prefix)] = task + + # this deals with specific producer. this function is blocking, until receiving all + # data (segments) from the producer + async def node_fetcher(self, svs, node_id, seq): + group_states = self.states_on_disk[Name.to_str(svs.base_prefix)] + group_fetched_dict = group_states['fetched_dict'] + group_data_name_dedupe = group_states['data_name_dedupe'] + fetched_seq = group_fetched_dict.get(node_id, 0) + node_name = Name.from_str(node_id) + svs.base_prefix + if group_data_name_dedupe: + data_prefix = [i for n, i in enumerate(node_name) if i not in node_name[:n]] + else: + data_prefix = node_name + # I do not treat fetching failure as hard failure + if fetched_seq < seq: + async for (data_name, _, data_content, data_bytes) in ( + concurrent_fetcher(self.app, data_prefix, + start_id=fetched_seq+1, end_id=seq, + semaphore=aio.Semaphore(10), + name_conv = IdNamingConv.SEQUENCE, + max_retries = -1)): + # put into storage asap + self.storage.put_data_packet(data_name, data_bytes) + # not very sure the side effect + group_fetched_dict[node_id] = Component.to_number(data_name[-1]) + logging.info(f'Sync progress: {group_fetched_dict}') + group_states['svs_client_states'] = svs.encode_into_states() + CommandHandle.add_sync_states_in_storage(self.storage, svs.base_prefix, group_states) + ''' + Python-repo specific logic: if the inner data content contains a data name, + assuming the data object pointed by is segmented, and fetching all + data segments related to this object name + ''' + try: + _, _, inner_data_content, _ = parse_data(data_content) + obj_pointer = Name.from_bytes(inner_data_content) + except (TypeError, IndexError, ValueError): + logging.debug(f'Data does not include an object pointer, skip') + continue + logging.info(f'Discovered a pointer, fetching data segments for {Name.to_str(obj_pointer)}') + async for (data_name, _, _, data_bytes) in ( + concurrent_fetcher(self.app, obj_pointer, + start_id=0, end_id=None, semaphore=aio.Semaphore(10), + max_retries = -1)): + self.storage.put_data_packet(data_name, data_bytes) diff --git a/ndn_python_repo/ndn-python-repo.conf.sample b/ndn_python_repo/ndn-python-repo.conf.sample index 44b159c..8fb59ee 100644 --- a/ndn_python_repo/ndn-python-repo.conf.sample +++ b/ndn_python_repo/ndn-python-repo.conf.sample @@ -4,8 +4,7 @@ repo_config: repo_name: 'testrepo' # if true, the repo registers the root prefix. 
If false, client needs to tell repo # which prefix to register/unregister - register_root: True - + register_root: False db_config: # choose one among sqlite3, leveldb, and mongodb diff --git a/ndn_python_repo/repo.py b/ndn_python_repo/repo.py index 2eac7ba..9e7f1a3 100644 --- a/ndn_python_repo/repo.py +++ b/ndn_python_repo/repo.py @@ -9,7 +9,7 @@ class Repo(object): def __init__(self, app: NDNApp, storage: Storage, read_handle: ReadHandle, write_handle: WriteCommandHandle, delete_handle: DeleteCommandHandle, - tcp_bulk_insert_handle: TcpBulkInsertHandle, config: dict): + sync_handle: SyncCommandHandle, tcp_bulk_insert_handle: TcpBulkInsertHandle, config: dict): """ An NDN repo instance. """ @@ -19,6 +19,7 @@ def __init__(self, app: NDNApp, storage: Storage, read_handle: ReadHandle, self.write_handle = write_handle self.read_handle = read_handle self.delete_handle = delete_handle + self.sync_handle = sync_handle self.tcp_bulk_insert_handle = tcp_bulk_insert_handle self.running = True @@ -35,18 +36,29 @@ async def listen(self): # Recover registered prefix to enable hot restart if not self.register_root: self.recover_registered_prefixes() + self.recover_sync_states() # Init PubSub self.write_handle.pb.set_publisher_prefix(self.prefix) self.write_handle.pb.set_base_prefix(self.prefix) self.delete_handle.pb.set_base_prefix(self.prefix) + self.sync_handle.pb.set_base_prefix(self.prefix) await self.write_handle.pb.wait_for_ready() await self.write_handle.listen(self.prefix) await self.delete_handle.listen(self.prefix) + await self.sync_handle.listen(self.prefix) def recover_registered_prefixes(self): prefixes = self.write_handle.get_registered_prefix_in_storage(self.storage) for prefix in prefixes: self.logger.info(f'Existing Prefix Found: {Name.to_str(prefix)}') self.read_handle.listen(prefix) + + def recover_sync_states(self): + states = {} + groups = self.sync_handle.get_sync_groups_in_storage(self.storage) + for group in groups: + group_states = self.sync_handle.get_sync_states_in_storage(self.storage, group) + states[Name.to_str(group)] = group_states + self.sync_handle.recover_from_states(states) \ No newline at end of file diff --git a/ndn_python_repo/utils/__init__.py b/ndn_python_repo/utils/__init__.py index 185a852..ea13f04 100644 --- a/ndn_python_repo/utils/__init__.py +++ b/ndn_python_repo/utils/__init__.py @@ -1,2 +1,3 @@ -from .concurrent_fetcher import concurrent_fetcher -from .pubsub import PubSub \ No newline at end of file +from .concurrent_fetcher import concurrent_fetcher, IdNamingConv +from .pubsub import PubSub +from .passive_svs import PassiveSvs \ No newline at end of file diff --git a/ndn_python_repo/utils/concurrent_fetcher.py b/ndn_python_repo/utils/concurrent_fetcher.py index c98f3e2..5f328f2 100644 --- a/ndn_python_repo/utils/concurrent_fetcher.py +++ b/ndn_python_repo/utils/concurrent_fetcher.py @@ -1,33 +1,43 @@ # ----------------------------------------------------------------------------- # Concurrent segment fetcher. 
# -# @Author jonnykong@cs.ucla.edu -# @Date 2019-10-15 +# @Author jonnykong@cs.ucla.edu tianyuan@cs.ucla.edu +# @Date 2024-05-24 # ----------------------------------------------------------------------------- import asyncio as aio import logging from ndn.app import NDNApp -from ndn.types import InterestNack, InterestTimeout +from ndn.types import InterestNack, InterestTimeout, InterestCanceled from ndn.encoding import Name, NonStrictName, Component from typing import Optional +class IdNamingConv: + SEGMENT = 1 + SEQUENCE = 2 + NUMBER = 3 -async def concurrent_fetcher(app: NDNApp, name: NonStrictName, start_block_id: int, - end_block_id: Optional[int], semaphore: aio.Semaphore, **kwargs): +async def concurrent_fetcher(app: NDNApp, name: NonStrictName, start_id: int, + end_id: Optional[int], semaphore: aio.Semaphore, **kwargs): """ - An async-generator to fetch data packets between "`name`/`start_block_id`" and "`name`/`end_block_id`"\ + An async-generator to fetch data packets between "`name`/`start_id`" and "`name`/`end_id`"\ concurrently. :param app: NDNApp. :param name: NonStrictName. Name prefix of Data. - :param start_block_id: int. The start segment number. - :param end_block_id: Optional[int]. The end segment number. If not specified, continue fetching\ + :param start_id: int. The start number. + :param end_id: Optional[int]. The end segment number. If not specified, continue fetching\ until an interest receives timeout or nack or 3 times. :return: Yield ``(FormalName, MetaInfo, Content, RawPacket)`` tuples in order. """ - cur_id = start_block_id - final_id = end_block_id if end_block_id is not None else 0x7fffffff + name_conv = IdNamingConv.SEGMENT + max_retries = 3 + if 'name_conv' in kwargs: + name_conv = kwargs['name_conv'] + if 'max_retries' in kwargs: + max_retries = kwargs['max_retries'] + cur_id = start_id + final_id = end_id if end_id is not None else 0x7fffffff is_failed = False tasks = [] recv_window = cur_id - 1 @@ -42,12 +52,20 @@ async def _retry(seq: int): :param seq: block_id of data """ nonlocal app, name, semaphore, is_failed, received_or_fail, final_id - int_name = name + [Component.from_segment(seq)] - + if name_conv == IdNamingConv.SEGMENT: + int_name = name + [Component.from_segment(seq)] + elif name_conv == IdNamingConv.SEQUENCE: + int_name = name + [Component.from_sequence_num(seq)] + elif name_conv == IdNamingConv.NUMBER: + int_name = name + [Component.from_number(seq)] + else: + logging.error('Unrecognized naming convetion') + return trial_times = 0 while True: trial_times += 1 - if trial_times > 3: + # always retry when max_retries is -1 + if max_retries >= 0 and trial_times > max_retries: semaphore.release() is_failed = True received_or_fail.set() @@ -58,15 +76,34 @@ async def _retry(seq: int): int_name, need_raw_packet=True, can_be_prefix=False, lifetime=1000, **kwargs) # Save data and update final_id - logger.info('Received data: {}'.format(Name.to_str(data_name))) - seq_to_data_packet[seq] = (data_name, meta_info, content, data_bytes) - if meta_info is not None and meta_info.final_block_id is not None: + logging.info('Received data: {}'.format(Name.to_str(data_name))) + if name_conv == IdNamingConv.SEGMENT and \ + meta_info is not None and \ + meta_info.final_block_id is not None: + # we need to change final block id before yielding packets, + # preventing window moving beyond the final block id final_id = Component.to_number(meta_info.final_block_id) + + # cancel the Interests for non-existing data + for task in aio.all_tasks(): + task_name = 
task.get_name() + task_num = None + try: + task_num = int(task_name) + except: continue + if task_num and task_num > final_id \ + and task in tasks: + tasks.remove(task) + task.cancel() + seq_to_data_packet[seq] = (data_name, meta_info, content, data_bytes) break except InterestNack as e: - logger.info(f'Nacked with reason={e.reason}') + logging.info(f'Interest {Name.to_str(int_name)} nacked with reason={e.reason}') except InterestTimeout: - logger.info(f'Timeout') + logging.info(f'Interest {Name.to_str(int_name)} timeout') + except InterestCanceled as e: + logging.info(f'Interest {Name.to_str(int_name)} (might legally) cancelled') + return semaphore.release() received_or_fail.set() @@ -77,11 +114,19 @@ async def _dispatch_tasks(): nonlocal semaphore, tasks, cur_id, final_id, is_failed while cur_id <= final_id: await semaphore.acquire() + # in case final_id has been updated while waiting semaphore + # typically happened after the first round trip when we update + # the actual final_id with the final block id obtained from data. + if cur_id > final_id: + # giving back the semaphore + semaphore.release() + break if is_failed: received_or_fail.set() semaphore.release() break task = aio.get_event_loop().create_task(_retry(cur_id)) + task.set_name(cur_id) tasks.append(task) cur_id += 1 diff --git a/ndn_python_repo/utils/passive_svs.py b/ndn_python_repo/utils/passive_svs.py new file mode 100644 index 0000000..2cd34b7 --- /dev/null +++ b/ndn_python_repo/utils/passive_svs.py @@ -0,0 +1,153 @@ +# ----------------------------------------------------------------------------- +# Passive SVS Listener. +# +# @Author tianyuan@cs.ucla.edu +# @Date 2024-03-29 +# ----------------------------------------------------------------------------- + +import logging +from base64 import b64decode, b64encode +from typing import Dict, Callable +from ndn.app import NDNApp +from ndn.app_support.svs import StateVecWrapper, SvsState +from ndn.encoding import Name, NonStrictName, DecodeError, FormalName, BinaryStr, InterestParam, parse_interest, \ + parse_tl_num, UintField +from ndn.encoding.ndn_format_0_3 import TypeNumber +from ndn.utils import gen_nonce + +OnMissingDataFunc = Callable[["PassiveSvs"], None] +r""" +Called when there is a missing event. +MUST BE NON-BLOCKING. Therefore, it is not allowed to fetch the missing data in this callback. +It can start a task or trigger a signal to fetch missing data. 
+""" + + +class PassiveSvs: + base_prefix: FormalName + on_missing_data: OnMissingDataFunc + + local_sv: dict[bytes, int] + state: SvsState + running: bool + ndn_app: NDNApp | None + + def __init__(self, base_prefix: NonStrictName, + on_missing_data: OnMissingDataFunc): + self.base_prefix = Name.normalize(base_prefix) + self.on_missing_data = on_missing_data + self.local_sv = {} + self.inst_buffer = {} + self.state = SvsState.SyncSteady + self.running = False + self.ndn_app = None + self.logger = logging.getLogger(__name__) + + def encode_into_states(self): + states = {} + inst_buffer_enc = {} + for nid, inst in self.inst_buffer.items(): + inst_buffer_enc[nid] = b64encode(inst).decode('utf-8') + states['local_sv'] = self.local_sv + states['inst_buffer'] = inst_buffer_enc + return states + + def decode_from_states(self, states: Dict): + inst_buffer_dec = {} + for nid, inst in states['inst_buffer'].items(): + inst_buffer_dec[nid] = b64decode(inst) + self.local_sv = states['local_sv'] + self.inst_buffer = inst_buffer_dec + + def send_interest(self, interest_wire): + final_name, interest_param, _, _ = parse_interest(interest_wire) + interest_param.nonce = gen_nonce() + # a bit hack, refresh the nonce and re-encode + wire_ptr = 0 + while wire_ptr + 5 < len(interest_wire): + typ, typ_len = parse_tl_num(interest_wire[wire_ptr:], 0) + size, siz_len = parse_tl_num(interest_wire[wire_ptr:], typ_len) + if typ != TypeNumber.NONCE or typ_len != 1 or \ + size != 4 or siz_len != 1: + wire_ptr += 1 + continue + else: + # that's it! + wire = bytearray(interest_wire) + nonce = UintField(TypeNumber.NONCE, fixed_len=4) + markers = {} + nonce.encoded_length(interest_param.nonce, markers) + nonce.encode_into(interest_param.nonce, markers, wire, wire_ptr) + break + logging.info(f'Sending buffered interest: {Name.to_str(final_name)}') + # do not await for this + self.ndn_app.express_raw_interest(final_name, interest_param, wire) + + def sync_handler(self, name: FormalName, _param: InterestParam, _app_param: BinaryStr | None, + raw_packet: BinaryStr) -> None: + if len(name) != len(self.base_prefix) + 2: + logging.error(f'Received invalid Sync Interest: {Name.to_str(name)}') + return + logging.info(f'Received Sync Interest: {Name.to_str(name)}') + try: + remote_sv_pkt = StateVecWrapper.parse(name[-2]).val + except (DecodeError, IndexError) as e: + logging.error(f'Unable to decode state vector [{Name.to_str(name)}]: {e}') + return + if remote_sv_pkt is None or not remote_sv_pkt.entries: + return + remote_sv = remote_sv_pkt.entries + + # No lock is needed since we do not await + # Compare state vectors + rsv_dict = {} + for rsv in remote_sv: + if not rsv.node_id: + continue + rsv_id = Name.to_str(rsv.node_id) + rsv_seq = rsv.seq_no + rsv_dict[rsv_id] = rsv_seq + + need_fetch = False + for rsv_id, rsv_seq in rsv_dict.items(): + already_sent = [] + lsv_seq = self.local_sv.get(rsv_id, 0) + if lsv_seq < rsv_seq: + # Remote is latest + need_fetch = True + self.local_sv[rsv_id] = rsv_seq + self.logger.debug(f'Missing data for: [{Name.to_str(rsv_id)}]: {lsv_seq} < {rsv_seq}') + self.inst_buffer[rsv_id] = raw_packet + elif lsv_seq > rsv_seq: + # Local is latest + self.logger.debug(f'Outdated remote on: [{Name.to_str(rsv_id)}]: {rsv_seq} < {lsv_seq}') + raw_inst = self.inst_buffer[rsv_id] + if raw_inst not in already_sent: + already_sent.append(raw_inst) + self.send_interest(raw_inst) + else: pass + + # Notify remote there are mising nodes + diff = self.local_sv.keys() - rsv_dict.keys() + if len(diff) > 0: + 
self.logger.info(f'Remote missing nodes: {list(diff)}') + # Missing nodes may only exist in other nodes' sync interest, + # therefore we have to send all buffered sync interest out + for _, raw_inst in self.inst_buffer.items(): + self.send_interest(raw_inst) + if need_fetch: + self.on_missing_data(self) + + def start(self, ndn_app: NDNApp): + if self.running: + raise RuntimeError(f'Sync is already running @[{Name.to_str(self.base_prefix)}]') + self.running = True + self.ndn_app = ndn_app + self.ndn_app.route(self.base_prefix, need_raw_packet=True)(self.sync_handler) + + async def stop(self): + if not self.running: + return + self.running = False + await self.ndn_app.unregister(self.base_prefix) + self.logger.info("Passive SVS stopped.") diff --git a/ndn_python_repo/utils/pubsub.py b/ndn_python_repo/utils/pubsub.py index 55b1692..cd64635 100644 --- a/ndn_python_repo/utils/pubsub.py +++ b/ndn_python_repo/utils/pubsub.py @@ -89,7 +89,7 @@ async def wait_for_ready(self): if self.base_prefix != None and Name.is_prefix(self.base_prefix, self.publisher_prefix + ['msg']): self.app.set_interest_filter(self.publisher_prefix + ['msg'], self._on_msg_interest) else: - await self.app.register(self.publisher_prefix + ['msg'], self._on_msg_interest) + await self.app.register(self.publisher_prefix + ['msg'], self._on_msg_interest) except ValueError as esc: # duplicate registration pass diff --git a/pyproject.toml b/pyproject.toml index 00d6228..061e54c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,6 +5,7 @@ description = "An NDN Repo implementation using Python" authors = ["Zhaoning Kong "] maintainers = [ "Xinyu Ma ", + "Tianyuan Yu ", ] license = "Apache-2.0" readme = "README.rst"
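To show how the new utilities fit together, a minimal sketch modeled on sync_command_handle.py above; the group prefix is hypothetical and storage handling is elided:

    import asyncio as aio
    from ndn.app import NDNApp
    from ndn.encoding import Name
    from ndn_python_repo.utils import PassiveSvs, concurrent_fetcher, IdNamingConv

    app = NDNApp()

    def on_missing(svs: PassiveSvs):
        # The callback must not block: schedule the actual fetching as tasks.
        for node_id, seq in svs.local_sv.copy().items():
            aio.create_task(fetch_node(svs, node_id, seq))

    async def fetch_node(svs: PassiveSvs, node_id: str, seq: int):
        # SVSPS data are produced under <node_id> + <group prefix> with sequence-number
        # naming; retry indefinitely (max_retries=-1), as the sync handle does.
        prefix = Name.from_str(node_id) + svs.base_prefix
        async for data_name, _, _, raw_packet in concurrent_fetcher(
                app, prefix, start_id=1, end_id=seq, semaphore=aio.Semaphore(10),
                name_conv=IdNamingConv.SEQUENCE, max_retries=-1):
            pass  # e.g. storage.put_data_packet(data_name, raw_packet)

    async def after_start():
        svs = PassiveSvs(Name.from_str('/example/group'), on_missing)
        svs.start(app)  # registers a route for the group prefix and listens for sync interests

    app.run_forever(after_start=after_start())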