-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add tracking signals for getting request/response bodies. #2767
Changes from 20 commits
158cdbe
329f89a
24e1db9
1be8ecb
f19e7c0
45d6332
e1e82e5
6e3819f
89dcb0f
8288c26
9265d0b
202cb86
d687b92
d30d50b
57e3060
f944a17
6a93b16
9f8d389
4fbc080
1034104
dcd7366
7badf72
d7c995a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -168,7 +168,8 @@ def __init__(self, method, url, *, | |
proxy=None, proxy_auth=None, | ||
timer=None, session=None, auto_decompress=True, | ||
ssl=None, | ||
proxy_headers=None): | ||
proxy_headers=None, | ||
traces=None): | ||
|
||
if loop is None: | ||
loop = asyncio.get_event_loop() | ||
|
@@ -209,6 +210,9 @@ def __init__(self, method, url, *, | |
if data or self.method not in self.GET_METHODS: | ||
self.update_transfer_encoding() | ||
self.update_expect_continue(expect100) | ||
if traces is None: | ||
traces = [] | ||
self._traces = traces | ||
|
||
def is_ssl(self): | ||
return self.url.scheme in ('https', 'wss') | ||
|
@@ -475,7 +479,10 @@ async def send(self, conn): | |
if self.url.raw_query_string: | ||
path += '?' + self.url.raw_query_string | ||
|
||
writer = StreamWriter(conn.protocol, conn.transport, self.loop) | ||
writer = StreamWriter( | ||
conn.protocol, conn.transport, self.loop, | ||
on_chunk_sent=self._on_chunk_request_sent | ||
) | ||
|
||
if self.compress: | ||
writer.enable_compression(self.compress) | ||
|
@@ -513,8 +520,9 @@ async def send(self, conn): | |
self.method, self.original_url, | ||
writer=self._writer, continue100=self._continue, timer=self._timer, | ||
request_info=self.request_info, | ||
auto_decompress=self._auto_decompress) | ||
|
||
auto_decompress=self._auto_decompress, | ||
traces=self._traces, | ||
) | ||
self.response._post_init(self.loop, self._session) | ||
return self.response | ||
|
||
|
@@ -531,6 +539,10 @@ def terminate(self): | |
self._writer.cancel() | ||
self._writer = None | ||
|
||
async def _on_chunk_request_sent(self, chunk): | ||
for trace in self._traces: | ||
await trace.send_request_chunk_sent(chunk) | ||
|
||
|
||
class ClientResponse(HeadersMixin): | ||
|
||
|
@@ -555,7 +567,8 @@ class ClientResponse(HeadersMixin): | |
|
||
def __init__(self, method, url, *, | ||
writer=None, continue100=None, timer=None, | ||
request_info=None, auto_decompress=True): | ||
request_info=None, auto_decompress=True, | ||
traces=None): | ||
assert isinstance(url, URL) | ||
|
||
self.method = method | ||
|
@@ -572,6 +585,9 @@ def __init__(self, method, url, *, | |
self._timer = timer if timer is not None else TimerNoop() | ||
self._auto_decompress = auto_decompress | ||
self._cache = {} # reqired for @reify method decorator | ||
if traces is None: | ||
traces = [] | ||
self._traces = traces | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why in ClientRequest traces is public attribute while in ClientResponse it's private? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I can answer: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, fine for me. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. I switched now to |
||
|
||
@property | ||
def url(self): | ||
|
@@ -796,6 +812,8 @@ async def read(self): | |
if self._content is None: | ||
try: | ||
self._content = await self.content.read() | ||
for trace in self._traces: | ||
await trace.send_response_chunk_received(self._content) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has to be tested with specific tests/changes under the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
except BaseException: | ||
self.close() | ||
raise | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,7 @@ | |
|
||
class StreamWriter(AbstractStreamWriter): | ||
|
||
def __init__(self, protocol, transport, loop): | ||
def __init__(self, protocol, transport, loop, on_chunk_sent=None): | ||
self._protocol = protocol | ||
self._transport = transport | ||
|
||
|
@@ -30,6 +30,8 @@ def __init__(self, protocol, transport, loop): | |
self._compress = None | ||
self._drain_waiter = None | ||
|
||
self._on_chunk_sent = on_chunk_sent | ||
|
||
@property | ||
def transport(self): | ||
return self._transport | ||
|
@@ -55,13 +57,16 @@ def _write(self, chunk): | |
raise asyncio.CancelledError('Cannot write to closing transport') | ||
self._transport.write(chunk) | ||
|
||
async def write(self, chunk, *, drain=True, LIMIT=64*1024): | ||
async def write(self, chunk, *, drain=True, LIMIT=0x10000): | ||
"""Writes chunk of data to a stream. | ||
|
||
write_eof() indicates end of stream. | ||
writer can't be used after write_eof() method being called. | ||
write() return drain future. | ||
""" | ||
if self._on_chunk_sent is not None: | ||
await self._on_chunk_sent(chunk) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This has to be covered with specific tests/changes under There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
if self._compress is not None: | ||
chunk = self._compress.compress(chunk) | ||
if not chunk: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,16 +34,26 @@ Overview | |
exception[shape=flowchart.terminator, description="on_request_exception"]; | ||
|
||
acquire_connection[description="Connection acquiring"]; | ||
got_response; | ||
send_request; | ||
headers_received; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The diagram is cryptic a little but I can live with it. |
||
headers_sent; | ||
chunk_sent[description="on_request_chunk_sent"]; | ||
chunk_received[description="on_response_chunk_received"]; | ||
|
||
start -> acquire_connection; | ||
acquire_connection -> send_request; | ||
send_request -> got_response; | ||
got_response -> redirect; | ||
got_response -> end; | ||
redirect -> send_request; | ||
send_request -> exception; | ||
acquire_connection -> headers_sent; | ||
headers_sent -> headers_received; | ||
headers_sent -> chunk_sent; | ||
chunk_sent -> chunk_sent; | ||
chunk_sent -> headers_received; | ||
headers_received -> chunk_received; | ||
chunk_received -> chunk_received; | ||
chunk_received -> end; | ||
headers_received -> redirect; | ||
headers_received -> end; | ||
redirect -> headers_sent; | ||
chunk_received -> exception; | ||
chunk_sent -> exception; | ||
headers_sent -> exception; | ||
|
||
} | ||
|
||
|
@@ -147,6 +157,26 @@ TraceConfig | |
|
||
``params`` is :class:`aiohttp.TraceRequestStartParams` instance. | ||
|
||
.. attribute:: on_request_chunk_sent | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please also upgrade flow diagrams at the beginning of file. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
|
||
Property that gives access to the signals that will be executed | ||
when a chunk of request body is sent. | ||
|
||
``params`` is :class:`aiohttp.TraceRequestChunkSentParams` instance. | ||
|
||
.. versionadded:: 3.1 | ||
|
||
.. attribute:: on_response_chunk_received | ||
|
||
|
||
Property that gives access to the signals that will be executed | ||
when a chunk of response body is received. | ||
|
||
``params`` is :class:`aiohttp.TraceResponseChunkReceivedParams` instance. | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. done |
||
.. versionadded:: 3.1 | ||
|
||
.. attribute:: on_request_redirect | ||
|
||
Property that gives access to the signals that will be executed when a | ||
|
@@ -259,6 +289,35 @@ TraceRequestStartParams | |
|
||
Headers that will be used for the request, can be mutated. | ||
|
||
|
||
TraceRequestChunkSentParams | ||
--------------------------- | ||
|
||
.. class:: TraceRequestChunkSentParams | ||
|
||
.. versionadded:: 3.1 | ||
|
||
See :attr:`TraceConfig.on_request_chunk_sent` for details. | ||
|
||
.. attribute:: chunk | ||
|
||
Bytes of chunk sent | ||
|
||
|
||
TraceResponseChunkSentParams | ||
---------------------------- | ||
|
||
.. class:: TraceResponseChunkSentParams | ||
|
||
.. versionadded:: 3.1 | ||
|
||
See :attr:`TraceConfig.on_response_chunk_received` for details. | ||
|
||
.. attribute:: chunk | ||
|
||
Bytes of chunk received | ||
|
||
|
||
TraceRequestEndParams | ||
--------------------- | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For consistency reasons why we can't give the
traces
as kwarg?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StreamWriter
is used in (at least) two scenarios:In the later case, having
traces
an callingon_request_chunk_sent
seems wrong, because the class is used for writting something completely different. Soon_chunk_sent
is introduced to have something generic.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oks good point!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1