Merge branch 'master' into freeze-resets
nolar authored Dec 19, 2019
2 parents 9d65642 + 60038b5 commit 222d5f3
Showing 3 changed files with 153 additions and 1 deletion.
57 changes: 56 additions & 1 deletion kopf/clients/watching.py
@@ -211,10 +211,65 @@ async def watch_objs(
    freeze_waiter.add_done_callback(response_close_callback)
    try:
        async with response:
-           async for line in response.content:
+           async for line in _iter_jsonlines(response.content):
                event = cast(bodies.RawEvent, json.loads(line.decode("utf-8")))
                yield event
    except (aiohttp.ClientConnectionError, aiohttp.ClientPayloadError):
        pass
    finally:
        freeze_waiter.remove_done_callback(response_close_callback)


async def _iter_jsonlines(
        content: aiohttp.StreamReader,
        chunk_size: int = 1024 * 1024,
) -> AsyncIterator[bytes]:
    """
    Iterate line by line over the response's content.

    Usage::

        async for line in _iter_jsonlines(response.content):
            pass

    This is an equivalent of::

        async for line in response.content:
            pass

    Except that aiohttp's line iteration fails if the accumulated buffer
    length is above 2**17 bytes, i.e. 128 KB (`aiohttp.streams.DEFAULT_LIMIT`
    for the buffer's low-watermark, multiplied by 2 for the high-watermark).
    Kubernetes secrets and other fields can be much longer, up to MBs in length.

    The chunk size of 1 MB is an empirical guess: it keeps the memory footprint
    reasonably low even for a huge number of small lines (at most 1 MB of them
    buffered in total), while ensuring near-instant reads of the huge lines
    (which would be slow with a small chunk size due to too many iterations).

    .. seealso::
        https://github.com/zalando-incubator/kopf/issues/275
    """

    # Minimize the memory footprint by keeping at most 2 copies of a yielded line in memory
    # (in the buffer and as a yielded value), and at most 1 copy of other lines (in the buffer).
    buffer = b''
    async for data in content.iter_chunked(chunk_size):
        buffer += data
        del data

        start = 0
        index = buffer.find(b'\n', start)
        while index >= 0:
            line = buffer[start:index]
            if line:
                yield line
            del line
            start = index + 1
            index = buffer.find(b'\n', start)

        if start > 0:
            buffer = buffer[start:]

    if buffer:
        yield buffer
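
For the gist of the technique without aiohttp in the picture, here is a minimal self-contained sketch of the same newline-buffering idea over any async iterator of byte chunks. The names iter_lines, demo, and chunks are illustrative only, not part of kopf:

import asyncio
from typing import AsyncIterator

async def iter_lines(chunks: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
    # Accumulate chunks and split on newlines; hold at most one partial line.
    buffer = b''
    async for chunk in chunks:
        buffer += chunk
        *complete, buffer = buffer.split(b'\n')  # the last piece is the unfinished tail
        for line in complete:
            if line:  # skip empty lines, as _iter_jsonlines does
                yield line
    if buffer:  # a final line without a trailing newline is still a line
        yield buffer

async def demo() -> None:
    async def chunks() -> AsyncIterator[bytes]:
        yield b'\n\nhell'
        yield b'o\n\nwor'
        yield b'ld\n\n'
    assert [line async for line in iter_lines(chunks())] == [b'hello', b'world']

asyncio.run(demo())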
20 changes: 20 additions & 0 deletions tests/k8s/test_watching.py
Original file line number Diff line number Diff line change
@@ -236,3 +236,23 @@ async def test_infinite_watch_never_exits_normally(resource, stream, namespace,
    assert events[0]['object']['spec'] == 'a'
    assert events[1]['object']['spec'] == 'a'
    assert events[2]['object']['spec'] == 'b'


# See: https://github.com/zalando-incubator/kopf/issues/275
async def test_long_line_parsing(resource, stream, namespace, aresponses):
    content = [
        {'type': 'ADDED', 'object': {'spec': {'field': 'x'}}},
        {'type': 'ADDED', 'object': {'spec': {'field': 'y' * (2 * 1024 * 1024)}}},
        {'type': 'ADDED', 'object': {'spec': {'field': 'z' * (4 * 1024 * 1024)}}},
    ]
    stream.feed(content, namespace=namespace)
    stream.close(namespace=namespace)

    events = []
    async for event in streaming_watch(resource=resource, namespace=namespace):
        events.append(event)

    assert len(events) == 3
    assert len(events[0]['object']['spec']['field']) == 1
    assert len(events[1]['object']['spec']['field']) == 2 * 1024 * 1024
    assert len(events[2]['object']['spec']['field']) == 4 * 1024 * 1024
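
For context on what travels over the stream: the Kubernetes watch API sends one JSON object per line (JSON Lines), so every line recovered by _iter_jsonlines decodes independently. A tiny standalone illustration with a made-up payload:

import json

raw = (b'{"type": "ADDED", "object": {"spec": {"field": "x"}}}\n'
       b'{"type": "MODIFIED", "object": {"spec": {"field": "y"}}}\n')
events = [json.loads(line.decode("utf-8")) for line in raw.splitlines() if line]
assert [e['type'] for e in events] == ['ADDED', 'MODIFIED']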
77 changes: 77 additions & 0 deletions tests/k8s/test_watching_iterjsonlines.py
@@ -0,0 +1,77 @@
import asynctest

from kopf.clients.watching import _iter_jsonlines


async def test_empty_content():
    async def iter_chunked(n: int):
        if False:  # to make this function a generator
            yield b''

    content = asynctest.Mock(iter_chunked=iter_chunked)
    lines = []
    async for line in _iter_jsonlines(content):
        lines.append(line)

    assert lines == []


async def test_empty_chunk():
    async def iter_chunked(n: int):
        yield b''

    content = asynctest.Mock(iter_chunked=iter_chunked)
    lines = []
    async for line in _iter_jsonlines(content):
        lines.append(line)

    assert lines == []


async def test_one_chunk_one_line():
    async def iter_chunked(n: int):
        yield b'hello'

    content = asynctest.Mock(iter_chunked=iter_chunked)
    lines = []
    async for line in _iter_jsonlines(content):
        lines.append(line)

    assert lines == [b'hello']


async def test_one_chunk_two_lines():
    async def iter_chunked(n: int):
        yield b'hello\nworld'

    content = asynctest.Mock(iter_chunked=iter_chunked)
    lines = []
    async for line in _iter_jsonlines(content):
        lines.append(line)

    assert lines == [b'hello', b'world']


async def test_one_chunk_empty_lines():
    async def iter_chunked(n: int):
        yield b'\n\nhello\n\nworld\n\n'

    content = asynctest.Mock(iter_chunked=iter_chunked)
    lines = []
    async for line in _iter_jsonlines(content):
        lines.append(line)

    assert lines == [b'hello', b'world']


async def test_few_chunks_split():
    async def iter_chunked(n: int):
        yield b'\n\nhell'
        yield b'o\n\nwor'
        yield b'ld\n\n'

    content = asynctest.Mock(iter_chunked=iter_chunked)
    lines = []
    async for line in _iter_jsonlines(content):
        lines.append(line)

    assert lines == [b'hello', b'world']
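
One edge case not covered above: a stream that ends without a trailing newline, where the final partial line is still yielded by the trailing if buffer: branch of _iter_jsonlines. A sketch in the same style as the tests above, offered as a suggestion rather than as part of the commit:

async def test_last_line_without_trailing_newline():
    async def iter_chunked(n: int):
        yield b'hello\nwor'
        yield b'ld'  # the stream ends with no final b'\n'

    content = asynctest.Mock(iter_chunked=iter_chunked)
    lines = []
    async for line in _iter_jsonlines(content):
        lines.append(line)

    assert lines == [b'hello', b'world']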
