Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

various improvements in shared_access_coordinator(), restore_worker() and service_message_history() #804

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions bitdust/access/shared_access_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,7 @@ def __init__(self, key_id, debug_level=_DebugLevel, log_events=_Debug, log_trans
self.customer_idurl = self.glob_id['idurl']
self.known_suppliers_list = []
self.known_ecc_map = None
self.critical_suppliers_number = 1
self.dht_lookup_use_cache = True
self.received_index_file_revision = {}
self.last_time_in_sync = -1
Expand Down Expand Up @@ -595,25 +596,25 @@ def A(self, event, *args, **kwargs):
if event == 'shutdown':
self.state = 'CLOSED'
self.doDestroyMe(*args, **kwargs)
elif event == 'timer-30sec' or event == 'all-suppliers-disconnected':
self.state = 'DISCONNECTED'
self.doReportDisconnected(*args, **kwargs)
elif event == 'list-files-received':
self.doSupplierSendIndexFile(*args, **kwargs)
elif event == 'key-not-registered':
self.doSupplierTransferPubKey(*args, **kwargs)
elif event == 'supplier-connected' or event == 'key-sent':
self.doSupplierRequestIndexFile(*args, **kwargs)
elif event == 'all-suppliers-connected':
self.state = 'CONNECTED'
self.doReportConnected(*args, **kwargs)
elif event == 'index-sent' or event == 'index-up-to-date' or event == 'index-failed' or event == 'list-files-failed' or event == 'supplier-failed':
self.doRemember(event, *args, **kwargs)
self.doCheckAllConnected(*args, **kwargs)
elif event == 'list-files-verified':
self.doSupplierProcessListFiles(*args, **kwargs)
elif event == 'supplier-file-modified' or event == 'index-received' or event == 'index-missing':
self.doSupplierRequestListFiles(event, *args, **kwargs)
elif (event == 'timer-30sec' and not self.isEnoughConnected(*args, **kwargs)) or event == 'all-suppliers-disconnected':
self.state = 'DISCONNECTED'
self.doReportDisconnected(*args, **kwargs)
elif (event == 'timer-30sec' and self.isEnoughConnected(*args, **kwargs)) or event == 'all-suppliers-connected':
self.state = 'CONNECTED'
self.doReportConnected(*args, **kwargs)
#---DISCONNECTED---
elif self.state == 'DISCONNECTED':
if event == 'shutdown':
Expand All @@ -639,6 +640,14 @@ def A(self, event, *args, **kwargs):
pass
return None

def isEnoughConnected(self, *args, **kwargs):
"""
Action method.
"""
if _Debug:
lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=self.critical_suppliers_number)
return len(self.suppliers_succeed) >= self.critical_suppliers_number

