Skip to content

Commit

Permalink
Connector changes for grolt
Browse files Browse the repository at this point in the history
  • Loading branch information
technige committed Jun 7, 2021
1 parent 3d40acc commit 19e6f4e
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions py2neo/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -940,14 +940,12 @@ def __init__(self, profile=None, user_agent=None, init_size=None,
self._routing_refresh_ttl = routing_refresh_ttl
self._pools = {}
if self._profile.routing:
self._routing = Router()
self._routing_tables = {}
self._router = Router()
else:
self._routing = None
self._routing_tables = None
self.add_pools(*self._initial_routers)
self._router = None
self._add_pools(*self._initial_routers)
if self._profile.routing:
self._refresh_routing_table(None)
self.refresh_routing_table(None)

def __repr__(self):
return "<{} to {!r}>".format(self.__class__.__name__, self.profile)
Expand All @@ -959,7 +957,7 @@ def __str__(self):
def __hash__(self):
return hash(self.profile)

def add_pools(self, *profiles):
def _add_pools(self, *profiles):
""" Adds connection pools for one or more connection profiles.
Pools that already exist will be skipped.
"""
Expand All @@ -981,16 +979,16 @@ def add_pools(self, *profiles):
def invalidate_routing_table(self, graph_name):
""" Invalidate the routing table for the given graph.
"""
if self._routing is not None:
self._routing.invalidate_routing_table(graph_name)
if self._router is not None:
self._router.invalidate_routing_table(graph_name)

def _get_profiles(self, graph_name=None, readonly=False):
if self._routing is None:
if self._router is None:
# If routing isn't enabled, just return a
# simple list of pools.
return self._pools.keys(), self._pools.keys()

rt = self._routing.table(graph_name)
rt = self._router.get_routing_table(graph_name)
while True: # TODO: some limit to this, maybe with repeater?
ro_profiles, rw_profiles, expired = rt.runners()
if not expired:
Expand All @@ -1003,15 +1001,22 @@ def _get_profiles(self, graph_name=None, readonly=False):
else:
rt.wait_until_updated()
else:
self._refresh_routing_table(graph_name)
self.refresh_routing_table(graph_name)

def _refresh_routing_table(self, graph_name=None):
def refresh_routing_table(self, graph_name):
""" Refresh the routing table for a given graph database.
:param graph_name:
graph database for which to refresh the routing table
:returns:
:class:`.RoutingTable` instance for the graph database
"""
log.debug("Attempting to refresh routing table for %s", _repr_graph_name(graph_name))
assert self._routing is not None
rt = self._routing.table(graph_name)
assert self._router is not None
rt = self._router.get_routing_table(graph_name)
rt.set_updating()
try:
known_routers = self._routing.routers + self._initial_routers # TODO de-dupe
known_routers = self._router.routers + self._initial_routers # TODO de-dupe
log.debug("Known routers are: %s", ", ".join(map(repr, known_routers)))
for router in known_routers:
log.debug("Asking %r for routing table", router)
Expand All @@ -1033,20 +1038,27 @@ def _refresh_routing_table(self, graph_name=None):
continue
else:
# TODO: comment this algorithm
self.add_pools(*routers)
self.add_pools(*ro_runners)
self.add_pools(*rw_runners)
old_profiles = self._routing.update(graph_name, routers, ro_runners, rw_runners, ttl)
self._add_pools(*routers)
self._add_pools(*ro_runners)
self._add_pools(*rw_runners)
old_profiles = self._router.update(graph_name, routers, ro_runners, rw_runners, ttl)
for profile in old_profiles:
self.prune(profile)
return
return self._router.get_routing_table(graph_name)
finally:
cx.release()
else:
raise ServiceUnavailable("Cannot connect to any known routers")
finally:
rt.set_not_updating()

def get_router_profiles(self):
""" Get the last known router profiles.
"""
if self._router is None:
return None
return self._router.routers

@property
def profile(self):
""" The initial connection profile for this connector.
Expand Down Expand Up @@ -1261,7 +1273,7 @@ def prune(self, profile):
pass
else:
pool.prune()
if self._routing is not None and pool.size == 0:
if self._router is not None and pool.size == 0:
log.debug("Removing connection pool for profile %r", profile)
try:
del self._pools[profile]
Expand Down Expand Up @@ -1298,8 +1310,8 @@ def _on_broken(self, profile, message):
"""
log.debug("Connection to %r broken\n%s", profile, message)
# TODO: clean up broken connections from reader and writer entries too
if self._routing is not None:
self._routing.set_broken(profile)
if self._router is not None:
self._router.set_broken(profile)
self.prune(profile)

def auto_run(self, cypher, parameters=None, pull=-1, graph_name=None, readonly=False,
Expand Down Expand Up @@ -1524,7 +1536,9 @@ def __init__(self):
def routers(self):
return self._routers

def table(self, graph_name):
def get_routing_table(self, graph_name):
""" Return the routing table for the given graph.
"""
with self._lock:
try:
return self._routing_tables[graph_name]
Expand Down

0 comments on commit 19e6f4e

Please sign in to comment.