Skip to content

Commit

Permalink
Revert
Browse files Browse the repository at this point in the history
  • Loading branch information
crusaderky committed Apr 14, 2020
1 parent 026a602 commit e07b9f8
Showing 1 changed file with 31 additions and 31 deletions.
62 changes: 31 additions & 31 deletions distributed/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,24 +673,24 @@ async def _close_comm(comm):
return tasks

def __getattr__(self, key):
return partial(self._send_recv_from_rpc, key)
async def send_recv_from_rpc(**kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
try:
comm = await self.live_comm()
comm.name = "rpc." + key
result = await send_recv(comm=comm, op=key, **kwargs)
except (RPCClosed, CommClosedError) as e:
raise e.__class__(
"%s: while trying to call remote method %r" % (e, key)
)

async def _send_recv_from_rpc(self, key, **kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
try:
comm = await self.live_comm()
comm.name = "rpc." + key
result = await send_recv(comm=comm, op=key, **kwargs)
except (RPCClosed, CommClosedError) as e:
raise e.__class__(
"%s: while trying to call remote method %r" % (e, key)
)
self.comms[comm] = True # mark as open
return result

self.comms[comm] = True # mark as open
return result
return send_recv_from_rpc

def close_rpc(self):
if self.status != "closed":
Expand Down Expand Up @@ -744,22 +744,22 @@ def address(self):
return self.addr

def __getattr__(self, key):
return partial(self._send_recv_from_rpc, key)

async def _send_recv_from_rpc(self, key, **kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = await self.pool.connect(self.addr)
name, comm.name = comm.name, "ConnectionPool." + key
try:
result = await send_recv(comm=comm, op=key, **kwargs)
finally:
self.pool.reuse(self.addr, comm)
comm.name = name
async def send_recv_from_rpc(**kwargs):
if self.serializers is not None and kwargs.get("serializers") is None:
kwargs["serializers"] = self.serializers
if self.deserializers is not None and kwargs.get("deserializers") is None:
kwargs["deserializers"] = self.deserializers
comm = await self.pool.connect(self.addr)
name, comm.name = comm.name, "ConnectionPool." + key
try:
result = await send_recv(comm=comm, op=key, **kwargs)
finally:
self.pool.reuse(self.addr, comm)
comm.name = name

return result

return result
return send_recv_from_rpc

async def close_rpc(self):
pass
Expand Down

0 comments on commit e07b9f8

Please sign in to comment.