Skip to content

Commit edb7af6

Browse files
authored
Merge pull request #806 from vesellov/master
prepare to add uniqueness to Request()/Data() packets for index files
2 parents bdc0c5a + 363819a commit edb7af6

10 files changed

+56
-29
lines changed

bitdust/access/shared_access_coordinator.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -707,7 +707,7 @@ def doSupplierRequestListFiles(self, event, *args, **kwargs):
707707
pkt_out = None
708708
if event == 'supplier-file-modified':
709709
remote_path = kwargs['remote_path']
710-
if remote_path == settings.BackupIndexFileName():
710+
if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
711711
if self.state == 'CONNECTED':
712712
self.automat('restart')
713713
else:
@@ -890,6 +890,7 @@ def _do_connect_with_supplier(self, supplier_idurl):
890890
def _do_retrieve_index_file(self, supplier_idurl):
891891
packetID = global_id.MakeGlobalID(
892892
key_id=self.key_id,
893+
# path=packetid.MakeIndexFileNamePacketID(),
893894
path=settings.BackupIndexFileName(),
894895
)
895896
sc = supplier_connector.by_idurl(supplier_idurl, customer_idurl=self.customer_idurl)
@@ -951,6 +952,7 @@ def _do_read_index_file(self, wrapped_packet, supplier_idurl):
951952
def _do_send_index_file(self, supplier_idurl):
952953
packetID = global_id.MakeGlobalID(
953954
key_id=self.key_id,
955+
# path=packetid.MakeIndexFileNamePacketID(),
954956
path=settings.BackupIndexFileName(),
955957
)
956958
data = bpio.ReadBinaryFile(settings.BackupIndexFilePath(self.customer_idurl, self.key_alias))

