Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

Sentinel implementation #181

Merged
merged 34 commits into from
Jan 17, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
5b93dea
Some redis sentinel WIP
bdrosen96 Aug 24, 2015
38cc0ce
Cleaned up somewhat and did some simple testing
bdrosen96 Aug 24, 2015
e4915d0
Some cleanup, basic operations working
bdrosen96 Aug 25, 2015
89bcce5
Tests mostly working (one test case has intermittent failure
bdrosen96 Aug 25, 2015
336f1e1
Fix bug with failover for slave
bdrosen96 Aug 25, 2015
dec407c
pub sub working
bdrosen96 Aug 26, 2015
dc5312d
minor clean up
jettify Sep 13, 2015
23dda48
fix flake8 error
jettify Feb 9, 2016
e412690
get rid of pytest
jettify Feb 9, 2016
5545b78
more clean asserts on exception
jettify Feb 9, 2016
e9f2ea3
clean up doc strings
jettify Feb 9, 2016
4fa6d3d
add missed coroutine decorator
jettify Feb 10, 2016
2462ebe
drop function
popravich Feb 29, 2016
0b24337
.travis.yml updated
popravich Feb 29, 2016
f4a7a96
test assertions fixed
popravich Feb 29, 2016
2b8f0e0
RedisSentinelTest refactoring
popravich Feb 29, 2016
7eed8f5
fixed sentinel tests
popravich Mar 1, 2016
a50d78d
convert failing sentinel tests to py.test and mark xfail
popravich Jul 15, 2016
3dc3b7d
Pool: change db initial value to None (do not issue SELECT command)
popravich Jul 29, 2016
4fc0a0e
fix sentinel info for role command
popravich Jul 29, 2016
7c70829
update conftest: add sentinel server runner
popravich Jul 29, 2016
84ea3a5
sentinel test: redis min version
popravich Jul 30, 2016
2a9391f
adopt sentinel tests
popravich Aug 1, 2016
a6e1c76
Sentinel implementation;
popravich Dec 29, 2016
1c8d267
.travis.yml update; conftest win32 fixes
popravich Jan 13, 2017
c2b6b56
mark two sentinel tests xfail (until fixed)
popravich Jan 16, 2017
1ab46a6
drop sentinel_pubsub tests (basically duplicate simple pubsub tests)
popravich Jan 16, 2017
844f97c
split sentinel tests into commands and failover tests
popravich Jan 16, 2017
8ee8b67
get rid of wait_for in conftest
popravich Jan 16, 2017
f2a8959
increase sentinel tests timeout
popravich Jan 16, 2017
914f3eb
skip sentinel tests on windows (unstable)
popravich Jan 16, 2017
9c950fd
pytest doesn't fail from pytest.raises with wrong error
popravich Jan 16, 2017
9a3fcfb
call _do_close directly from _read_data
popravich Jan 16, 2017
7e931b7
try to fix test_quit for win32
popravich Jan 16, 2017
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 8 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,17 @@ env:
matrix:
include:
- python: "3.3"
env: TESTS="flake examples"
env: TASKS="ci-test flake examples"
- python: "3.4"
env: TESTS="flake examples"
env: TASKS="ci-test flake examples"
- python: "3.5"
env: TESTS="flake examples spelling"
env: TASKS="ci-test flake examples"
- python: "3.6"
env: TESTS="flake examples" TEST_ARGS=--uvloop
env: TASKS="ci-test flake examples spelling"
- python: "3.6"
env: TASKS="ci-test flake examples" TEST_ARGS="--uvloop"
allow_failures:
- env: TESTS="flake examples" TEST_ARGS=--uvloop
- env: TASKS="ci-test flake examples" TEST_ARGS="--uvloop"

cache: pip
# directories:
Expand All @@ -51,8 +53,7 @@ before_script:
- make -j certificate ci-build-redis

script:
- make ci-test
- make $TESTS
- make $TASKS

after_script:
- codecov
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ $(CERT_DIR)/.test.key:
ci-test: $(REDIS_TARGETS)
$(call travis_start,tests)
@echo "Tests run"
py.test -rsxX --cov -n auto \
py.test -rsxX --cov -n 4 \
--ssl-cafile=$(CERT_DIR)/test.crt \
$(foreach T,$(REDIS_TARGETS),--redis-server=$T) $(TEST_ARGS)
$(call travis_end,tests)
Expand All @@ -124,14 +124,14 @@ $(BUILD_DIR)/redis-%.tar.gz:
wget -c https://github.com/antirez/redis/archive/$*.tar.gz \
-O $(BUILD_DIR)/redis-$*.tar.gz

ifdef TRAVIS

define travis_start
@echo "travis_fold:start:$1"
endef

define travis_end
@echo "travis_fold:end:$1"
endef

endif
# ifdef TRAVIS
#
# define travis_start
# @echo "travis_fold:start:$1"
# endef
#
# define travis_end
# @echo "travis_fold:end:$1"
# endef
#
# endif
9 changes: 7 additions & 2 deletions aioredis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@
)
from .pool import ConnectionsPool, create_pool
from .pubsub import Channel
from .sentinel import RedisSentinel, create_sentinel
from .errors import (
ConnectionClosedError,
MasterNotFoundError,
MultiExecError,
PipelineError,
ProtocolError,
ReadOnlyError,
RedisError,
ReplyError,
ChannelClosedError,
WatchVariableError,
PoolClosedError,
SlaveNotFoundError,
)


