prepare to add uniqueness to Request()/Data() packets for index files #806

Merged · merged 1 commit · Oct 7, 2024
4 changes: 3 additions & 1 deletion bitdust/access/shared_access_coordinator.py
@@ -707,7 +707,7 @@ def doSupplierRequestListFiles(self, event, *args, **kwargs):
pkt_out = None
if event == 'supplier-file-modified':
remote_path = kwargs['remote_path']
- if remote_path == settings.BackupIndexFileName():
+ if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
if self.state == 'CONNECTED':
self.automat('restart')
else:
@@ -890,6 +890,7 @@ def _do_connect_with_supplier(self, supplier_idurl):
def _do_retrieve_index_file(self, supplier_idurl):
packetID = global_id.MakeGlobalID(
key_id=self.key_id,
+ # path=packetid.MakeIndexFileNamePacketID(),
path=settings.BackupIndexFileName(),
)
sc = supplier_connector.by_idurl(supplier_idurl, customer_idurl=self.customer_idurl)
@@ -951,6 +952,7 @@ def _do_send_index_file(self, supplier_idurl):
def _do_send_index_file(self, supplier_idurl):
packetID = global_id.MakeGlobalID(
key_id=self.key_id,
+ # path=packetid.MakeIndexFileNamePacketID(),
path=settings.BackupIndexFileName(),
)
data = bpio.ReadBinaryFile(settings.BackupIndexFilePath(self.customer_idurl, self.key_alias))
3 changes: 2 additions & 1 deletion bitdust/crypt/encrypted.py
@@ -290,7 +290,8 @@ def Unserialize(data, decrypt_key=None):
_s = dct['s']
_b = dct['b']
except Exception as exc:
- lg.exc('data unserialize failed with %r: %r\n%r\n%s' % (
+ lg.exc('data unserialize failed using key %r with %r: %r\n%r\n%s' % (
+ decrypt_key,
exc,
list(dct.keys()),
(dct.get('c'), dct.get('b'), dct.get('i'), dct.get('r')),
14 changes: 14 additions & 0 deletions bitdust/lib/packetid.py
@@ -517,3 +517,17 @@ def SplitQueueMessagePacketID(packet_id):
packet_id = packet_id[6:]
queue_id, _, unique_id = packet_id.rpartition('_')
return queue_id, unique_id


+ def MakeIndexFileNamePacketID():
+     return '.index.{}'.format(UniqueID())
+
+
+ def IsIndexFileName(path):
+     if len(path) < 19 or len(path) > 23:
+         return False
+     if not path.startswith('.index.'):
+         return False
+     if not path[7:].isdecimal():
+         return False
+     return True
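
Taken together, the two helpers define the shape of the unique index packet IDs this change prepares for. A minimal usage sketch (it assumes UniqueID() yields a 12- to 16-digit decimal string, which is what the 19..23 length window implies, given that '.index.' itself is 7 characters):

    from bitdust.lib import packetid

    pid = packetid.MakeIndexFileNamePacketID()   # e.g. '.index.170001234567890'
    assert packetid.IsIndexFileName(pid)
    assert not packetid.IsIndexFileName('.index')                # legacy name: too short
    assert not packetid.IsIndexFileName('.index.12345abc67890')  # suffix must be decimal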
11 changes: 7 additions & 4 deletions bitdust/storage/backup_fs.py
@@ -628,7 +628,7 @@ def MakeID(itr, randomized=True):
for k in itr.keys():
if k == 0:
continue
- if k == settings.BackupIndexFileName():
+ if k == settings.BackupIndexFileName() or packetid.IsIndexFileName(k):
continue
try:
if isinstance(itr[k], int):
@@ -1938,7 +1938,8 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_
if new_revision is not None:
cur_revision = revision(customer_idurl, key_alias)
if cur_revision >= new_revision:
- lg.warn('ignoring items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision))
+ if _Debug:
+     lg.dbg(_DebugLevel, 'ignoring items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision))
continue
count = 0
count_modified = 0
@@ -1980,7 +1981,9 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_

def _one_item(path_id, path, info):
if path_id not in known_items:
- if path_id != settings.BackupIndexFileName():
+ if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id):
+     pass
+ else:
to_be_removed_items.add(path_id)

TraverseByID(_one_item, iterID=fsID(customer_idurl, key_alias))
@@ -2028,7 +2031,7 @@ def _one_item(path_id, path, info):
updated_keys.append(key_alias)
if key_alias.startswith('share_'):
for new_file_item in new_files:
- if new_file_item.path_id == settings.BackupIndexFileName():
+ if new_file_item.path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(new_file_item.path_id):
continue
new_file_path = ToPath(new_file_item.path_id, iterID=fsID(customer_idurl, key_alias))
if new_file_path:
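
The same compound check (legacy index name, or a unique '.index.<id>' packet ID) now appears at three separate call sites in backup_fs.py alone. A small helper along these lines (hypothetical, not part of this diff; it assumes settings.BackupIndexFileName() returns the legacy '.index' name) would keep those sites in sync:

    from bitdust.lib import packetid
    from bitdust.main import settings

    def is_index_entry(path_id):
        # matches both the legacy index name and unique '.index.<id>' packet IDs
        return path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id)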
12 changes: 6 additions & 6 deletions bitdust/storage/backup_matrix.py
@@ -300,7 +300,7 @@ def process_line_dir(line, current_key_alias=None, customer_idurl=None, is_in_sy
pth = line
path_id = pth.strip('/')
if auto_create and is_in_sync:
- if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids:
+ if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids:
if not backup_fs.ExistsID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
if _Debug:
lg.out(_DebugLevel, ' AUTO CREATE DIR "%s" in the index' % pth)
@@ -360,7 +360,7 @@ def process_line_file(line, current_key_alias=None, customer_idurl=None, is_in_s
filesz = -1
path_id = pth.strip('/')
if auto_create and is_in_sync:
- if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids:
+ if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids:
if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
if _Debug:
lg.out(_DebugLevel, ' AUTO CREATE FILE "%s" in the index' % pth)
@@ ... @@
modified = True
if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
# remote supplier has some file - but we don't have it in the index
- if path_id == settings.BackupIndexFileName():
+ if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id):
# this is the index file saved on remote supplier
# must remember its size and put it in the backup_fs
item = backup_fs.FSItemInfo(
- name=path_id,
- path_id=path_id,
+ name=settings.BackupIndexFileName(),
+ path_id=settings.BackupIndexFileName(),
typ=backup_fs.FILE,
key_id=global_id.MakeGlobalID(idurl=customer_idurl, key_alias=current_key_alias),
)
@@ -878,7 +878,7 @@ def visit(key_id, realpath, subpath, name):
return True
if realpath.startswith('newblock-'):
return False
- if subpath == settings.BackupIndexFileName():
+ if subpath == settings.BackupIndexFileName() or packetid.IsIndexFileName(subpath):
return False
try:
version = subpath.split('/')[-2]
4 changes: 3 additions & 1 deletion bitdust/storage/index_synchronizer.py
@@ -377,6 +377,7 @@ def doSuppliersSendIndexFile(self, *args, **kwargs):
"""
packetID = global_id.MakeGlobalID(
customer=my_id.getGlobalID(key_alias='master'),
+ # path=packetid.MakeIndexFileNamePacketID(),
path=settings.BackupIndexFileName(),
)
self.sending_suppliers.clear()
@@ -513,6 +514,7 @@ def _on_supplier_acked(self, newpacket, info):
def _do_retrieve(self, x=None):
packetID = global_id.MakeGlobalID(
customer=my_id.getGlobalID(key_alias='master'),
+ # path=packetid.MakeIndexFileNamePacketID(),
path=settings.BackupIndexFileName(),
)
localID = my_id.getIDURL()
@@ -540,4 +542,4 @@ def _do_retrieve(self, x=None):
self.requested_suppliers_number += 1
self.requests_packets_sent.append((packetID, supplier_idurl))
if _Debug:
- lg.out(_DebugLevel, ' %s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl)))
+ lg.dbg(_DebugLevel, '%s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl)))
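
The commented-out path= lines mark where the switch will eventually happen: once enabled, every index file round trip gets its own packet ID instead of the shared '.index' name. A sketch of the intended call (names taken from the diff; the resulting ID layout is an assumption based on MakeIndexFileNamePacketID()):

    from bitdust.lib import packetid
    from bitdust.userid import global_id, my_id

    packetID = global_id.MakeGlobalID(
        customer=my_id.getGlobalID(key_alias='master'),
        path=packetid.MakeIndexFileNamePacketID(),  # '.index.<unique-id>'
    )
    # each call produces a distinct ID, so a Data() response can be matched
    # to the exact Retrieve() request that asked for it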
5 changes: 4 additions & 1 deletion bitdust/stream/io_throttle.py
@@ -258,6 +258,7 @@ def GetRequestQueueLength(supplierIDURL):


class SupplierQueue:

def __init__(self, supplierIdentity, creatorID, customerIDURL=None):
self.customerIDURL = customerIDURL
if self.customerIDURL is None:
@@ -733,10 +734,12 @@ def GetRequestQueueLength(self):


class IOThrottle:

"""
All of the backup rebuilds will run their data requests through this
So it gets throttled, also to reduce duplicate requests.
"""

def __init__(self):
self.creatorID = my_id.getIDURL()
self.supplierQueues = {}
@@ -796,7 +799,7 @@ def QueueRequestFile(self, callOnReceived, creatorID, packetID, ownerID, remoteI
remoteID = id_url.field(remoteID)
ownerID = id_url.field(ownerID)
creatorID = id_url.field(creatorID)
- if packetID != settings.BackupIndexFileName():
+ if packetID != settings.BackupIndexFileName() and not packetid.IsIndexFileName(packetID):
customer, pathID = packetid.SplitPacketID(packetID)
filename = os.path.join(settings.getLocalBackupsDir(), customer, pathID)
if os.path.exists(filename):
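
The QueueRequestFile() guard above is why index files need this exemption: for ordinary packets an already-downloaded file short-circuits the request, but the index must always be re-fetched from suppliers. A condensed sketch of that guard (hypothetical helper name; the real code inlines the check):

    import os
    from bitdust.lib import packetid
    from bitdust.main import settings

    def should_skip_request(packetID):
        if packetID != settings.BackupIndexFileName() and not packetid.IsIndexFileName(packetID):
            customer, pathID = packetid.SplitPacketID(packetID)
            filename = os.path.join(settings.getLocalBackupsDir(), customer, pathID)
            if os.path.exists(filename):
                # ordinary data packet already on disk: no need to request it
                return True
        # index files (legacy or unique name) are always requested again
        return False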
25 changes: 15 additions & 10 deletions bitdust/supplier/customer_space.py
@@ -274,6 +274,8 @@ def make_filename(customerGlobID, filePath, keyAlias=None):
if _Debug:
lg.dbg(_DebugLevel, 'making a new folder: %s' % keyAliasDir)
bpio._dir_make(keyAliasDir)
+ if packetid.IsIndexFileName(filePath):
+     filePath = settings.BackupIndexFileName()
filename = os.path.join(keyAliasDir, filePath)
return filename

@@ -289,7 +291,7 @@ def make_valid_filename(customerIDURL, glob_path):
lg.warn('customer id is empty: %r' % glob_path)
return ''
- if filePath != settings.BackupIndexFileName():
+ if filePath != settings.BackupIndexFileName() and not packetid.IsIndexFileName(filePath):
if not packetid.Valid(filePath): # SECURITY
lg.warn('invalid file path')
return ''
@@ -415,7 +417,7 @@ def on_data(newpacket):
data_exists = not os.path.exists(filename)
data_changed = True
if not data_exists:
- if remote_path == settings.BackupIndexFileName():
+ if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
current_data = bpio.ReadBinaryFile(filename)
if current_data == new_data:
lg.warn('skip rewriting existing file %s' % filename)
@@ ... @@
p2p_service.SendAck(newpacket, response=strng.to_text(sz), remote_idurl=authorized_idurl)
reactor.callLater(0, local_tester.TestSpaceTime) # @UndefinedVariable
if key_alias != 'master' and data_changed:
- if remote_path == settings.BackupIndexFileName():
- do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl)
+ if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
+     do_notify_supplier_file_modified(key_alias, settings.BackupIndexFileName(), 'write', customer_idurl, authorized_idurl)
else:
if packetid.BlockNumber(newpacket.PacketID) == 0:
do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl)
@@ -531,25 +533,28 @@ def on_retrieve(newpacket):
# it can be not a new Data(), but the old data returning back as a response to Retrieve() packet
# to solve the issue we will create a new Data() packet
# which will be addressed directly to recipient and "wrap" stored data inside it
+ return_packet_id = stored_packet.PacketID
+ if packetid.IsIndexFileName(glob_path['path']):
+     return_packet_id = newpacket.PacketID
payload = stored_packet.Serialize()
- routed_packet = signed.Packet(
+ return_packet = signed.Packet(
Command=commands.Data(),
OwnerID=stored_packet.OwnerID,
CreatorID=my_id.getIDURL(),
- PacketID=stored_packet.PacketID,
+ PacketID=return_packet_id,
Payload=payload,
RemoteID=recipient_idurl,
)
if _Debug:
lg.args(_DebugLevel, file_size=sz, payload_size=len(payload), fn=filename, recipient=recipient_idurl)
if recipient_idurl == stored_packet.OwnerID:
if _Debug:
- lg.dbg(_DebugLevel, 'from request %r : sending %r back to owner: %s' % (newpacket, stored_packet, recipient_idurl))
- gateway.outbox(routed_packet)
+ lg.dbg(_DebugLevel, 'from request %r : sending back %r in %r to owner: %s' % (newpacket, stored_packet, return_packet, recipient_idurl))
+ gateway.outbox(return_packet)
return True
if _Debug:
- lg.dbg(_DebugLevel, 'from request %r : returning data owned by %s to %s' % (newpacket, stored_packet.OwnerID, recipient_idurl))
- gateway.outbox(routed_packet)
+ lg.dbg(_DebugLevel, 'from request %r : returning data %r in %r owned by %s to %s' % (newpacket, stored_packet, return_packet, stored_packet.OwnerID, recipient_idurl))
+ gateway.outbox(return_packet)
return True


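
Read together, the make_filename() and on_retrieve() changes give the supplier side of the round trip: any '.index.<id>' request resolves to the single stored index file, while the reply is stamped with the caller's unique packet ID. A condensed sketch under those assumptions (hypothetical helper names; the real code inlines both checks):

    from bitdust.lib import packetid
    from bitdust.main import settings

    def resolve_index_filename(file_path):
        # every '.index.<id>' variant is stored under the one legacy name
        if packetid.IsIndexFileName(file_path):
            return settings.BackupIndexFileName()
        return file_path

    def pick_return_packet_id(requested_path, stored_packet_id, incoming_packet_id):
        # unique index request: echo the caller's PacketID back, so the Data()
        # response matches the pending Retrieve(); otherwise keep the stored ID
        if packetid.IsIndexFileName(requested_path):
            return incoming_packet_id
        return stored_packet_id

This is what keeps responses unambiguous once several index retrievals can be in flight at the same time.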
5 changes: 1 addition & 4 deletions bitdust/transport/packet_out.py
@@ -260,10 +260,7 @@ def search_by_response_packet(newpacket=None, proto=None, host=None, outgoing_co
)
matching_packet_ids = []
matching_packet_ids.append(incoming_packet_id.lower())
- if incoming_command and incoming_command in [
- commands.Data(),
- commands.Retrieve(),
- ] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL():
+ if incoming_command and incoming_command in [commands.Data(), commands.Retrieve()] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL():
my_rotated_idurls = id_url.list_known_idurls(my_id.getIDURL(), num_revisions=10, include_revisions=False)
# TODO: my_rotated_idurls can be cached for optimization
for another_idurl in my_rotated_idurls:
2 changes: 1 addition & 1 deletion bitdust/transport/proxy/proxy_receiver.py
@@ -756,7 +756,7 @@ def _do_send_identity_to_router(self, identity_source, failed_event):
Command=commands.Identity(),
OwnerID=my_id.getIDURL(),
CreatorID=my_id.getIDURL(),
- PacketID=('proxy_receiver:%s' % packetid.UniqueID()),
+ PacketID='proxy_receiver:%s' % packetid.UniqueID(),
Payload=identity_obj.serialize(),
RemoteID=self.router_idurl,
)