def doInit(self, *args, **kwargs):
"""
Action method.
Expand Down Expand Up @@ -667,6 +676,10 @@ def doConnectCustomerSuppliers(self, *args, **kwargs):
self.automat('all-suppliers-disconnected')
return
self.known_ecc_map = args[0].get('ecc_map')
self.critical_suppliers_number = 1
if self.known_ecc_map:
from bitdust.raid import eccmap
self.critical_suppliers_number = eccmap.GetCorrectableErrors(eccmap.GetEccMapSuppliersNumber(self.known_ecc_map))
self.suppliers_in_progress.clear()
self.suppliers_succeed.clear()
for supplier_idurl in self.known_suppliers_list:
Expand Down Expand Up @@ -811,14 +824,10 @@ def doCheckAllConnected(self, *args, **kwargs):
"""
Action method.
"""
critical_suppliers_number = 1
if self.known_ecc_map:
from bitdust.raid import eccmap
critical_suppliers_number = eccmap.GetCorrectableErrors(eccmap.GetEccMapSuppliersNumber(self.known_ecc_map))
if _Debug:
lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=critical_suppliers_number)
lg.args(_DebugLevel, progress=len(self.suppliers_in_progress), succeed=self.suppliers_succeed, critical_suppliers_number=self.critical_suppliers_number)
if len(self.suppliers_in_progress) == 0:
if len(self.suppliers_succeed) >= critical_suppliers_number:
if len(self.suppliers_succeed) >= self.critical_suppliers_number:
self.automat('all-suppliers-connected')
else:
self.automat('all-suppliers-disconnected')
Expand Down
127 changes: 0 additions & 127 deletions bitdust/automats/global_state.py

This file was deleted.

1 change: 0 additions & 1 deletion bitdust/contacts/contactsdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -847,7 +847,6 @@ def load_contacts():
_CorrespondentsChangedCallback([], correspondents())
AddContactsChangedCallback(on_contacts_changed)
if listeners.is_populate_required('correspondent'):
# listeners.populate_later().remove('correspondent')
populate_correspondents()


Expand Down
2 changes: 0 additions & 2 deletions bitdust/customer/fire_hire.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@

from bitdust.logs import lg

from bitdust.automats import global_state
from bitdust.automats import automat

from bitdust.lib import misc
Expand Down Expand Up @@ -267,7 +266,6 @@ def state_changed(self, oldstate, newstate, event, *args, **kwargs):
"""
This method intended to catch the moment when automat's state was changed.
"""
global_state.set_global_state('FIREHIRE ' + newstate)
if newstate == 'READY' and event != 'instant':
self.automat('instant')

Expand Down
3 changes: 1 addition & 2 deletions bitdust/p2p/network_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
from bitdust.logs import lg

from bitdust.automats import automat
from bitdust.automats import global_state

from bitdust.lib import net_misc
from bitdust.lib import misc
Expand Down Expand Up @@ -146,6 +145,7 @@ def Destroy():


class NetworkConnector(automat.Automat):

"""
Class to monitor Internet connection and reconnect when needed.
"""
Expand Down Expand Up @@ -174,7 +174,6 @@ def init(self):
net_misc.SetConnectionFailedCallbackFunc(ConnectionFailedCallback)

def state_changed(self, oldstate, newstate, event, *args, **kwargs):
global_state.set_global_state('NETWORK ' + newstate)
if driver.is_on('service_p2p_hookups'):
from bitdust.p2p import p2p_connector
from bitdust.system import tray_icon
Expand Down
1 change: 0 additions & 1 deletion bitdust/services/service_backups.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ def start(self):
events.add_subscriber(self._on_my_identity_rotated, 'my-identity-rotated')
events.add_subscriber(self._on_key_erased, 'key-erased')
if listeners.is_populate_required('remote_version'):
# listeners.populate_later().remove('remote_version')
backup_matrix.populate_remote_versions()
return True

Expand Down
1 change: 0 additions & 1 deletion bitdust/services/service_keys_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ def start(self):
callback.add_outbox_callback(self._on_outbox_packet_sent)
callback.append_inbox_callback(self._on_inbox_packet_received)
if listeners.is_populate_required('key'):
# listeners.populate_later().remove('key')
my_keys.populate_keys()
return True

Expand Down
26 changes: 0 additions & 26 deletions bitdust/services/service_message_history.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ class MessageHistoryService(LocalService):

def dependent_on(self):
return [
'service_my_data',
'service_private_groups',
]

Expand All @@ -60,10 +59,8 @@ def start(self):
events.add_subscriber(self.on_key_generated, 'key-generated')
events.add_subscriber(self.on_key_erased, 'key-erased')
if listeners.is_populate_required('conversation'):
# listeners.populate_later().remove('conversation')
message_database.populate_conversations()
if listeners.is_populate_required('message'):
# listeners.populate_later().remove('message')
message_database.populate_messages()
return True

Expand Down Expand Up @@ -100,26 +97,3 @@ def on_key_erased(self, evt):
if evt.data['key_id'].startswith('group_'):
conversation_id = message_database.get_conversation_id(evt.data['local_key_id'], evt.data['local_key_id'], 3)
listeners.push_snapshot('conversation', snap_id=conversation_id, deleted=True)

# def do_check_create_rename_key(self, new_key_id):
# from bitdust.logs import lg
# from bitdust.crypt import my_keys
# from bitdust.chat import message_database
# try:
# new_public_key = my_keys.get_public_key_raw(new_key_id)
# except:
# lg.exc()
# return
# try:
# new_local_key_id = my_keys.get_local_key_id(new_key_id)
# except:
# lg.exc()
# return
# if new_local_key_id is None:
# lg.err('did not found local_key_id for %r' % new_key_id)
# return
# message_database.check_create_rename_key(
# new_public_key=new_public_key,
# new_key_id=new_key_id,
# new_local_key_id=new_local_key_id,
# )
2 changes: 0 additions & 2 deletions bitdust/services/service_my_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ def start(self):
if keys_synchronizer.is_synchronized() and index_synchronizer.is_synchronized():
self.confirm_service_started(result=True)
if listeners.is_populate_required('private_file'):
# listeners.populate_later().remove('private_file')
backup_fs.populate_private_files()
else:
lg.warn('can not start service_my_data right now, keys_synchronizer.is_synchronized=%r index_synchronizer.is_synchronized=%r' % (keys_synchronizer.is_synchronized(), index_synchronizer.is_synchronized()))
Expand All @@ -90,7 +89,6 @@ def _on_my_storage_ready(self, evt):
if self.starting_deferred:
self.confirm_service_started(result=True)
if listeners.is_populate_required('private_file'):
# listeners.populate_later().remove('private_file')
backup_fs.populate_private_files()
if driver.is_enabled('service_my_data'):
if not driver.is_started('service_my_data'):
Expand Down
1 change: 0 additions & 1 deletion bitdust/services/service_p2p_hookups.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ def start(self):
events.add_subscriber(self._on_identity_url_changed, 'identity-url-changed')
events.add_subscriber(self._on_my_identity_url_changed, 'my-identity-url-changed')
if listeners.is_populate_required('online_status'):
# listeners.populate_later().remove('online_status')
online_status.populate_online_statuses()
return True

Expand Down
1 change: 0 additions & 1 deletion bitdust/storage/backup_rebuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,6 @@ def state_changed(self, oldstate, newstate, event, *args, **kwargs):

Need to notify backup_monitor() machine about my new state.
"""
# global_state.set_global_state('REBUILD ' + newstate)
data = self.to_json()
if newstate in ['STOPPED', 'DONE']:
data['rebuilding'] = False
Expand Down
8 changes: 6 additions & 2 deletions bitdust/storage/restore_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ def __init__(self, BackupID, OutputFile, KeyID=None, ecc_map=None, debug_level=_
self.blockRestoredCallback = None
self.Attempts = 0

super(RestoreWorker, self).__init__(name='restore_worker_%s' % self.version, state='AT_STARTUP', debug_level=debug_level, log_events=log_events, log_transitions=log_transitions, publish_events=publish_events, **kwargs)
super(RestoreWorker, self).__init__(name='restore_%s' % self.version, state='AT_STARTUP', debug_level=debug_level, log_events=log_events, log_transitions=log_transitions, publish_events=publish_events, **kwargs)
events.send('restore-started', data=dict(backup_id=self.backup_id))

def set_packet_in_callback(self, cb):
Expand Down Expand Up @@ -377,7 +377,11 @@ def doInit(self, *args, **kwargs):
num_suppliers = settings.DefaultDesiredSuppliers()
self.EccMap = eccmap.eccmap(eccmap.GetEccMapName(num_suppliers))
lg.warn('no meta info found, guessed ECC map %r from %d known suppliers' % (self.EccMap, len(self.known_suppliers)))
self.max_errors = eccmap.GetCorrectableErrors(self.EccMap.NumSuppliers())
# TODO: here we multiply by two because we have always two packets for each fragment: Data and Parity
# so number of possible errors can be two times larger
# however we may also add another check here to identify dead suppliers as well
# and dead suppliers number must be lower than "max_errors" in order restore to continue
self.max_errors = eccmap.GetCorrectableErrors(self.EccMap.NumSuppliers())*2
if data_receiver.A():
data_receiver.A().addStateChangedCallback(self._on_data_receiver_state_changed)

Expand Down
Loading
Loading