Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tracking signals for getting request/response bodies. #2767

Merged
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
158cdbe
Add tracking signals for getting request/response bodies.
kowalski Feb 26, 2018
329f89a
Revert automatic pep8 fix.
kowalski Feb 27, 2018
24e1db9
Remove internal usage of Signal in favor of simple callbacks.
kowalski Feb 27, 2018
1be8ecb
Document new signals
kowalski Feb 27, 2018
f19e7c0
Move callback to a private method.
kowalski Feb 27, 2018
45d6332
Make check more idiomatic
kowalski Feb 27, 2018
e1e82e5
Reorder classes in __all__
kowalski Feb 27, 2018
6e3819f
Update request lifecycle diagram to include new signals
kowalski Feb 27, 2018
89dcb0f
Don't use mutable defaults for traces. Make it private in ClientRequest
kowalski Feb 27, 2018
8288c26
Further updates to tracing documentation
kowalski Feb 27, 2018
9265d0b
Merge branch 'master' into feature/add-signals-for-reqres-chunks
asvetlov Feb 27, 2018
202cb86
Polish docs
kowalski Feb 28, 2018
d687b92
Merge branch 'feature/add-signals-for-reqres-chunks' of github.com:ko…
kowalski Feb 28, 2018
d30d50b
Merge branch 'master' into feature/add-signals-for-reqres-chunks
asvetlov Feb 28, 2018
57e3060
Revert ignoring pep8 rules
kowalski Mar 1, 2018
f944a17
Subtle optimisation - don't create list instance if not needed
kowalski Mar 1, 2018
6a93b16
Remove assert statement
kowalski Mar 1, 2018
9f8d389
Add test case ensuring StreamWriter calls callback
kowalski Mar 1, 2018
4fbc080
Add test checking that response.read() trigger trace callback
kowalski Mar 1, 2018
1034104
Merge branch 'feature/add-signals-for-reqres-chunks' of github.com:ko…
kowalski Mar 1, 2018
dcd7366
Merge branch 'master' into feature/add-signals-for-reqres-chunks
asvetlov Mar 1, 2018
7badf72
Add CHANGES record
kowalski Mar 1, 2018
d7c995a
Merge branch 'feature/add-signals-for-reqres-chunks' of github.com:ko…
kowalski Mar 1, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ async def _request(self, method, url, *,
response_class=self._response_class,
proxy=proxy, proxy_auth=proxy_auth, timer=timer,
session=self, auto_decompress=self._auto_decompress,
ssl=ssl, proxy_headers=proxy_headers)
ssl=ssl, proxy_headers=proxy_headers, traces=traces)

# connection timeout
try:
Expand Down
24 changes: 19 additions & 5 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,8 @@ def __init__(self, method, url, *,
proxy=None, proxy_auth=None,
timer=None, session=None, auto_decompress=True,
ssl=None,
proxy_headers=None):
proxy_headers=None,
traces=None):

if loop is None:
loop = asyncio.get_event_loop()
Expand Down Expand Up @@ -209,6 +210,7 @@ def __init__(self, method, url, *,
if data or self.method not in self.GET_METHODS:
self.update_transfer_encoding()
self.update_expect_continue(expect100)
self._traces = traces or []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In other aiohttp code we usually use another pattern:

if traces is None:
    traces = []
self._traces = traces

Please change the one-liner to explicit check.
It is a very subtle optimization for if traces is an empty list already you code calculates False for bool(traces) check and creates an empty list again.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


def is_ssl(self):
return self.url.scheme in ('https', 'wss')
Expand Down Expand Up @@ -475,7 +477,10 @@ async def send(self, conn):
if self.url.raw_query_string:
path += '?' + self.url.raw_query_string

writer = StreamWriter(conn.protocol, conn.transport, self.loop)
writer = StreamWriter(
conn.protocol, conn.transport, self.loop,
on_chunk_sent=self._on_chunk_request_sent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency reasons why we can't give the traces as kwarg?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamWriter is used in (at least) two scenarios:

  • here, for writing request body
  • in RequestHandler for writing response to incoming request

In the later case, having traces an calling on_request_chunk_sent seems wrong, because the class is used for writting something completely different. So on_chunk_sent is introduced to have something generic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oks good point!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

)

if self.compress:
writer.enable_compression(self.compress)
Expand Down Expand Up @@ -513,8 +518,9 @@ async def send(self, conn):
self.method, self.original_url,
writer=self._writer, continue100=self._continue, timer=self._timer,
request_info=self.request_info,
auto_decompress=self._auto_decompress)

