From 363819adf85b59d2900c1550ecc0177bda96e195 Mon Sep 17 00:00:00 2001
From: Veselin Penev
Date: Mon, 7 Oct 2024 10:10:11 +0200
Subject: [PATCH] prepare to add uniqueness to Request()/Data() packets for
 index files

---
 bitdust/access/shared_access_coordinator.py |  4 +++-
 bitdust/crypt/encrypted.py                  |  3 ++-
 bitdust/lib/packetid.py                     | 14 ++++++++++++
 bitdust/storage/backup_fs.py                | 11 +++++----
 bitdust/storage/backup_matrix.py            | 12 +++++-----
 bitdust/storage/index_synchronizer.py       |  4 +++-
 bitdust/stream/io_throttle.py               |  5 ++++-
 bitdust/supplier/customer_space.py          | 25 ++++++++++++---------
 bitdust/transport/packet_out.py             |  5 +----
 bitdust/transport/proxy/proxy_receiver.py   |  2 +-
 10 files changed, 56 insertions(+), 29 deletions(-)

diff --git a/bitdust/access/shared_access_coordinator.py b/bitdust/access/shared_access_coordinator.py
index 03c510776..c44383b0b 100644
--- a/bitdust/access/shared_access_coordinator.py
+++ b/bitdust/access/shared_access_coordinator.py
@@ -707,7 +707,7 @@ def doSupplierRequestListFiles(self, event, *args, **kwargs):
         pkt_out = None
         if event == 'supplier-file-modified':
             remote_path = kwargs['remote_path']
-            if remote_path == settings.BackupIndexFileName():
+            if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
                 if self.state == 'CONNECTED':
                     self.automat('restart')
                 else:
@@ -890,6 +890,7 @@ def _do_connect_with_supplier(self, supplier_idurl):
     def _do_retrieve_index_file(self, supplier_idurl):
         packetID = global_id.MakeGlobalID(
             key_id=self.key_id,
+            # path=packetid.MakeIndexFileNamePacketID(),
             path=settings.BackupIndexFileName(),
         )
         sc = supplier_connector.by_idurl(supplier_idurl, customer_idurl=self.customer_idurl)
@@ -951,6 +952,7 @@ def _do_read_index_file(self, wrapped_packet, supplier_idurl):
     def _do_send_index_file(self, supplier_idurl):
         packetID = global_id.MakeGlobalID(
             key_id=self.key_id,
+            # path=packetid.MakeIndexFileNamePacketID(),
             path=settings.BackupIndexFileName(),
         )
         data = bpio.ReadBinaryFile(settings.BackupIndexFilePath(self.customer_idurl, self.key_alias))
diff --git a/bitdust/crypt/encrypted.py b/bitdust/crypt/encrypted.py
index a4364f120..c3320a961 100644
--- a/bitdust/crypt/encrypted.py
+++ b/bitdust/crypt/encrypted.py
@@ -290,7 +290,8 @@ def Unserialize(data, decrypt_key=None):
         _s = dct['s']
         _b = dct['b']
     except Exception as exc:
-        lg.exc('data unserialize failed with %r: %r\n%r\n%s' % (
+        lg.exc('data unserialize failed using key %r with %r: %r\n%r\n%s' % (
+            decrypt_key,
             exc,
             list(dct.keys()),
             (dct.get('c'), dct.get('b'), dct.get('i'), dct.get('r')),
diff --git a/bitdust/lib/packetid.py b/bitdust/lib/packetid.py
index 6fff4131b..0ceadf6bf 100644
--- a/bitdust/lib/packetid.py
+++ b/bitdust/lib/packetid.py
@@ -517,3 +517,17 @@ def SplitQueueMessagePacketID(packet_id):
     packet_id = packet_id[6:]
     queue_id, _, unique_id = packet_id.rpartition('_')
     return queue_id, unique_id
+
+
+def MakeIndexFileNamePacketID():
+    return '.index.{}'.format(UniqueID())
+
+
+def IsIndexFileName(path):
+    if len(path) < 19 or len(path) > 23:
+        return False
+    if not path.startswith('.index.'):
+        return False
+    if not path[7:].isdecimal():
+        return False
+    return True
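
The two helpers added to bitdust/lib/packetid.py are built to round-trip: IsIndexFileName() accepts exactly the names that MakeIndexFileNamePacketID() produces, and the 19..23 length bound implies UniqueID() is expected to return 12 to 16 decimal digits. A standalone sketch of that pairing (the timestamp-based UniqueID() stub here is an assumption for illustration only; the real implementation lives in the same module):

    import time

    def UniqueID():
        # stand-in only: any 12..16 digit decimal string passes the checks below
        return str(int(time.time() * 100.0))  # ~12 digits as of 2024

    def MakeIndexFileNamePacketID():
        return '.index.{}'.format(UniqueID())

    def IsIndexFileName(path):
        # total length 19..23 == '.index.' (7 chars) plus 12..16 decimal digits
        if len(path) < 19 or len(path) > 23:
            return False
        if not path.startswith('.index.'):
            return False
        if not path[7:].isdecimal():
            return False
        return True

    assert IsIndexFileName(MakeIndexFileNamePacketID())
    assert not IsIndexFileName('.index')               # legacy plain index name
    assert not IsIndexFileName('.index.abc012345678')  # non-decimal suffix
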
diff --git a/bitdust/storage/backup_fs.py b/bitdust/storage/backup_fs.py
index afeec556b..e7079b3a4 100644
--- a/bitdust/storage/backup_fs.py
+++ b/bitdust/storage/backup_fs.py
@@ -628,7 +628,7 @@ def MakeID(itr, randomized=True):
     for k in itr.keys():
         if k == 0:
             continue
-        if k == settings.BackupIndexFileName():
+        if k == settings.BackupIndexFileName() or packetid.IsIndexFileName(k):
             continue
         try:
             if isinstance(itr[k], int):
@@ -1938,7 +1938,8 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_
         if new_revision is not None:
             cur_revision = revision(customer_idurl, key_alias)
             if cur_revision >= new_revision:
-                lg.warn('ignoring items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision))
+                if _Debug:
+                    lg.dbg(_DebugLevel, 'ignoring items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision))
                 continue
         count = 0
         count_modified = 0
@@ -1980,7 +1981,9 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_

         def _one_item(path_id, path, info):
             if path_id not in known_items:
-                if path_id != settings.BackupIndexFileName():
+                if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id):
+                    pass
+                else:
                     to_be_removed_items.add(path_id)

         TraverseByID(_one_item, iterID=fsID(customer_idurl, key_alias))
@@ -2028,7 +2031,7 @@ def _one_item(path_id, path, info):
             updated_keys.append(key_alias)
         if key_alias.startswith('share_'):
             for new_file_item in new_files:
-                if new_file_item.path_id == settings.BackupIndexFileName():
+                if new_file_item.path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(new_file_item.path_id):
                     continue
                 new_file_path = ToPath(new_file_item.path_id, iterID=fsID(customer_idurl, key_alias))
                 if new_file_path:
diff --git a/bitdust/storage/backup_matrix.py b/bitdust/storage/backup_matrix.py
index c1d769c66..d467f9dee 100644
--- a/bitdust/storage/backup_matrix.py
+++ b/bitdust/storage/backup_matrix.py
@@ -300,7 +300,7 @@ def process_line_dir(line, current_key_alias=None, customer_idurl=None, is_in_sy
     pth = line
     path_id = pth.strip('/')
     if auto_create and is_in_sync:
-        if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids:
+        if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids:
             if not backup_fs.ExistsID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
                 if _Debug:
                     lg.out(_DebugLevel, '    AUTO CREATE DIR "%s" in the index' % pth)
@@ -360,7 +360,7 @@ def process_line_file(line, current_key_alias=None, customer_idurl=None, is_in_s
         filesz = -1
     path_id = pth.strip('/')
     if auto_create and is_in_sync:
-        if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids:
+        if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids:
             if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
                 if _Debug:
                     lg.out(_DebugLevel, '    AUTO CREATE FILE "%s" in the index' % pth)
@@ -376,12 +376,12 @@ def process_line_file(line, current_key_alias=None, customer_idurl=None, is_in_s
                 modified = True
     if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
         # remote supplier has some file - but we don't have it in the index
-        if path_id == settings.BackupIndexFileName():
+        if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id):
             # this is the index file saved on the remote supplier
             # must remember its size and put it in the backup_fs
             item = backup_fs.FSItemInfo(
-                name=path_id,
-                path_id=path_id,
+                name=settings.BackupIndexFileName(),
+                path_id=settings.BackupIndexFileName(),
                 typ=backup_fs.FILE,
                 key_id=global_id.MakeGlobalID(idurl=customer_idurl, key_alias=current_key_alias),
             )
@@ -878,7 +878,7 @@ def visit(key_id, realpath, subpath, name):
         return True
     if realpath.startswith('newblock-'):
         return False
-    if subpath == settings.BackupIndexFileName():
+    if subpath == settings.BackupIndexFileName() or packetid.IsIndexFileName(subpath):
         return False
     try:
         version = subpath.split('/')[-2]
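
The backup_matrix.process_line_file() change normalizes remote names before they are recorded: whichever uniquified '.index.<digits>' name a supplier reports, the local catalog keeps a single FSItemInfo under the canonical name from settings.BackupIndexFileName(). A minimal sketch of that mapping, using '.index' as a stand-in for settings.BackupIndexFileName() (an assumption for illustration):

    from bitdust.lib import packetid  # IsIndexFileName() is added by this patch

    BACKUP_INDEX_NAME = '.index'  # stand-in for settings.BackupIndexFileName()

    def normalize_remote_name(path_id):
        # many uniquified remote names collapse onto one local catalog item
        if path_id == BACKUP_INDEX_NAME or packetid.IsIndexFileName(path_id):
            return BACKUP_INDEX_NAME
        return path_id

    assert normalize_remote_name('.index.172800000000') == '.index'
    assert normalize_remote_name('0/1/2') == '0/1/2'
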
diff --git a/bitdust/storage/index_synchronizer.py b/bitdust/storage/index_synchronizer.py
index 2c045f0df..c58ab75a4 100644
--- a/bitdust/storage/index_synchronizer.py
+++ b/bitdust/storage/index_synchronizer.py
@@ -377,6 +377,7 @@ def doSuppliersSendIndexFile(self, *args, **kwargs):
         """
         packetID = global_id.MakeGlobalID(
             customer=my_id.getGlobalID(key_alias='master'),
+            # path=packetid.MakeIndexFileNamePacketID(),
             path=settings.BackupIndexFileName(),
         )
         self.sending_suppliers.clear()
@@ -513,6 +514,7 @@ def _on_supplier_acked(self, newpacket, info):
     def _do_retrieve(self, x=None):
         packetID = global_id.MakeGlobalID(
             customer=my_id.getGlobalID(key_alias='master'),
+            # path=packetid.MakeIndexFileNamePacketID(),
             path=settings.BackupIndexFileName(),
         )
         localID = my_id.getIDURL()
@@ -540,4 +542,4 @@ def _do_retrieve(self, x=None):
             self.requested_suppliers_number += 1
             self.requests_packets_sent.append((packetID, supplier_idurl))
         if _Debug:
-            lg.out(_DebugLevel, '    %s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl)))
+            lg.dbg(_DebugLevel, '%s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl)))
diff --git a/bitdust/stream/io_throttle.py b/bitdust/stream/io_throttle.py
index f4843b6e9..87fb98c96 100644
--- a/bitdust/stream/io_throttle.py
+++ b/bitdust/stream/io_throttle.py
@@ -258,6 +258,7 @@ def GetRequestQueueLength(supplierIDURL):


 class SupplierQueue:
+
     def __init__(self, supplierIdentity, creatorID, customerIDURL=None):
         self.customerIDURL = customerIDURL
         if self.customerIDURL is None:
@@ -733,10 +734,12 @@ def GetRequestQueueLength(self):


 class IOThrottle:
+
     """
    All of the backup rebuilds will run their data requests through this,
    so they get throttled; this also reduces duplicate requests.
    """
+
     def __init__(self):
         self.creatorID = my_id.getIDURL()
         self.supplierQueues = {}
@@ -796,7 +799,7 @@ def QueueRequestFile(self, callOnReceived, creatorID, packetID, ownerID, remoteI
         remoteID = id_url.field(remoteID)
         ownerID = id_url.field(ownerID)
         creatorID = id_url.field(creatorID)
-        if packetID != settings.BackupIndexFileName():
+        if packetID != settings.BackupIndexFileName() and not packetid.IsIndexFileName(packetID):
             customer, pathID = packetid.SplitPacketID(packetID)
             filename = os.path.join(settings.getLocalBackupsDir(), customer, pathID)
             if os.path.exists(filename):
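
The commented-out path=packetid.MakeIndexFileNamePacketID() lines above mark where the uniquified names are planned to replace the shared '.index' packet ID, so that every Request() for the catalog gets its own identity. On the consuming side, IOThrottle.QueueRequestFile() already treats both forms as the index file and keeps them exempt from the "local copy already exists" short-circuit. A small sketch of that guard, again with '.index' standing in for settings.BackupIndexFileName():

    from bitdust.lib import packetid  # IsIndexFileName() is added by this patch

    def should_request(packet_id, exists_locally):
        # the index is always re-fetched from the supplier; regular backup
        # data is skipped whenever a local copy already sits on disk
        if packet_id == '.index' or packetid.IsIndexFileName(packet_id):
            return True
        return not exists_locally
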
""" + def __init__(self): self.creatorID = my_id.getIDURL() self.supplierQueues = {} @@ -796,7 +799,7 @@ def QueueRequestFile(self, callOnReceived, creatorID, packetID, ownerID, remoteI remoteID = id_url.field(remoteID) ownerID = id_url.field(ownerID) creatorID = id_url.field(creatorID) - if packetID != settings.BackupIndexFileName(): + if packetID != settings.BackupIndexFileName() and not packetid.IsIndexFileName(packetID): customer, pathID = packetid.SplitPacketID(packetID) filename = os.path.join(settings.getLocalBackupsDir(), customer, pathID) if os.path.exists(filename): diff --git a/bitdust/supplier/customer_space.py b/bitdust/supplier/customer_space.py index 2b2f21fd6..963501558 100644 --- a/bitdust/supplier/customer_space.py +++ b/bitdust/supplier/customer_space.py @@ -274,6 +274,8 @@ def make_filename(customerGlobID, filePath, keyAlias=None): if _Debug: lg.dbg(_DebugLevel, 'making a new folder: %s' % keyAliasDir) bpio._dir_make(keyAliasDir) + if packetid.IsIndexFileName(filePath): + filePath = settings.BackupIndexFileName() filename = os.path.join(keyAliasDir, filePath) return filename @@ -289,7 +291,7 @@ def make_valid_filename(customerIDURL, glob_path): if not customerGlobID: lg.warn('customer id is empty: %r' % glob_path) return '' - if filePath != settings.BackupIndexFileName(): + if filePath != settings.BackupIndexFileName() and not packetid.IsIndexFileName(filePath): if not packetid.Valid(filePath): # SECURITY lg.warn('invalid file path') return '' @@ -415,7 +417,7 @@ def on_data(newpacket): data_exists = not os.path.exists(filename) data_changed = True if not data_exists: - if remote_path == settings.BackupIndexFileName(): + if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path): current_data = bpio.ReadBinaryFile(filename) if current_data == new_data: lg.warn('skip rewriting existing file %s' % filename) @@ -431,8 +433,8 @@ def on_data(newpacket): p2p_service.SendAck(newpacket, response=strng.to_text(sz), remote_idurl=authorized_idurl) reactor.callLater(0, local_tester.TestSpaceTime) # @UndefinedVariable if key_alias != 'master' and data_changed: - if remote_path == settings.BackupIndexFileName(): - do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl) + if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path): + do_notify_supplier_file_modified(key_alias, settings.BackupIndexFileName(), 'write', customer_idurl, authorized_idurl) else: if packetid.BlockNumber(newpacket.PacketID) == 0: do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl) @@ -531,12 +533,15 @@ def on_retrieve(newpacket): # it can be not a new Data(), but the old data returning back as a response to Retreive() packet # to solve the issue we will create a new Data() packet # which will be addressed directly to recipient and "wrap" stored data inside it + return_packet_id = stored_packet.PacketID + if packetid.IsIndexFileName(glob_path['path']): + return_packet_id = newpacket.PacketID payload = stored_packet.Serialize() - routed_packet = signed.Packet( + return_packet = signed.Packet( Command=commands.Data(), OwnerID=stored_packet.OwnerID, CreatorID=my_id.getIDURL(), - PacketID=stored_packet.PacketID, + PacketID=return_packet_id, Payload=payload, RemoteID=recipient_idurl, ) @@ -544,12 +549,12 @@ def on_retrieve(newpacket): lg.args(_DebugLevel, file_size=sz, payload_size=len(payload), fn=filename, recipient=recipient_idurl) if recipient_idurl == 
diff --git a/bitdust/transport/packet_out.py b/bitdust/transport/packet_out.py
index f6a0b22a0..8110b94c6 100644
--- a/bitdust/transport/packet_out.py
+++ b/bitdust/transport/packet_out.py
@@ -260,10 +260,7 @@ def search_by_response_packet(newpacket=None, proto=None, host=None, outgoing_co
     )
     matching_packet_ids = []
     matching_packet_ids.append(incoming_packet_id.lower())
-    if incoming_command and incoming_command in [
-        commands.Data(),
-        commands.Retrieve(),
-    ] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL():
+    if incoming_command and incoming_command in [commands.Data(), commands.Retrieve()] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL():
         my_rotated_idurls = id_url.list_known_idurls(my_id.getIDURL(), num_revisions=10, include_revisions=False)
         # TODO: my_rotated_idurls can be cached for optimization
         for another_idurl in my_rotated_idurls:
diff --git a/bitdust/transport/proxy/proxy_receiver.py b/bitdust/transport/proxy/proxy_receiver.py
index f3ac4124f..60305a6c7 100644
--- a/bitdust/transport/proxy/proxy_receiver.py
+++ b/bitdust/transport/proxy/proxy_receiver.py
@@ -756,7 +756,7 @@ def _do_send_identity_to_router(self, identity_source, failed_event):
         Command=commands.Identity(),
         OwnerID=my_id.getIDURL(),
         CreatorID=my_id.getIDURL(),
-        PacketID=('proxy_receiver:%s' % packetid.UniqueID()),
+        PacketID='proxy_receiver:%s' % packetid.UniqueID(),
         Payload=identity_obj.serialize(),
         RemoteID=self.router_idurl,
     )
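
The packet_out.py hunk is purely cosmetic, but search_by_response_packet() is where the new uniqueness eventually pays off: pending outgoing packets are matched against the lower-cased packet ID of an incoming response, so concurrent index requests that all share the literal '.index' ID are indistinguishable, while '.index.<unique>' IDs are not. A toy illustration of that matching rule (the pending table is hypothetical; the real code collects a list of matching IDs per outgoing packet):

    # hypothetical pending-request table, keyed the way responses are matched
    pending = {
        '.index.172800000001': 'Request() to supplier A',
        '.index.172800000002': 'Request() to supplier B',
    }

    def match_response(incoming_packet_id):
        return pending.get(incoming_packet_id.lower())

    assert match_response('.index.172800000002') == 'Request() to supplier B'
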