Expand All @@ -28,10 +32,11 @@
(create_connection, RedisConnection,
create_redis, create_reconnecting_redis, Redis,
create_redis_pool, create_pool,
create_sentinel, RedisSentinel,
RedisPool, ConnectionsPool, Channel,
RedisError, ProtocolError, ReplyError,
PipelineError, MultiExecError, ConnectionClosedError,
ChannelClosedError, WatchVariableError,
PoolClosedError,
ChannelClosedError, WatchVariableError, PoolClosedError,
MasterNotFoundError, SlaveNotFoundError, ReadOnlyError,
GeoPoint, GeoMember,
)
9 changes: 8 additions & 1 deletion aioredis/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ def execute(self, command, *args, **kwargs):
return self._pool_or_conn.execute(command, *args, **kwargs)

def close(self):
"""Close client connections."""
self._pool_or_conn.close()

@asyncio.coroutine
def wait_closed(self):
"""Coroutine waiting until underlying connections are closed."""
yield from self._pool_or_conn.wait_closed()

@property
Expand All @@ -74,6 +76,11 @@ def connection(self):
"""
return self._pool_or_conn

@property
def address(self):
"""Redis connection address (if applicable)."""
return self._pool_or_conn.address

@property
def in_transaction(self):
"""Set to True when MULTI command was issued."""
Expand Down Expand Up @@ -130,7 +137,7 @@ def create_redis(address, *, db=None, password=None, ssl=None,


@asyncio.coroutine
def create_redis_pool(address, *, db=0, password=None, ssl=None,
def create_redis_pool(address, *, db=None, password=None, ssl=None,
encoding=None, commands_factory=Redis,
minsize=1, maxsize=10,
loop=None):
Expand Down
7 changes: 6 additions & 1 deletion aioredis/commands/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ def dbsize(self):
"""Return the number of keys in the selected database."""
return self.execute(b'DBSIZE')

def debug_sleep(self, timeout):
"""Get debugging information about a key."""
fut = self._conn.execute(b'DEBUG', b'SLEEP', timeout)
return wait_ok(fut)

def debug_object(self, key):
"""Get debugging information about a key."""
return self.execute(b'DEBUG', b'OBJECT', key)
Expand Down Expand Up @@ -250,7 +255,7 @@ def parse_info(info):
SlaveInfo = namedtuple('SlaveInfo',
'role master_ip master_port state received')

SentinelInfo = namedtuple('SentinelInfo', 'masters')
SentinelInfo = namedtuple('SentinelInfo', 'role masters')


def parse_role(role):
Expand Down
6 changes: 3 additions & 3 deletions aioredis/commands/transaction.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import functools

from ..pool import ConnectionsPool
from ..abc import AbcPool
from ..errors import RedisError, PipelineError, MultiExecError
from ..util import wait_ok, async_task, create_future

Expand Down Expand Up @@ -162,7 +162,7 @@ def execute(self, *, return_exceptions=False):

@asyncio.coroutine
def _do_execute(self, *, return_exceptions=False):
if isinstance(self._pool_or_conn, ConnectionsPool):
if isinstance(self._pool_or_conn, AbcPool):
with (yield from self._pool_or_conn) as conn:
yield from asyncio.gather(*self._send_pipeline(conn),
loop=self._loop,
Expand Down Expand Up @@ -249,7 +249,7 @@ class MultiExec(Pipeline):
@asyncio.coroutine
def _do_execute(self, *, return_exceptions=False):
self._waiters = waiters = []
is_pool = isinstance(self._pool_or_conn, ConnectionsPool)
is_pool = isinstance(self._pool_or_conn, AbcPool)
if is_pool:
conn = yield from self._pool_or_conn.acquire()
else:
Expand Down
14 changes: 11 additions & 3 deletions aioredis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
ProtocolError,
ReplyError,
WatchVariableError,
ReadOnlyError,
)
from .pubsub import Channel
from .abc import AbcChannel
Expand Down Expand Up @@ -137,17 +138,20 @@ def _read_data(self):
# before response
logger.error("Exception on data read %r", exc, exc_info=True)
break
if data == b'' and self._reader.at_eof():
logger.debug("Connection has been closed by server")
break
self._parser.feed(data)
while True:
try:
obj = self._parser.gets()
except ProtocolError as exc:
# ProtocolError is fatal
# so connection must be closed
self._closing = True
self._loop.call_soon(self._do_close, exc)
if self._in_transaction is not None:
self._transaction_error = exc
self._closing = True
self._do_close(exc)
return
else:
if obj is False:
Expand All @@ -157,12 +161,16 @@ def _read_data(self):
else:
self._process_data(obj)
self._closing = True
self._loop.call_soon(self._do_close, None)
self._do_close(None)

def _process_data(self, obj):
"""Processes command results."""
assert len(self._waiters) > 0, (type(obj), obj)
waiter, encoding, cb = self._waiters.popleft()
if isinstance(obj, RedisError):
if isinstance(obj, ReplyError):
if obj.args[0].startswith('READONLY'):
obj = ReadOnlyError(obj.args[0])
_set_exception(waiter, obj)
if self._in_transaction is not None:
self._transaction_error = obj
Expand Down
15 changes: 15 additions & 0 deletions aioredis/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
'ChannelClosedError',
'ConnectionClosedError',
'PoolClosedError',
'MasterNotFoundError',
'SlaveNotFoundError',
'ReadOnlyError',
]


Expand Down Expand Up @@ -43,6 +46,18 @@ class ChannelClosedError(RedisError):
"""


