Merge branch 'master' into fix_close_deprecation_version
zware authored Nov 18, 2023
2 parents 4dbece6 + 8875d55 commit 644ed6c
Showing 31 changed files with 736 additions and 119 deletions.
1 change: 1 addition & 0 deletions .github/CODEOWNERS
@@ -0,0 +1 @@
doctests/* @dmaier-redislabs
14 changes: 14 additions & 0 deletions README.md
@@ -17,6 +17,20 @@ The Python interface to the Redis key-value store.

---------------------------------------------

## How do I Redis?

[Learn for free at Redis University](https://university.redis.com/)

[Build faster with the Redis Launchpad](https://launchpad.redis.com/)

[Try the Redis Cloud](https://redis.com/try-free/)

[Dive into developer tutorials](https://developer.redis.com/)

[Join the Redis community](https://redis.com/community/)

[Work at Redis](https://redis.com/company/careers/jobs/)

## Installation

Start a redis via docker:
2 changes: 2 additions & 0 deletions redis/__init__.py
@@ -2,6 +2,7 @@

from redis import asyncio # noqa
from redis.backoff import default_backoff
from redis.cache import _LocalChace
from redis.client import Redis, StrictRedis
from redis.cluster import RedisCluster
from redis.connection import (
@@ -61,6 +62,7 @@ def int_or_str(value):
VERSION = tuple([99, 99, 99])

__all__ = [
"_LocalChace",
"AuthenticationError",
"AuthenticationWrongNumberOfArgsError",
"BlockingConnectionPool",
19 changes: 12 additions & 7 deletions redis/_parsers/resp3.py
@@ -96,8 +96,9 @@ def _read_response(self, disable_decoding=False, push_request=False):
pass
# map response
elif byte == b"%":
# we use this approach and not dict comprehension here
# because this dict comprehension fails in python 3.7
# We cannot use a dict comprehension to parse the stream here: the
# evaluation order of the key and value expressions in a dict
# comprehension only became defined (left to right) in Python 3.8.
resp_dict = {}
for _ in range(int(response)):
key = self._read_response(disable_decoding=disable_decoding)
@@ -225,12 +226,16 @@ async def _read_response(
pass
# map response
elif byte == b"%":
response = {
(await self._read_response(disable_decoding=disable_decoding)): (
await self._read_response(disable_decoding=disable_decoding)
# We cannot use a dict comprehension to parse the stream here: the
# evaluation order of the key and value expressions in a dict
# comprehension only became defined (left to right) in Python 3.8.
resp_dict = {}
for _ in range(int(response)):
key = await self._read_response(disable_decoding=disable_decoding)
resp_dict[key] = await self._read_response(
disable_decoding=disable_decoding, push_request=push_request
)
for _ in range(int(response))
}
response = resp_dict
# push response
elif byte == b">":
response = [
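
The two hunks above replace a dict comprehension with an explicit loop because, before Python 3.8, the value expression of a dict comprehension was evaluated before the key expression; when both expressions consume the same stream, the key/value pairs come out swapped. A standalone sketch of the difference (not part of the commit; `read_item` and the in-memory stream are illustrative stand-ins for `_read_response` and the socket buffer):

```python
from io import BytesIO


def read_item(buf):
    """Stand-in for _read_response(): consume the next item from the stream."""
    return buf.readline().strip()


stream = BytesIO(b"key1\nval1\nkey2\nval2\n")

# On Python >= 3.8 the key expression runs first, giving
# {b'key1': b'val1', b'key2': b'val2'}; on 3.7 the value expression ran
# first, so the same comprehension yielded {b'val1': b'key1', ...}.
parsed = {read_item(stream): read_item(stream) for _ in range(2)}
print(parsed)

# The explicit loop the commit switches to is unambiguous on every version:
stream.seek(0)
safe = {}
for _ in range(2):
    key = read_item(stream)
    safe[key] = read_item(stream)
print(safe)
```
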
3 changes: 3 additions & 0 deletions redis/asyncio/client.py
@@ -546,6 +546,7 @@ def __del__(
_grl().call_exception_handler(context)
except RuntimeError:
pass
self.connection._close()

async def aclose(self, close_connection_pool: Optional[bool] = None) -> None:
"""
@@ -596,6 +597,7 @@ async def _disconnect_raise(self, conn: Connection, error: Exception):
async def execute_command(self, *args, **options):
"""Execute a command and return a parsed response"""
await self.initialize()
options.pop("keys", None) # the keys are used only for client side caching
pool = self.connection_pool
command_name = args[0]
conn = self.connection or await pool.get_connection(command_name, **options)
@@ -1274,6 +1276,7 @@ def multi(self):
def execute_command(
self, *args, **kwargs
) -> Union["Pipeline", Awaitable["Pipeline"]]:
kwargs.pop("keys", None) # the keys are used only for client side caching
if (self.watching or args[0] == "WATCH") and not self.explicit_transaction:
return self.immediate_execute_command(*args, **kwargs)
return self.pipeline_execute_command(*args, **kwargs)
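
Both `execute_command` paths above now pop a `keys` keyword before the remaining options are forwarded, because `keys` is consumed only by the client-side cache and the connection layer does not expect it. A hedged sketch of that pattern (class and method names here are illustrative, not the `_LocalChace` API, which this diff does not show):

```python
class CachingClient:
    """Toy client that consumes cache-only kwargs before dispatching."""

    def __init__(self):
        self._local_cache = {}

    def _invalidate(self, keys):
        for key in keys or ():
            self._local_cache.pop(key, None)

    def execute_command(self, *args, **options):
        # "keys" is meaningful only to the local cache; strip it so the
        # remaining options can be passed through to the connection layer.
        keys = options.pop("keys", None)
        self._invalidate(keys)
        return self._send(*args, **options)

    def _send(self, *args, **options):
        return f"sent {args[0]} with options {options}"


client = CachingClient()
print(client.execute_command("SET", "foo", "bar", keys=["foo"]))
```

The same one-line change appears in the cluster and sentinel clients below, since every `execute_command` entry point has to strip the kwarg before dispatching.
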
2 changes: 2 additions & 0 deletions redis/asyncio/cluster.py
@@ -682,6 +682,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
:raises RedisClusterException: if target_nodes is not provided & the command
can't be mapped to a slot
"""
kwargs.pop("keys", None) # the keys are used only for client side caching
command = args[0]
target_nodes = []
target_nodes_specified = False
@@ -1447,6 +1448,7 @@ def execute_command(
or List[:class:`~.ClusterNode`] or Dict[Any, :class:`~.ClusterNode`]
- Rest of the kwargs are passed to the Redis connection
"""
kwargs.pop("keys", None) # the keys are used only for client side caching
self._command_stack.append(
PipelineCommand(len(self._command_stack), *args, **kwargs)
)
56 changes: 44 additions & 12 deletions redis/asyncio/connection.py
@@ -5,6 +5,7 @@
import socket
import ssl
import sys
import warnings
import weakref
from abc import abstractmethod
from itertools import chain
@@ -204,6 +205,24 @@ def __init__(
raise ConnectionError("protocol must be either 2 or 3")
self.protocol = protocol

def __del__(self, _warnings: Any = warnings):
# For some reason, the individual streams don't get properly garbage
# collected and therefore produce no resource warnings. We add one
# here, in the same style as those from the stdlib.
if getattr(self, "_writer", None):
_warnings.warn(
f"unclosed Connection {self!r}", ResourceWarning, source=self
)
self._close()

def _close(self):
"""
Internal method to silently close the connection without waiting
"""
if self._writer:
self._writer.close()
self._writer = self._reader = None

def __repr__(self):
repr_args = ",".join((f"{k}={v}" for k, v in self.repr_pieces()))
return f"{self.__class__.__name__}<{repr_args}>"
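
The `__del__`/`_close` pair added above follows the stdlib convention for reporting objects that are garbage-collected while still open: warn, then tear the streams down without waiting. A self-contained illustration of that pattern (the `Resource` class is a toy, not redis-py code):

```python
import warnings


class Resource:
    """Toy object that owns something which needs an explicit close()."""

    def __init__(self):
        self._writer = object()  # stands in for the asyncio StreamWriter

    def close(self):
        self._writer = None

    def __del__(self, _warnings=warnings):
        # Binding the warnings module as a default argument keeps it
        # reachable even during interpreter shutdown, mirroring the stdlib.
        if getattr(self, "_writer", None):
            _warnings.warn(f"unclosed {self!r}", ResourceWarning, source=self)


warnings.simplefilter("always", ResourceWarning)

leaked = Resource()
del leaked      # never closed -> a ResourceWarning is emitted

closed = Resource()
closed.close()
del closed      # closed first -> silent
```
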
@@ -861,6 +880,7 @@ def to_bool(value) -> Optional[bool]:
"max_connections": int,
"health_check_interval": int,
"ssl_check_hostname": to_bool,
"timeout": float,
}
)
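
Adding `"timeout": float` to the URL query-argument parsers means a timeout supplied in a connection URL is coerced to a number before it reaches the pool, instead of arriving as a string. A hedged usage sketch (the URL and option combination below is an assumption based on this mapping table, not verified output):

```python
import redis.asyncio as redis

# "timeout=1.5" arrives from the query string as the text "1.5"; with the
# new parser entry it is converted with float() before being handed to the
# pool, which expects a numeric wait timeout.
pool = redis.BlockingConnectionPool.from_url(
    "redis://localhost:6379/0?timeout=1.5&max_connections=10"
)
```
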

@@ -1017,7 +1037,7 @@ def __repr__(self):

def reset(self):
self._available_connections = []
self._in_use_connections = set()
self._in_use_connections = weakref.WeakSet()

def can_get_connection(self) -> bool:
"""Return True if a connection can be retrieved from the pool."""
@@ -1027,21 +1047,25 @@ def can_get_connection(self) -> bool:
)

async def get_connection(self, command_name, *keys, **options):
"""Get a connection from the pool"""
"""Get a connected connection from the pool"""
connection = self.get_available_connection()
try:
await self.ensure_connection(connection)
except BaseException:
await self.release(connection)
raise

return connection

def get_available_connection(self):
"""Get a connection from the pool, without making sure it is connected"""
try:
connection = self._available_connections.pop()
except IndexError:
if len(self._in_use_connections) >= self.max_connections:
raise ConnectionError("Too many connections") from None
connection = self.make_connection()
self._in_use_connections.add(connection)

try:
await self.ensure_connection(connection)
except BaseException:
await self.release(connection)
raise

return connection

def get_encoder(self):
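
Switching `_in_use_connections` from a plain `set` to a `weakref.WeakSet` means the pool no longer keeps a strong reference to connections that callers drop without releasing, so those connections can be garbage collected (and, with the new `__del__`, warned about and closed). A small illustration of the difference, using a stand-in class rather than real connections:

```python
import gc
import weakref


class FakeConnection:
    """Stand-in for a pooled connection object."""


strong, weak = set(), weakref.WeakSet()

a, b = FakeConnection(), FakeConnection()
strong.add(a)
weak.add(b)

del a, b      # the caller "forgets" both without releasing them
gc.collect()

print(len(strong))  # 1 -> the plain set still pins its object alive
print(len(weak))    # 0 -> the WeakSet let its object be collected
```

The hunk also splits connection retrieval into `get_available_connection` plus `ensure_connection`, which the blocking pool reuses in the next hunk.
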
@@ -1166,13 +1190,21 @@ def __init__(
async def get_connection(self, command_name, *keys, **options):
"""Gets a connection from the pool, blocking until one is available"""
try:
async with async_timeout(self.timeout):
async with self._condition:
async with self._condition:
async with async_timeout(self.timeout):
await self._condition.wait_for(self.can_get_connection)
return await super().get_connection(command_name, *keys, **options)
connection = super().get_available_connection()
except asyncio.TimeoutError as err:
raise ConnectionError("No connection available.") from err

# We now perform the connection check outside of the lock.
try:
await self.ensure_connection(connection)
return connection
except BaseException:
await self.release(connection)
raise

async def release(self, connection: AbstractConnection):
"""Releases the connection back to the pool."""
async with self._condition:
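
The blocking pool's `get_connection` now reserves a connection while holding the condition lock but runs `ensure_connection` only after the lock is released, so one slow TCP/TLS handshake no longer stalls every other waiter. A minimal sketch of that shape (`TinyPool` and its members are illustrative; the `ConnectionError` mapping from the real code is omitted, and `asyncio.timeout` requires Python 3.11+):

```python
import asyncio


class TinyPool:
    """Toy pool mirroring the reserve-under-lock, connect-outside-lock shape."""

    def __init__(self, size: int):
        self._free = list(range(size))   # integers stand in for connections
        self._cond = asyncio.Condition()

    async def acquire(self, timeout: float = 5.0):
        async with self._cond:
            async with asyncio.timeout(timeout):
                await self._cond.wait_for(lambda: bool(self._free))
            conn = self._free.pop()
        # The potentially slow connect happens outside the lock; on failure
        # the reserved slot is handed back, as the commit's except block does.
        try:
            await asyncio.sleep(0)       # stands in for ensure_connection()
            return conn
        except BaseException:
            await self.release(conn)
            raise

    async def release(self, conn) -> None:
        async with self._cond:
            self._free.append(conn)
            self._cond.notify()


async def main() -> None:
    pool = TinyPool(2)
    conn = await pool.acquire()
    await pool.release(conn)


asyncio.run(main())
```
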
1 change: 1 addition & 0 deletions redis/asyncio/sentinel.py
@@ -220,6 +220,7 @@ async def execute_command(self, *args, **kwargs):
once - If set to True, then execute the resulting command on a single
node at random, rather than across the entire sentinel cluster.
"""
kwargs.pop("keys", None) # the keys are used only for client side caching
once = bool(kwargs.get("once", False))
if "once" in kwargs.keys():
kwargs.pop("once")