-
Notifications
You must be signed in to change notification settings - Fork 685
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Christoph/feat/plugins own process alternative #1280
Christoph/feat/plugins own process alternative #1280
Conversation
|
||
val, error = await self._pass_or_raise( | ||
self.chain.get_block_body_by_hash(event.block_hash) | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even though mypy
knows that val
is a BlockBody
, there is still some room for human errors in here. This is unrelated to this PR and boils down to the fact that e.g. BlockBody() + BlockHeader()
is perfectly fine from mypy
's perspective. I guess that is because both can be serialized to bytes
?
So, that means there is room for errors where you e.g. subscribe to GetBlockBodyByHashRequests
but then feed a BlockHeader
into that response.
@pipermerriam 346fcd1 is implemting a type heavy version in contrast to the inferred version used by #1212 . I kinda like it more than what I thought I would but I still think #1212 has the CBC version to say it in Vlad speech. As in, with this version, there is room to screw up, whereas with #1212 it will just work™. Thoughts? |
@@ -64,11 +95,16 @@ def p2p_server(monkeypatch, jsonrpc_ipc_pipe_path): | |||
return Server(None, None, None, None, None, None, None) | |||
|
|||
|
|||
@pytest.fixture | |||
def chain_with_block_validation(base_db, genesis_state): | |||
return _chain_with_block_validation(base_db, genesis_state, TestAsyncChain) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe use the evm.tools.builder
utils for this (to reduce shared reliance on the chain_with_block_validation
fixture). I'm of the opinion that the broader we use that fixture the more likely we are to end up in a situation where we have a tangled mess of dependencies that all need slightly different things from it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be clear on the problem I tried to solve here: Certain tests in tests/trinity
are already depending on the chain_with_block_validation
fixture that is in tests/conftest
. That however is tied to a chain that doesn't have the async
functions and I can't import from Trinity there.
So, I created another chain_with_block_validation
fixture in trinity/conftest
to break the lookup one level earlier and pass the TestAsyncChain
.
Oh, btw. Just in case you are checking this out locally. I forgot to cut a new Lahja release. I developed this against a local version. Will update in a minute. |
346fcd1
to
5af6e29
Compare
def __init__(self, chain: BaseLightPeerChain, event_bus: Endpoint) -> None: | ||
self.chain = chain | ||
self.event_bus = event_bus | ||
asyncio.ensure_future(self.answer_get_blockheader_by_hash_requests()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Noting that if one of these background processes crashes we should probably crash the whole trinity process which currently wouldn't be the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, that whole thing should probably become a BaseService
and needs to be wired up with the CancelToken
stuff.
asyncio.ensure_future(self.answer_get_account_requests()) | ||
asyncio.ensure_future(self.answer_get_contract_code_requests()) | ||
|
||
async def answer_get_blockheader_by_hash_requests(self) -> None: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
naming nitpick. Thoughts on the prefix handle_
? I think it fits the conventions used elsewhere in this codebase a bit more closely.
|
||
TReturn = TypeVar("TReturn") | ||
|
||
async def _pass_or_raise(self, awaitable: Awaitable[TReturn]) -> Tuple[TReturn, Exception]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be moved off of this class into a stand-alone utility. That allows us to write a basic test for it as well as reducing some of the cognitive overhead when reading the methods.
|
||
TReturn = TypeVar("TReturn") | ||
|
||
async def _pass_or_raise(self, awaitable: Awaitable[TReturn]) -> Tuple[TReturn, Exception]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't the return types here be Tuple[Optional[TReturn], Optional[Exception]]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't have strict optional enabled in this repo yet but as a preparation for that I agree we should start using Optional[T]
self.event_bus.broadcast( | ||
BytesResponse(val, error), | ||
event.broadcast_config() | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these methods could be cleanup up further with one extra utility.
def _handle_request(event_bus, event, response_type, awaitable):
try:
val = await awaitable
except Exception as err:
response = response_type(None, err)
else:
response = response_type(val, None)
event_bus.broadcast(response, event.broadcast_config())
This turns all of the methods into:
async def answer_get_contract_code_request(self):
async for event in self.event_bus.stream(GetContractCodeRequest):
await _handle_request(
self.event_bus,
event,
response_type,
self.chain.get_contract_code(event.block_hash, event.address),
)
The above could easily be cleaned up further since I think response_type
can be inferred from the event
and the self.event_bus
could be hidden away either by currying or by writing this as a method on the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried that but I couldn't get it to work without giving up type safety. I think it boils down to the fact that:
a.) type inference still needs to improve
b.) specifying bounds is still rather limited (e.g. can't express typeVar("TResponse", bound=BaseLightPeerChainResponse[TWhatever])
)
However, instead of saying:
self.event_bus.broadcast(
BytesResponse(val, error),
event.broadcast_config()
)
We can say
self.event_bus.broadcast(
event.expected_response_type()(val, error),
event.broadcast_config()
)
So, at least we get rid of manually instantiating a response type.
|
||
def _pass_or_raise(self, response: TResponse) -> TResponse: | ||
if response.error is not None: | ||
raise Exception( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using the raw exception class here feels wrong. Can we do something custom so that we can be more specific on the other side of this when catching exceptions?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed. I think we should have something like a RemoteException
defined within lahja
and derived from LahjaException
because wrapping up exceptions seems a scenario pretty common when working with the lahja so it makes sense to me to define it there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah wait, actually this is already the other side and I was wondering why I couldn't just raise response.error
which should be the exact exception that was wrapped up on the other end.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh actually, if we just raise response.error
we get exactly the same user experience as on current master. E.g if we aren't connected to any peers, we are left with.
Within trinity --atach
:
<ipython-input-44-a60af3c8cfea> in <module>()
----> 1 w3.eth.getBlock('latest')
~/Documents/hacking/ef/py-evm/venvx/lib/python3.6/site-packages/web3/eth.py in getBlock(self, block_identifier, full_transactions)
139 return self.web3.manager.request_blocking(
140 method,
--> 141 [block_identifier, full_transactions],
142 )
143
~/Documents/hacking/ef/py-evm/venvx/lib/python3.6/site-packages/web3/manager.py in request_blocking(self, method, params)
108
109 if "error" in response:
--> 110 raise ValueError(response["error"])
111
112 return response['result']
ValueError:
And in the main log output:
Traceback (most recent call last):
File "/home/cburgdorf/Documents/hacking/ef/py-evm/trinity/rpc/main.py", line 119, in _get_result
result = await method(*params)
File "/home/cburgdorf/Documents/hacking/ef/py-evm/trinity/rpc/format.py", line 135, in async_formatted_func
return await func(self, *formatted)
File "/home/cburgdorf/Documents/hacking/ef/py-evm/trinity/rpc/modules/eth.py", line 182, in getBlockByNumber
block = await get_block_at_number(self._chain, at_block)
File "/home/cburgdorf/Documents/hacking/ef/py-evm/trinity/rpc/modules/eth.py", line 94, in get_block_at_number
return await chain.coro_get_block_by_header(at_header)
File "/home/cburgdorf/Documents/hacking/ef/py-evm/trinity/chains/light.py", line 150, in coro_get_block_by_header
block_body = await self._peer_chain.coro_get_block_body_by_hash(header.hash)
File "/home/cburgdorf/Documents/hacking/ef/py-evm/trinity/plugins/builtin/light_peer_chain_bridge/light_peer_chain_bridge.py", line 222, in coro_get_block_body_by_hash
return self._pass_or_raise(await self.event_bus.request(event)).block_body
File "/home/cburgdorf/Documents/hacking/ef/py-evm/trinity/plugins/builtin/light_peer_chain_bridge/light_peer_chain_bridge.py", line 240, in _pass_or_raise
raise response.error
p2p.exceptions.NoConnectedPeers
Not saying this is how it should be but at least it doesn't change anything from where we are today and I can scratch this from the list for now.
def start(self) -> None: | ||
self.logger.info('LightPeerChain Bridge started') | ||
chain = cast(LightDispatchChain, self.chain) | ||
LightPeerChainEventBusResponder(chain._peer_chain, self.context.event_bus) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is really odd to me. When this starts we completely lose any handle to the thing/things that are running which feels wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Haha, totally! It's a one-off fire-and-forgot-no-strings-attached thingy. Yes, tbh, plugins that run in the networking process are still messy in terms of cancellation / takedown (compared to isolated plugins where we just kill the process and block on that)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's more systemic no need to solve here if you think it'll be easier to iterate on in a subsequent PR (but if you can open an issue to track that would be good)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer to open an issue: #1284.
Otherwise, I might risk spending the rest of my life on this PR and just cherry-pick things from time to time into master 😂
async for event in self.event_bus.stream(GetContractCodeRequest): | ||
|
||
val, error = await self._pass_or_raise( | ||
self.chain.get_contract_code(event.block_hash, event.address) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was surprised to see that chain.get_contract_code
was a coroutine since we've got a convention in place elsewhere that the method name would be chain.coro_get_contract_code
. Can we update these?
I like this more. It feels like there is less magic and I think it's easier to move from this pattern to something like #1212 than it is to move the other direction. |
Yep, I figured that 🙉 🙈 Let's go with that then. Will iterate on the edgy parts. |
90e39d7
to
60a4600
Compare
af4b874
to
5924042
Compare
I tried to address the latest review, updated the individual commits and will now close this PR to continue things back in #1212. |
What was wrong?
This is an alternative to #1212. Let's ignore that this needs another rebase and the fact that the commit history isn't clean (if we decide to go down that path) and focus on the last commit.
I'll put in some inline comments.