class ReadOnlyError(RedisError):
"""Raised from slave when read-only mode is enabled"""


class MasterNotFoundError(RedisError):
"""Raised for sentinel master not found error."""


class SlaveNotFoundError(RedisError):
"""Raised for sentinel slave not found error."""


class ConnectionClosedError(RedisError):
"""Raised if connection to server was closed."""

Expand Down
6 changes: 5 additions & 1 deletion aioredis/log.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@


logger = logging.getLogger('aioredis')
sentinel_logger = logger.getChild('sentinel')

if os.environ.get("AIOREDIS_DEBUG"):
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(stream=sys.stderr))
handler = logging.StreamHandler(stream=sys.stderr)
handler.setFormatter(logging.Formatter(
"%(asctime)s %(name)s %(levelname)s %(message)s"))
logger.addHandler(handler)
os.environ["AIOREDIS_DEBUG"] = ""
32 changes: 21 additions & 11 deletions aioredis/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,14 @@
from .log import logger
from .util import async_task, _NOTSET
from .errors import PoolClosedError
from .abc import AbcPool


PY_35 = sys.version_info >= (3, 5)


@asyncio.coroutine
def create_pool(address, *, db=0, password=None, ssl=None, encoding=None,
def create_pool(address, *, db=None, password=None, ssl=None, encoding=None,
minsize=1, maxsize=10, commands_factory=_NOTSET, loop=None):
# FIXME: rewrite docstring
"""Creates Redis Pool.
Expand Down Expand Up @@ -46,10 +47,10 @@ def create_pool(address, *, db=0, password=None, ssl=None, encoding=None,
return pool


class ConnectionsPool:
class ConnectionsPool(AbcPool):
"""Redis connections pool."""

def __init__(self, address, db=0, password=None, encoding=None,
def __init__(self, address, db=None, password=None, encoding=None,
*, minsize, maxsize, ssl=None, loop=None):
assert isinstance(minsize, int) and minsize >= 0, (
"minsize must be int >= 0", minsize, type(minsize))
Expand Down Expand Up @@ -100,19 +101,27 @@ def freesize(self):
"""Current number of free connections."""
return len(self._pool)

@property
def address(self):
return self._address

@asyncio.coroutine
def clear(self):
"""Clear pool connections.

Close and remove all free connections.
"""
with (yield from self._cond):
waiters = []
while self._pool:
conn = self._pool.popleft()
conn.close()
waiters.append(conn.wait_closed())
yield from asyncio.gather(*waiters, loop=self._loop)
yield from self._do_clear()

@asyncio.coroutine
def _do_clear(self):
waiters = []
while self._pool:
conn = self._pool.popleft()
conn.close()
waiters.append(conn.wait_closed())
yield from asyncio.gather(*waiters, loop=self._loop)

@asyncio.coroutine
def _do_close(self):
Expand All @@ -128,7 +137,8 @@ def _do_close(self):
conn.close()
waiters.append(conn.wait_closed())
yield from asyncio.gather(*waiters, loop=self._loop)
logger.debug("Closed %d connections", len(waiters))
# TODO: close _pubsub_conn connection
logger.debug("Closed %d connection(s)", len(waiters))

def close(self):
"""Close all free and in-progress connections and mark pool as closed.
Expand All @@ -149,7 +159,7 @@ def wait_closed(self):
@property
def db(self):
"""Currently selected db index."""
return self._db
return self._db or 0

@property
def encoding(self):
Expand Down
9 changes: 9 additions & 0 deletions aioredis/sentinel/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .commands import RedisSentinel, create_sentinel
from .pool import SentinelPool, create_sentinel_pool

__all__ = [
"create_sentinel",
"create_sentinel_pool",
"RedisSentinel",
"SentinelPool",
]
Loading