Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify HTTP pipelining implementation #2623

Merged
merged 3 commits into from
Dec 26, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/2109.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Simplify HTTP pipelining implementation
70 changes: 5 additions & 65 deletions aiohttp/http_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import suppress

from .abc import AbstractPayloadWriter
from .helpers import noop, set_result
from .helpers import noop


__all__ = ('PayloadWriter', 'HttpVersion', 'HttpVersion10', 'HttpVersion11',
Expand Down Expand Up @@ -34,34 +34,8 @@ def __init__(self, protocol, transport, loop):
self._tcp_cork = False
self._socket = transport.get_extra_info('socket')
self._waiters = []
self.available = True
self.transport = transport

def acquire(self, writer):
if self.available:
self.available = False
writer.set_transport(self.transport)
else:
self._waiters.append(writer)

def release(self):
if self._waiters:
self.available = False
writer = self._waiters.pop(0)
writer.set_transport(self.transport)
else:
self.available = True

def replace(self, writer, factory):
try:
idx = self._waiters.index(writer)
writer = factory(self, self._loop, False)
self._waiters[idx] = writer
return writer
except ValueError:
self.available = True
return factory(self, self._loop)

@property
def tcp_nodelay(self):
return self._tcp_nodelay
Expand Down Expand Up @@ -122,10 +96,9 @@ async def drain(self):

class PayloadWriter(AbstractPayloadWriter):

def __init__(self, stream, loop, acquire=True):
def __init__(self, stream, loop):
self._stream = stream
self._transport = None
self._buffer = []

self.loop = loop
self.length = None
Expand All @@ -136,32 +109,9 @@ def __init__(self, stream, loop, acquire=True):
self._eof = False
self._compress = None
self._drain_waiter = None

if self._stream.available:
self._transport = self._stream.transport
self._buffer = None
self._stream.available = False
elif acquire:
self._stream.acquire(self)

def set_transport(self, transport):
self._transport = transport

if self._buffer is not None:
for chunk in self._buffer:
transport.write(chunk)
self._buffer = None

if self._drain_waiter is not None:
waiter, self._drain_waiter = self._drain_waiter, None
set_result(waiter, None)
self._transport = self._stream.transport

async def get_transport(self):
if self._transport is None:
if self._drain_waiter is None:
self._drain_waiter = self.loop.create_future()
await self._drain_waiter

return self._transport

def enable_chunking(self):
Expand All @@ -176,12 +126,7 @@ def _write(self, chunk):
size = len(chunk)
self.buffer_size += size
self.output_size += size

# see set_transport: exactly one of _buffer or _transport is None
if self._transport is not None:
self._transport.write(chunk)
else:
self._buffer.append(chunk)
self._transport.write(chunk)

def write(self, chunk, *, drain=True, LIMIT=64*1024):
"""Writes chunk of data to a stream.
Expand Down Expand Up @@ -253,11 +198,6 @@ async def write_eof(self, chunk=b''):

self._eof = True
self._transport = None
self._stream.release()

async def drain(self):
if self._transport is not None:
await self._stream.drain()
else:
# wait for transport
await self.get_transport()
await self._stream.drain()
7 changes: 5 additions & 2 deletions aiohttp/web_fileresponse.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,12 @@ async def _sendfile_system(self, request, fobj, count):
transport.get_extra_info("socket") is None):
writer = await self._sendfile_fallback(request, fobj, count)
else:
writer = request._protocol.writer.replace(
request._payload_writer, SendfilePayloadWriter)
writer = SendfilePayloadWriter(
request._protocol.writer,
request.loop
)
request._payload_writer = writer

await super().prepare(request)
await writer.sendfile(fobj, count)

Expand Down
Loading