bitdust/crypt/encrypted.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,8 @@ def Unserialize(data, decrypt_key=None):
290290
_s = dct['s']
291291
_b = dct['b']
292292
except Exception as exc:
293-
lg.exc('data unserialize failed with %r: %r\n%r\n%s' % (
293+
lg.exc('data unserialize failed using key %r with %r: %r\n%r\n%s' % (
294+
decrypt_key,
294295
exc,
295296
list(dct.keys()),
296297
(dct.get('c'), dct.get('b'), dct.get('i'), dct.get('r')),

bitdust/lib/packetid.py

+14
Original file line numberDiff line numberDiff line change
@@ -517,3 +517,17 @@ def SplitQueueMessagePacketID(packet_id):
517517
packet_id = packet_id[6:]
518518
queue_id, _, unique_id = packet_id.rpartition('_')
519519
return queue_id, unique_id
520+
521+
522+
def MakeIndexFileNamePacketID():
523+
return '.index.{}'.format(UniqueID())
524+
525+
526+
def IsIndexFileName(path):
527+
if len(path) < 19 or len(path) > 23:
528+
return False
529+
if not path.startswith('.index.'):
530+
return False
531+
if not path[7:].isdecimal():
532+
return False
533+
return True

bitdust/storage/backup_fs.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -628,7 +628,7 @@ def MakeID(itr, randomized=True):
628628
for k in itr.keys():
629629
if k == 0:
630630
continue
631-
if k == settings.BackupIndexFileName():
631+
if k == settings.BackupIndexFileName() or packetid.IsIndexFileName(k):
632632
continue
633633
try:
634634
if isinstance(itr[k], int):
@@ -1938,7 +1938,8 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_
19381938
if new_revision is not None:
19391939
cur_revision = revision(customer_idurl, key_alias)
19401940
if cur_revision >= new_revision:
1941-
lg.warn('ignoring items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision))
1941+
if _Debug:
1942+
lg.dbg(_DebugLevel, 'ignoring items for %r with alias %r because current revision is up to date: %d >= %d' % (customer_idurl, key_alias, cur_revision, new_revision))
19421943
continue
19431944
count = 0
19441945
count_modified = 0
@@ -1980,7 +1981,9 @@ def UnserializeIndex(json_data, customer_idurl=None, new_revision=None, deleted_
19801981

19811982
def _one_item(path_id, path, info):
19821983
if path_id not in known_items:
1983-
if path_id != settings.BackupIndexFileName():
1984+
if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id):
1985+
pass
1986+
else:
19841987
to_be_removed_items.add(path_id)
19851988

19861989
TraverseByID(_one_item, iterID=fsID(customer_idurl, key_alias))
@@ -2028,7 +2031,7 @@ def _one_item(path_id, path, info):
20282031
updated_keys.append(key_alias)
20292032
if key_alias.startswith('share_'):
20302033
for new_file_item in new_files:
2031-
if new_file_item.path_id == settings.BackupIndexFileName():
2034+
if new_file_item.path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(new_file_item.path_id):
20322035
continue
20332036
new_file_path = ToPath(new_file_item.path_id, iterID=fsID(customer_idurl, key_alias))
20342037
if new_file_path:

bitdust/storage/backup_matrix.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ def process_line_dir(line, current_key_alias=None, customer_idurl=None, is_in_sy
300300
pth = line
301301
path_id = pth.strip('/')
302302
if auto_create and is_in_sync:
303-
if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids:
303+
if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids:
304304
if not backup_fs.ExistsID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
305305
if _Debug:
306306
lg.out(_DebugLevel, ' AUTO CREATE DIR "%s" in the index' % pth)
@@ -360,7 +360,7 @@ def process_line_file(line, current_key_alias=None, customer_idurl=None, is_in_s
360360
filesz = -1
361361
path_id = pth.strip('/')
362362
if auto_create and is_in_sync:
363-
if path_id != settings.BackupIndexFileName() and path_id not in ignored_path_ids:
363+
if (path_id != settings.BackupIndexFileName() and not packetid.IsIndexFileName(path_id)) and path_id not in ignored_path_ids:
364364
if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
365365
if _Debug:
366366
lg.out(_DebugLevel, ' AUTO CREATE FILE "%s" in the index' % pth)
@@ -376,12 +376,12 @@ def process_line_file(line, current_key_alias=None, customer_idurl=None, is_in_s
376376
modified = True
377377
if not backup_fs.IsFileID(pth, iterID=backup_fs.fsID(customer_idurl, current_key_alias)):
378378
# remote supplier has some file - but we don't have it in the index
379-
if path_id == settings.BackupIndexFileName():
379+
if path_id == settings.BackupIndexFileName() or packetid.IsIndexFileName(path_id):
380380
# this is the index file saved on remote supplier
381381
# must remember its size and put it in the backup_fs
382382
item = backup_fs.FSItemInfo(
383-
name=path_id,
384-
path_id=path_id,
383+
name=settings.BackupIndexFileName(),
384+
path_id=settings.BackupIndexFileName(),
385385
typ=backup_fs.FILE,
386386
key_id=global_id.MakeGlobalID(idurl=customer_idurl, key_alias=current_key_alias),
387387
)
@@ -878,7 +878,7 @@ def visit(key_id, realpath, subpath, name):
878878
return True
879879
if realpath.startswith('newblock-'):
880880
return False
881-
if subpath == settings.BackupIndexFileName():
881+
if subpath == settings.BackupIndexFileName() or packetid.IsIndexFileName(subpath):
882882
return False
883883
try:
884884
version = subpath.split('/')[-2]

bitdust/storage/index_synchronizer.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ def doSuppliersSendIndexFile(self, *args, **kwargs):
377377
"""
378378
packetID = global_id.MakeGlobalID(
379379
customer=my_id.getGlobalID(key_alias='master'),
380+
# path=packetid.MakeIndexFileNamePacketID(),
380381
path=settings.BackupIndexFileName(),
381382
)
382383
self.sending_suppliers.clear()
@@ -513,6 +514,7 @@ def _on_supplier_acked(self, newpacket, info):
513514
def _do_retrieve(self, x=None):
514515
packetID = global_id.MakeGlobalID(
515516
customer=my_id.getGlobalID(key_alias='master'),
517+
# path=packetid.MakeIndexFileNamePacketID(),
516518
path=settings.BackupIndexFileName(),
517519
)
518520
localID = my_id.getIDURL()
@@ -540,4 +542,4 @@ def _do_retrieve(self, x=None):
540542
self.requested_suppliers_number += 1
541543
self.requests_packets_sent.append((packetID, supplier_idurl))
542544
if _Debug:
543-
lg.out(_DebugLevel, ' %s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl)))
545+
lg.dbg(_DebugLevel, '%s sending to %s' % (pkt_out, nameurl.GetName(supplier_idurl)))

bitdust/stream/io_throttle.py

+4-1
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ def GetRequestQueueLength(supplierIDURL):
258258

259259

260260
class SupplierQueue:
261+
261262
def __init__(self, supplierIdentity, creatorID, customerIDURL=None):
262263
self.customerIDURL = customerIDURL
263264
if self.customerIDURL is None:
@@ -733,10 +734,12 @@ def GetRequestQueueLength(self):
733734

734735

735736
class IOThrottle:
737+
736738
"""
737739
All of the backup rebuilds will run their data requests through this
738740
So it gets throttled, also to reduce duplicate requests.
739741
"""
742+
740743
def __init__(self):
741744
self.creatorID = my_id.getIDURL()
742745
self.supplierQueues = {}
@@ -796,7 +799,7 @@ def QueueRequestFile(self, callOnReceived, creatorID, packetID, ownerID, remoteI
796799
remoteID = id_url.field(remoteID)
797800
ownerID = id_url.field(ownerID)
798801
creatorID = id_url.field(creatorID)
799-
if packetID != settings.BackupIndexFileName():
802+
if packetID != settings.BackupIndexFileName() and not packetid.IsIndexFileName(packetID):
800803
customer, pathID = packetid.SplitPacketID(packetID)
801804
filename = os.path.join(settings.getLocalBackupsDir(), customer, pathID)
802805
if os.path.exists(filename):

bitdust/supplier/customer_space.py

+15-10
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,8 @@ def make_filename(customerGlobID, filePath, keyAlias=None):
274274
if _Debug:
275275
lg.dbg(_DebugLevel, 'making a new folder: %s' % keyAliasDir)
276276
bpio._dir_make(keyAliasDir)
277+
if packetid.IsIndexFileName(filePath):
278+
filePath = settings.BackupIndexFileName()
277279
filename = os.path.join(keyAliasDir, filePath)
278280
return filename
279281

@@ -289,7 +291,7 @@ def make_valid_filename(customerIDURL, glob_path):
289291
if not customerGlobID:
290292
lg.warn('customer id is empty: %r' % glob_path)
291293
return ''
292-
if filePath != settings.BackupIndexFileName():
294+
if filePath != settings.BackupIndexFileName() and not packetid.IsIndexFileName(filePath):
293295
if not packetid.Valid(filePath): # SECURITY
294296
lg.warn('invalid file path')
295297
return ''
@@ -415,7 +417,7 @@ def on_data(newpacket):
415417
data_exists = not os.path.exists(filename)
416418
data_changed = True
417419
if not data_exists:
418-
if remote_path == settings.BackupIndexFileName():
420+
if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
419421
current_data = bpio.ReadBinaryFile(filename)
420422
if current_data == new_data:
421423
lg.warn('skip rewriting existing file %s' % filename)
@@ -431,8 +433,8 @@ def on_data(newpacket):
431433
p2p_service.SendAck(newpacket, response=strng.to_text(sz), remote_idurl=authorized_idurl)
432434
reactor.callLater(0, local_tester.TestSpaceTime) # @UndefinedVariable
433435
if key_alias != 'master' and data_changed:
434-
if remote_path == settings.BackupIndexFileName():
435-
do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl)
436+
if remote_path == settings.BackupIndexFileName() or packetid.IsIndexFileName(remote_path):
437+
do_notify_supplier_file_modified(key_alias, settings.BackupIndexFileName(), 'write', customer_idurl, authorized_idurl)
436438
else:
437439
if packetid.BlockNumber(newpacket.PacketID) == 0:
438440
do_notify_supplier_file_modified(key_alias, remote_path, 'write', customer_idurl, authorized_idurl)
@@ -531,25 +533,28 @@ def on_retrieve(newpacket):
531533
# it can be not a new Data(), but the old data returning back as a response to Retrieve() packet
532534
# to solve the issue we will create a new Data() packet
533535
# which will be addressed directly to recipient and "wrap" stored data inside it
536+
return_packet_id = stored_packet.PacketID
537+
if packetid.IsIndexFileName(glob_path['path']):
538+
return_packet_id = newpacket.PacketID
534539
payload = stored_packet.Serialize()
535-
routed_packet = signed.Packet(
540+
return_packet = signed.Packet(
536541
Command=commands.Data(),
537542
OwnerID=stored_packet.OwnerID,
538543
CreatorID=my_id.getIDURL(),
539-
PacketID=stored_packet.PacketID,
544+
PacketID=return_packet_id,
540545
Payload=payload,
541546
RemoteID=recipient_idurl,
542547
)
543548
if _Debug:
544549
lg.args(_DebugLevel, file_size=sz, payload_size=len(payload), fn=filename, recipient=recipient_idurl)
545550
if recipient_idurl == stored_packet.OwnerID:
546551
if _Debug:
547-
lg.dbg(_DebugLevel, 'from request %r : sending %r back to owner: %s' % (newpacket, stored_packet, recipient_idurl))
548-
gateway.outbox(routed_packet)
552+
lg.dbg(_DebugLevel, 'from request %r : sending back %r in %r to owner: %s' % (newpacket, stored_packet, return_packet, recipient_idurl))
553+
gateway.outbox(return_packet)
549554
return True
550555
if _Debug:
551-
lg.dbg(_DebugLevel, 'from request %r : returning data owned by %s to %s' % (newpacket, stored_packet.OwnerID, recipient_idurl))
552-
gateway.outbox(routed_packet)
556+
lg.dbg(_DebugLevel, 'from request %r : returning data %r in %r owned by %s to %s' % (newpacket, stored_packet, return_packet, stored_packet.OwnerID, recipient_idurl))
557+
gateway.outbox(return_packet)
553558
return True
554559

555560

bitdust/transport/packet_out.py

+1-4
Original file line numberDiff line numberDiff line change
@@ -260,10 +260,7 @@ def search_by_response_packet(newpacket=None, proto=None, host=None, outgoing_co
260260
)
261261
matching_packet_ids = []
262262
matching_packet_ids.append(incoming_packet_id.lower())
263-
if incoming_command and incoming_command in [
264-
commands.Data(),
265-
commands.Retrieve(),
266-
] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL():
263+
if incoming_command and incoming_command in [commands.Data(), commands.Retrieve()] and id_url.is_cached(incoming_owner_idurl) and incoming_owner_idurl == my_id.getIDURL():
267264
my_rotated_idurls = id_url.list_known_idurls(my_id.getIDURL(), num_revisions=10, include_revisions=False)
268265
# TODO: my_rotated_idurls can be cached for optimization
269266
for another_idurl in my_rotated_idurls:

bitdust/transport/proxy/proxy_receiver.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ def _do_send_identity_to_router(self, identity_source, failed_event):
756756
Command=commands.Identity(),
757757
OwnerID=my_id.getIDURL(),
758758
CreatorID=my_id.getIDURL(),
759-
PacketID=('proxy_receiver:%s' % packetid.UniqueID()),
759+
PacketID='proxy_receiver:%s' % packetid.UniqueID(),
760760
Payload=identity_obj.serialize(),
761761
RemoteID=self.router_idurl,
762762
)

0 commit comments

Comments
 (0)