diff --git a/CHANGES.rst b/CHANGES.rst index efb105550e..67ba593e0c 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,3 +1,8 @@ +2.2.903 (2023-11-06) +==================== + +- Improved overall performances in HTTP/2, and HTTP/3, with or without multiplexed. + 2.2.902 (2023-11-05) ==================== diff --git a/src/urllib3/_version.py b/src/urllib3/_version.py index 1ca688affe..a38f8776a1 100644 --- a/src/urllib3/_version.py +++ b/src/urllib3/_version.py @@ -1,4 +1,4 @@ # This file is protected via CODEOWNERS from __future__ import annotations -__version__ = "2.2.902" +__version__ = "2.2.903" diff --git a/src/urllib3/backend/_base.py b/src/urllib3/backend/_base.py index e70e0574d7..dcb089f4ec 100644 --- a/src/urllib3/backend/_base.py +++ b/src/urllib3/backend/_base.py @@ -282,11 +282,11 @@ def __init__( # valuable intel self.conn_info: ConnectionInfo | None = None - self._promises: list[ResponsePromise] = [] - self._pending_responses: list[LowLevelResponse] = [] + self._promises: dict[str, ResponsePromise] = {} + self._pending_responses: dict[int, LowLevelResponse] = {} def __contains__(self, item: ResponsePromise) -> bool: - return item in self._promises + return item.uid in self._promises @property def disabled_svn(self) -> set[HttpVersion]: diff --git a/src/urllib3/backend/hface.py b/src/urllib3/backend/hface.py index 5ddae4ef2a..6a975fc59b 100644 --- a/src/urllib3/backend/hface.py +++ b/src/urllib3/backend/hface.py @@ -732,7 +732,7 @@ def endheaders( if should_end_stream: rp = ResponsePromise(self, self._stream_id, self.__headers) - self._promises.append(rp) + self._promises[rp.uid] = rp return rp return None @@ -757,14 +757,9 @@ def __read_st( if events and events[-1].end_stream: eot = True - _r = None - for _r in self._pending_responses: - if _r._stream_id == __stream_id: - break - - if _r: - self._pending_responses.remove(_r) + if __stream_id in self._pending_responses: + del self._pending_responses[__stream_id] if self.is_idle: # probe for h3/quic if available, and remember it. @@ -820,7 +815,7 @@ def getresponse( headers.add(header, value) if promise is None: - for p in self._promises: + for p in self._promises.values(): if p.stream_id == events[-1].stream_id: promise = p break @@ -853,7 +848,7 @@ def getresponse( response.from_promise = promise # we delivered a response, we can safely remove the promise from queue. - self._promises.remove(promise) + del self._promises[promise.uid] # keep last response self._response: LowLevelResponse = response @@ -870,7 +865,7 @@ def getresponse( if self._protocol and self._protocol.has_expired(): self.close() else: - self._pending_responses.append(response) + self._pending_responses[promise.stream_id] = response return response @@ -913,7 +908,7 @@ def send( # this is a bad sign. we should stop sending and instead retrieve the response. if self._protocol.has_pending_event(stream_id=self._stream_id): rp = ResponsePromise(self, self._stream_id, self.__headers) - self._promises.append(rp) + self._promises[rp.uid] = rp raise EarlyResponse(promise=rp) @@ -952,7 +947,7 @@ def send( if eot or remote_pipe_shutdown: rp = ResponsePromise(self, self._stream_id, self.__headers) - self._promises.append(rp) + self._promises[rp.uid] = rp if remote_pipe_shutdown: remote_pipe_shutdown.promise = rp # type: ignore[attr-defined] raise remote_pipe_shutdown diff --git a/src/urllib3/contrib/hface/protocols/http2/_h2.py b/src/urllib3/contrib/hface/protocols/http2/_h2.py index 49a52bf386..fc8e7ccb7a 100644 --- a/src/urllib3/contrib/hface/protocols/http2/_h2.py +++ b/src/urllib3/contrib/hface/protocols/http2/_h2.py @@ -58,7 +58,6 @@ def __init__( ) self._connection.initiate_connection() self._events: deque[Event] = deque() - self._events_streams: list[int] = [] self._terminated: bool = False @staticmethod @@ -95,28 +94,21 @@ def submit_stream_reset(self, stream_id: int, error_code: int = 0) -> None: def next_event(self) -> Event | None: if not self._events: return None - ev = self._events.popleft() - if hasattr(ev, "stream_id"): - self._events_streams.remove(ev.stream_id) - - return ev + return self._events.popleft() def has_pending_event(self, *, stream_id: int | None = None) -> bool: if stream_id is None: return len(self._events) > 0 - try: - self._events_streams.index(stream_id) - except ValueError: - return False + for ev in self._events: + if hasattr(ev, "stream_id") and ev.stream_id == stream_id: + return True - return True + return False def _map_events(self, h2_events: list[h2.events.Event]) -> Iterator[Event]: for e in h2_events: - if hasattr(e, "stream_id"): - self._events_streams.append(e.stream_id) if isinstance( e, ( @@ -188,5 +180,3 @@ def should_wait_remote_flow_control( def reshelve(self, *events: Event) -> None: for ev in reversed(events): self._events.appendleft(ev) - if hasattr(ev, "stream_id"): - self._events_streams.append(ev.stream_id) diff --git a/src/urllib3/contrib/hface/protocols/http3/_qh3.py b/src/urllib3/contrib/hface/protocols/http3/_qh3.py index 7b10c789b9..689c479102 100644 --- a/src/urllib3/contrib/hface/protocols/http3/_qh3.py +++ b/src/urllib3/contrib/hface/protocols/http3/_qh3.py @@ -99,7 +99,6 @@ def __init__( self._connection_ids: set[bytes] = set() self._remote_address = remote_address self._event_buffer: deque[Event] = deque() - self._events_streams: list[int] = [] self._packets: deque[bytes] = deque() self._http: H3Connection | None = None self._terminated: bool = False @@ -110,8 +109,8 @@ def exceptions() -> tuple[type[BaseException], ...]: return ProtocolError, H3Error, QuicConnectionError, AssertionError def is_available(self) -> bool: - # todo: qh3 hardcode 128 streams as max, change this! - return self._terminated is False and 128 > len(self._quic._streams) + max_stream_bidi = self._quic._local_max_streams_bidi.value + return self._terminated is False and max_stream_bidi > len(self._quic._streams) def has_expired(self) -> bool: return self._terminated @@ -156,19 +155,15 @@ def submit_stream_reset(self, stream_id: int, error_code: int = 0) -> None: def next_event(self) -> Event | None: if not self._event_buffer: return None - ev = self._event_buffer.popleft() - if hasattr(ev, "stream_id"): - self._events_streams.remove(ev.stream_id) - return ev + return self._event_buffer.popleft() def has_pending_event(self, *, stream_id: int | None = None) -> bool: if stream_id is None: return len(self._event_buffer) > 0 - try: - self._events_streams.index(stream_id) - except ValueError: - return False - return True + for ev in self._event_buffer: + if hasattr(ev, "stream_id") and ev.stream_id == stream_id: + return True + return False @property def connection_ids(self) -> Sequence[bytes]: @@ -240,8 +235,6 @@ def _map_quic_event(self, quic_event: quic_events.QuicEvent) -> Iterable[Event]: yield StreamResetReceived(quic_event.stream_id, quic_event.error_code) def _map_h3_event(self, h3_event: h3_events.H3Event) -> Iterable[Event]: - if hasattr(h3_event, "stream_id"): - self._events_streams.append(h3_event.stream_id) if isinstance(h3_event, h3_events.HeadersReceived): yield HeadersReceived( h3_event.stream_id, h3_event.headers, h3_event.stream_ended @@ -506,6 +499,4 @@ def cipher(self) -> str | None: def reshelve(self, *events: Event) -> None: for ev in reversed(events): - if hasattr(ev, "stream_id"): - self._events_streams.append(ev.stream_id) self._event_buffer.appendleft(ev)