auto_decompress=self._auto_decompress,
traces=self._traces,
)
self.response._post_init(self.loop, self._session)
return self.response

Expand All @@ -531,6 +537,10 @@ def terminate(self):
self._writer.cancel()
self._writer = None

async def _on_chunk_request_sent(self, chunk):
for trace in self._traces:
await trace.send_request_chunk_sent(chunk)


class ClientResponse(HeadersMixin):

Expand All @@ -555,7 +565,8 @@ class ClientResponse(HeadersMixin):

def __init__(self, method, url, *,
writer=None, continue100=None, timer=None,
request_info=None, auto_decompress=True):
request_info=None, auto_decompress=True,
traces=None):
assert isinstance(url, URL)

self.method = method
Expand All @@ -572,6 +583,7 @@ def __init__(self, method, url, *,
self._timer = timer if timer is not None else TimerNoop()
self._auto_decompress = auto_decompress
self._cache = {} # reqired for @reify method decorator
self._traces = traces or []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same: replace or with explicit check for is None please

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@property
def url(self):
Expand Down Expand Up @@ -796,6 +808,8 @@ async def read(self):
if self._content is None:
try:
self._content = await self.content.read()
for trace in self._traces:
await trace.send_response_chunk_received(self._content)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has to be tested with specific tests/changes under the test_client_response.py file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

except BaseException:
self.close()
raise
Expand Down
13 changes: 12 additions & 1 deletion aiohttp/http_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
import collections
import inspect
import zlib

from .abc import AbstractStreamWriter
Expand All @@ -16,7 +17,12 @@

class StreamWriter(AbstractStreamWriter):

def __init__(self, protocol, transport, loop):
def __init__(self, protocol, transport, loop, on_chunk_sent=None):
assert (
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assert is in the hot-pass route.
I understand your striving for type checking but all StreamWriter usage is under the library control, we always should pass the proper values for on_chunk_sent parameter.
Please remove the assert statement.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

on_chunk_sent is None or
inspect.iscoroutinefunction(on_chunk_sent)
)

self._protocol = protocol
self._transport = transport

Expand All @@ -30,6 +36,8 @@ def __init__(self, protocol, transport, loop):
self._compress = None
self._drain_waiter = None

self._on_chunk_sent = on_chunk_sent

@property
def transport(self):
return self._transport
Expand Down Expand Up @@ -62,6 +70,9 @@ async def write(self, chunk, *, drain=True, LIMIT=64*1024):
writer can't be used after write_eof() method being called.
write() return drain future.
"""
if self._on_chunk_sent is not None:
await self._on_chunk_sent(chunk)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has to be covered with specific tests/changes under test_http_writer.py

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if self._compress is not None:
chunk = self._compress.compress(chunk)
if not chunk:
Expand Down
41 changes: 40 additions & 1 deletion aiohttp/tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
'TraceConnectionCreateEndParams', 'TraceConnectionReuseconnParams',
'TraceDnsResolveHostStartParams', 'TraceDnsResolveHostEndParams',
'TraceDnsCacheHitParams', 'TraceDnsCacheMissParams',
'TraceRequestRedirectParams'
'TraceRequestRedirectParams',
'TraceRequestChunkSentParams', 'TraceResponseChunkReceivedParams',
)


Expand All @@ -25,6 +26,8 @@ class TraceConfig:

def __init__(self, trace_config_ctx_factory=SimpleNamespace):
self._on_request_start = Signal(self)
self._on_request_chunk_sent = Signal(self)
self._on_response_chunk_received = Signal(self)
self._on_request_end = Signal(self)
self._on_request_exception = Signal(self)
self._on_request_redirect = Signal(self)
Expand All @@ -47,6 +50,8 @@ def trace_config_ctx(self, trace_request_ctx=None):

def freeze(self):
self._on_request_start.freeze()
self._on_request_chunk_sent.freeze()
self._on_response_chunk_received.freeze()
self._on_request_end.freeze()
self._on_request_exception.freeze()
self._on_request_redirect.freeze()
Expand All @@ -64,6 +69,14 @@ def freeze(self):
def on_request_start(self):
return self._on_request_start

@property
def on_request_chunk_sent(self):
return self._on_request_chunk_sent

@property
def on_response_chunk_received(self):
return self._on_response_chunk_received

@property
def on_request_end(self):
return self._on_request_end
Expand Down Expand Up @@ -121,6 +134,18 @@ class TraceRequestStartParams:
headers = attr.ib(type=CIMultiDict)


@attr.s(frozen=True, slots=True)
class TraceRequestChunkSentParams:
""" Parameters sent by the `on_request_chunk_sent` signal"""
chunk = attr.ib(type=bytes)


@attr.s(frozen=True, slots=True)
class TraceResponseChunkReceivedParams:
""" Parameters sent by the `on_response_chunk_received` signal"""
chunk = attr.ib(type=bytes)


@attr.s(frozen=True, slots=True)
class TraceRequestEndParams:
""" Parameters sent by the `on_request_end` signal"""
Expand Down Expand Up @@ -213,6 +238,20 @@ async def send_request_start(self, method, url, headers):
TraceRequestStartParams(method, url, headers)
)

async def send_request_chunk_sent(self, chunk):
return await self._trace_config.on_request_chunk_sent.send(
self._session,
self._trace_config_ctx,
TraceRequestChunkSentParams(chunk)
)

async def send_response_chunk_received(self, chunk):
return await self._trace_config.on_response_chunk_received.send(
self._session,
self._trace_config_ctx,
TraceResponseChunkReceivedParams(chunk)
)

async def send_request_end(self, method, url, headers, response):
return await self._trace_config.on_request_end.send(
self._session,
Expand Down
43 changes: 41 additions & 2 deletions docs/tracing_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ Overview
exception[shape=flowchart.terminator, description="on_request_exception"];

acquire_connection[description="Connection acquiring"];
got_response;
send_request;
got_response[description="on_response_chunk_received"];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rename got_response and send_request nodes to be close to actual signal names. Maybe that long names could be shortened on diagram -- be creative.

Also I like to see arrows what explicitly points that chunk_sent and chunk_received events can come more than once.

send_request[description="on_request_chunk_sent"];

start -> acquire_connection;
acquire_connection -> send_request;
Expand Down Expand Up @@ -147,6 +147,20 @@ TraceConfig

``params`` is :class:`aiohttp.TraceRequestStartParams` instance.

.. attribute:: on_request_chunk_sent
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also upgrade flow diagrams at the beginning of file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done. Here I added new signals to description. Please let me know if you meant something more elaborate.
zrzut ekranu z 2018-02-27 15-04-09


Property that gives access to the signals that will be executed
when a chunk of request body is sent.

``params`` is :class:`aiohttp.TraceRequestChunkSentParams` instance.

.. attribute:: on_response_chunk_received

Property that gives access to the signals that will be executed
when a chunk of response body is received.

``params`` is :class:`aiohttp.TraceResponseChunkReceivedParams` instance.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add versionadded here as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

.. attribute:: on_request_redirect

Property that gives access to the signals that will be executed when a
Expand Down Expand Up @@ -259,6 +273,31 @@ TraceRequestStartParams

Headers that will be used for the request, can be mutated.


TraceRequestChunkSentParams
---------------------------

.. class:: TraceRequestChunkSentParams

See :attr:`TraceConfig.on_request_chunk_sent` for details.

.. attribute:: chunk

Bytes of chunk sent


TraceResponseChunkSentParams
----------------------------

.. class:: TraceResponseChunkSentParams

See :attr:`TraceConfig.on_response_chunk_received` for details.

.. attribute:: chunk

Bytes of chunk received


TraceRequestEndParams
---------------------

Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
[pep8]
max-line-length=79
ignore=E225,E226
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm in doubt that we should ignore these. Any reasons why to disable them globally?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I'm not in place to judge this. I did it to be able to save a file which has 64*1000 literal without spaces around *. Otherwise autopep just fixes it for me.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That autopep suggestion looks correct. You may also write 64000 instead without any loss in readability.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm with you. @asvetlov specifically asked me to keep it as was.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me the strange thing is successful passing flake8 checks without the ignore setting.
I'm totally fine with 64000 or even better 0x10000. Pretty sure it should be 64KiB instead of 64kB

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please drop the change. Feel free to replace the limit with 0x10000 if needed -- I'm +-0 for it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


[easy_install]
zip_ok = false
Expand Down
29 changes: 24 additions & 5 deletions tests/test_client_session.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import asyncio
import contextlib
import gc
import json
import re
from http.cookies import SimpleCookie
from io import BytesIO
from unittest import mock

import pytest
Expand Down Expand Up @@ -457,33 +459,47 @@ def test_client_session_implicit_loop_warn():

async def test_request_tracing(loop, aiohttp_client):
async def handler(request):
return web.Response()
return web.json_response({'ok': True})

app = web.Application()
app.router.add_get('/', handler)
app.router.add_post('/', handler)

trace_config_ctx = mock.Mock()
trace_request_ctx = {}
body = 'This is request body'
gathered_req_body = BytesIO()
gathered_res_body = BytesIO()
on_request_start = mock.Mock(side_effect=asyncio.coroutine(mock.Mock()))
on_request_redirect = mock.Mock(side_effect=asyncio.coroutine(mock.Mock()))
on_request_end = mock.Mock(side_effect=asyncio.coroutine(mock.Mock()))

async def on_request_chunk_sent(session, context, params):
gathered_req_body.write(params.chunk)

async def on_response_chunk_received(session, context, params):
gathered_res_body.write(params.chunk)

trace_config = aiohttp.TraceConfig(
trace_config_ctx_factory=mock.Mock(return_value=trace_config_ctx)
)
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
trace_config.on_request_chunk_sent.append(on_request_chunk_sent)
trace_config.on_response_chunk_received.append(on_response_chunk_received)
trace_config.on_request_redirect.append(on_request_redirect)

session = await aiohttp_client(app, trace_configs=[trace_config])

async with session.get('/', trace_request_ctx=trace_request_ctx) as resp:
async with session.post(
'/', data=body, trace_request_ctx=trace_request_ctx) as resp:

await resp.json()

on_request_start.assert_called_once_with(
session.session,
trace_config_ctx,
aiohttp.TraceRequestStartParams(
hdrs.METH_GET,
hdrs.METH_POST,
session.make_url('/'),
CIMultiDict()
)
Expand All @@ -493,13 +509,16 @@ async def handler(request):
session.session,
trace_config_ctx,
aiohttp.TraceRequestEndParams(
hdrs.METH_GET,
hdrs.METH_POST,
session.make_url('/'),
CIMultiDict(),
resp
)
)
assert not on_request_redirect.called
assert gathered_req_body.getvalue() == body.encode('utf8')
assert gathered_res_body.getvalue() == json.dumps(
{'ok': True}).encode('utf8')


async def test_request_tracing_exception(loop):
Expand Down
Loading