-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add tracking signals for getting request/response bodies. #2767
Changes from 4 commits
158cdbe
329f89a
24e1db9
1be8ecb
f19e7c0
45d6332
e1e82e5
6e3819f
89dcb0f
8288c26
9265d0b
202cb86
d687b92
d30d50b
57e3060
f944a17
6a93b16
9f8d389
4fbc080
1034104
dcd7366
7badf72
d7c995a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,7 +168,8 @@ def __init__(self, method, url, *, | |
proxy=None, proxy_auth=None, | ||
timer=None, session=None, auto_decompress=True, | ||
ssl=None, | ||
proxy_headers=None): | ||
proxy_headers=None, | ||
traces=[]): | ||
|
||
if loop is None: | ||
loop = asyncio.get_event_loop() | ||
|
@@ -209,6 +210,7 @@ def __init__(self, method, url, *, | |
if data or self.method not in self.GET_METHODS: | ||
self.update_transfer_encoding() | ||
self.update_expect_continue(expect100) | ||
self.traces = traces | ||
|
||
def is_ssl(self): | ||
return self.url.scheme in ('https', 'wss') | ||
|
@@ -475,7 +477,14 @@ async def send(self, conn): | |
if self.url.raw_query_string: | ||
path += '?' + self.url.raw_query_string | ||
|
||
writer = StreamWriter(conn.protocol, conn.transport, self.loop) | ||
async def on_chunk_sent(chunk): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Convert the function into ClientRequest's method. No need to create a nested function on every HTTP request. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
for trace in self.traces: | ||
await trace.send_request_chunk_sent(chunk) | ||
|
||
writer = StreamWriter( | ||
conn.protocol, conn.transport, self.loop, | ||
on_chunk_sent=on_chunk_sent | ||
) | ||
|
||
if self.compress: | ||
writer.enable_compression(self.compress) | ||
|
@@ -513,8 +522,9 @@ async def send(self, conn): | |
self.method, self.original_url, | ||
writer=self._writer, continue100=self._continue, timer=self._timer, | ||
request_info=self.request_info, | ||
auto_decompress=self._auto_decompress) | ||
|
||
auto_decompress=self._auto_decompress, | ||
traces=self.traces, | ||
) | ||
self.response._post_init(self.loop, self._session) | ||
return self.response | ||
|
||
|
@@ -555,7 +565,8 @@ class ClientResponse(HeadersMixin): | |
|
||
def __init__(self, method, url, *, | ||
writer=None, continue100=None, timer=None, | ||
request_info=None, auto_decompress=True): | ||
request_info=None, auto_decompress=True, | ||
traces=[]): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same defaults story. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. also changed to |
||
assert isinstance(url, URL) | ||
|
||
self.method = method | ||
|
@@ -572,6 +583,7 @@ def __init__(self, method, url, *, | |
self._timer = timer if timer is not None else TimerNoop() | ||
self._auto_decompress = auto_decompress | ||
self._cache = {} # reqired for @reify method decorator | ||
self._traces = traces | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why in ClientRequest traces is public attribute while in ClientResponse it's private? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can answer: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, fine for me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. I switched now to |
||
|
||
@property | ||
def url(self): | ||
|
@@ -796,6 +808,8 @@ async def read(self): | |
if self._content is None: | ||
try: | ||
self._content = await self.content.read() | ||
for trace in self._traces: | ||
await trace.send_response_chunk_received(self._content) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has to be tested with specific tests/changes under the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
except BaseException: | ||
self.close() | ||
raise | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
import asyncio | ||
import collections | ||
import inspect | ||
import zlib | ||
|
||
from .abc import AbstractStreamWriter | ||
|
@@ -16,7 +17,12 @@ | |
|
||
class StreamWriter(AbstractStreamWriter): | ||
|
||
def __init__(self, protocol, transport, loop): | ||
def __init__(self, protocol, transport, loop, on_chunk_sent=None): | ||
assert ( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The assert is in the hot-pass route. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
on_chunk_sent is None or | ||
inspect.iscoroutinefunction(on_chunk_sent) | ||
) | ||
|
||
self._protocol = protocol | ||
self._transport = transport | ||
|
||
|
@@ -30,6 +36,8 @@ def __init__(self, protocol, transport, loop): | |
self._compress = None | ||
self._drain_waiter = None | ||
|
||
self._on_chunk_sent = on_chunk_sent | ||
|
||
@property | ||
def transport(self): | ||
return self._transport | ||
|
@@ -62,6 +70,9 @@ async def write(self, chunk, *, drain=True, LIMIT=64*1024): | |
writer can't be used after write_eof() method being called. | ||
write() return drain future. | ||
""" | ||
if self._on_chunk_sent: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please do check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed |
||
await self._on_chunk_sent(chunk) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has to be covered with specific tests/changes under There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
if self._compress is not None: | ||
chunk = self._compress.compress(chunk) | ||
if not chunk: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,13 +9,14 @@ | |
|
||
|
||
__all__ = ( | ||
'TraceConfig', 'TraceRequestStartParams', 'TraceRequestEndParams', | ||
'TraceRequestExceptionParams', 'TraceConnectionQueuedStartParams', | ||
'TraceConnectionQueuedEndParams', 'TraceConnectionCreateStartParams', | ||
'TraceConnectionCreateEndParams', 'TraceConnectionReuseconnParams', | ||
'TraceDnsResolveHostStartParams', 'TraceDnsResolveHostEndParams', | ||
'TraceDnsCacheHitParams', 'TraceDnsCacheMissParams', | ||
'TraceRequestRedirectParams' | ||
'TraceConfig', 'TraceRequestStartParams', | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you really need to touch all these lines? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, reverted and added new classes at the end |
||
'TraceRequestChunkSentParams', 'TraceResponseChunkReceivedParams', | ||
'TraceRequestEndParams', 'TraceRequestExceptionParams', | ||
'TraceConnectionQueuedStartParams', 'TraceConnectionQueuedEndParams', | ||
'TraceConnectionCreateStartParams', 'TraceConnectionCreateEndParams', | ||
'TraceConnectionReuseconnParams', 'TraceDnsResolveHostStartParams', | ||
'TraceDnsResolveHostEndParams', 'TraceDnsCacheHitParams', | ||
'TraceDnsCacheMissParams', 'TraceRequestRedirectParams' | ||
) | ||
|
||
|
||
|
@@ -25,6 +26,8 @@ class TraceConfig: | |
|
||
def __init__(self, trace_config_ctx_factory=SimpleNamespace): | ||
self._on_request_start = Signal(self) | ||
self._on_request_chunk_sent = Signal(self) | ||
self._on_response_chunk_received = Signal(self) | ||
self._on_request_end = Signal(self) | ||
self._on_request_exception = Signal(self) | ||
self._on_request_redirect = Signal(self) | ||
|
@@ -47,6 +50,8 @@ def trace_config_ctx(self, trace_request_ctx=None): | |
|
||
def freeze(self): | ||
self._on_request_start.freeze() | ||
self._on_request_chunk_sent.freeze() | ||
self._on_response_chunk_received.freeze() | ||
self._on_request_end.freeze() | ||
self._on_request_exception.freeze() | ||
self._on_request_redirect.freeze() | ||
|
@@ -64,6 +69,14 @@ def freeze(self): | |
def on_request_start(self): | ||
return self._on_request_start | ||
|
||
@property | ||
def on_request_chunk_sent(self): | ||
return self._on_request_chunk_sent | ||
|
||
@property | ||
def on_response_chunk_received(self): | ||
return self._on_response_chunk_received | ||
|
||
@property | ||
def on_request_end(self): | ||
return self._on_request_end | ||
|
@@ -121,6 +134,18 @@ class TraceRequestStartParams: | |
headers = attr.ib(type=CIMultiDict) | ||
|
||
|
||
@attr.s(frozen=True, slots=True) | ||
class TraceRequestChunkSentParams: | ||
""" Parameters sent by the `on_request_chunk_sent` signal""" | ||
chunk = attr.ib(type=bytes) | ||
|
||
|
||
@attr.s(frozen=True, slots=True) | ||
class TraceResponseChunkReceivedParams: | ||
""" Parameters sent by the `on_response_chunk_received` signal""" | ||
chunk = attr.ib(type=bytes) | ||
|
||
|
||
@attr.s(frozen=True, slots=True) | ||
class TraceRequestEndParams: | ||
""" Parameters sent by the `on_request_end` signal""" | ||
|
@@ -213,6 +238,20 @@ async def send_request_start(self, method, url, headers): | |
TraceRequestStartParams(method, url, headers) | ||
) | ||
|
||
async def send_request_chunk_sent(self, chunk): | ||
return await self._trace_config.on_request_chunk_sent.send( | ||
self._session, | ||
self._trace_config_ctx, | ||
TraceRequestChunkSentParams(chunk) | ||
) | ||
|
||
async def send_response_chunk_received(self, chunk): | ||
return await self._trace_config.on_response_chunk_received.send( | ||
self._session, | ||
self._trace_config_ctx, | ||
TraceResponseChunkReceivedParams(chunk) | ||
) | ||
|
||
async def send_request_end(self, method, url, headers, response): | ||
return await self._trace_config.on_request_end.send( | ||
self._session, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -147,6 +147,20 @@ TraceConfig | |
|
||
``params`` is :class:`aiohttp.TraceRequestStartParams` instance. | ||
|
||
.. attribute:: on_request_chunk_sent | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please also upgrade flow diagrams at the beginning of file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
Property that gives access to the signals that will be executed | ||
when a chunk of request body is sent. | ||
|
||
``params`` is :class:`aiohttp.TraceRequestChunkSentParams` instance. | ||
|
||
.. attribute:: on_response_chunk_received | ||
|
||
Property that gives access to the signals that will be executed | ||
when a chunk of response body is received. | ||
|
||
``params`` is :class:`aiohttp.TraceResponseChunkReceivedParams` instance. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
.. attribute:: on_request_redirect | ||
|
||
Property that gives access to the signals that will be executed when a | ||
|
@@ -259,6 +273,31 @@ TraceRequestStartParams | |
|
||
Headers that will be used for the request, can be mutated. | ||
|
||
|
||
TraceRequestChunkSentParams | ||
--------------------------- | ||
|
||
.. class:: TraceRequestChunkSentParams | ||
|
||
See :attr:`TraceConfig.on_request_chunk_sent` for details. | ||
|
||
.. attribute:: chunk | ||
|
||
Bytes of chunk sent | ||
|
||
|
||
TraceResponseChunkSentParams | ||
--------------------------- | ||
|
||
.. class:: TraceResponseChunkSentParams | ||
|
||
See :attr:`TraceConfig.on_response_chunk_received` for details. | ||
|
||
.. attribute:: chunk | ||
|
||
Bytes of chunk received | ||
|
||
|
||
TraceRequestEndParams | ||
--------------------- | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
[pep8] | ||
max-line-length=79 | ||
ignore=E225,E226 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm in doubt that we should ignore these. Any reasons why to disable them globally? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, I'm not in place to judge this. I did it to be able to save a file which has There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That autopep suggestion looks correct. You may also write There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm with you. @asvetlov specifically asked me to keep it as was. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For me the strange thing is successful passing flake8 checks without the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please drop the change. Feel free to replace the limit with There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
|
||
[easy_install] | ||
zip_ok = false | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's not use mutables as defaults. These may cause some nasty bugs we could easily avoid.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true, changed to
None