Skip to content
This repository has been archived by the owner on Feb 21, 2023. It is now read-only.

add migrate_keys command #187

Merged
merged 2 commits into from
Jan 18, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 35 additions & 1 deletion aioredis/commands/generic.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def keys(self, pattern, *, encoding=_NOTSET):
"""Returns all keys matching pattern."""
return self.execute(b'KEYS', pattern, encoding=encoding)

def migrate(self, host, port, key, dest_db, timeout,
def migrate(self, host, port, key, dest_db, timeout, *,
copy=False, replace=False):
"""Atomically transfer a key from a Redis instance to another one."""
if not isinstance(host, str):
Expand All @@ -88,6 +88,40 @@ def migrate(self, host, port, key, dest_db, timeout,
key, dest_db, timeout, *flags)
return wait_ok(fut)

def migrate_keys(self, host, port, keys, dest_db, timeout, *,
copy=False, replace=False):
"""Atomically transfer keys from one Redis instance to another one.

Keys argument must be list/tuple of keys to migrate.
"""
if not isinstance(host, str):
raise TypeError("host argument must be str")
if not isinstance(timeout, int):
raise TypeError("timeout argument must be int")
if not isinstance(dest_db, int):
raise TypeError("dest_db argument must be int")
if not isinstance(keys, (list, tuple)):
raise TypeError("keys argument must be list or tuple")
if not host:
raise ValueError("Got empty host")
if dest_db < 0:
raise ValueError("dest_db must be greater equal 0")
if timeout < 0:
raise ValueError("timeout must be greater equal 0")
if not keys:
raise ValueError("keys must not be empty")

flags = []
if copy:
flags.append(b'COPY')
if replace:
flags.append(b'REPLACE')
flags.append(b'KEYS')
flags.extend(keys)
fut = self.execute(b'MIGRATE', host, port,
"", dest_db, timeout, *flags)
return wait_ok(fut)

def move(self, key, db):
"""Move key from currently selected database to specified destination.

Expand Down
2 changes: 1 addition & 1 deletion aioredis/commands/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def dbsize(self):

def debug_sleep(self, timeout):
"""Get debugging information about a key."""
fut = self._conn.execute(b'DEBUG', b'SLEEP', timeout)
fut = self.execute(b'DEBUG', b'SLEEP', timeout)
return wait_ok(fut)

def debug_object(self, key):
Expand Down
105 changes: 105 additions & 0 deletions tests/generic_commands_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import time
import math
import pytest
import sys

from unittest import mock

from aioredis import ReplyError
Expand Down Expand Up @@ -232,6 +234,109 @@ def test_migrate_copy_replace(create_redis, loop, server, serverB):
assert (yield from redisB.get('my-key'))


@pytest.redis_version(
3, 0, 6, reason="MIGRATE…KEYS available since Redis 3.0.6")
@pytest.mark.skipif(
sys.platform == 'win32', reason="Seems to be unavailable in win32 build")
@pytest.mark.run_loop
def test_migrate_keys(create_redis, loop, server, serverB):
redisA = yield from create_redis(server.tcp_address)
redisB = yield from create_redis(serverB.tcp_address, db=0)

yield from add(redisA, 'key1', 123)
yield from add(redisA, 'key2', 123)
yield from add(redisA, 'key3', 123)
yield from redisB.delete('key1', 'key2', 'key3')

ok = yield from redisA.migrate_keys(
'localhost', serverB.tcp_address.port,
('key1', 'key2', 'key3', 'non-existing-key'),
dest_db=0, timeout=1000)
assert ok is True

assert (yield from redisB.get('key1')) == b'123'
assert (yield from redisB.get('key2')) == b'123'
assert (yield from redisB.get('key3')) == b'123'
assert (yield from redisA.get('key1')) is None
assert (yield from redisA.get('key2')) is None
assert (yield from redisA.get('key3')) is None

ok = yield from redisA.migrate_keys(
'localhost', serverB.tcp_address.port, ('key1', 'key2', 'key3'),
dest_db=0, timeout=1000)
assert not ok
ok = yield from redisB.migrate_keys(
'localhost', server.tcp_address.port, ('key1', 'key2', 'key3'),
dest_db=0, timeout=1000,
copy=True)
assert ok
assert (yield from redisB.get('key1')) == b'123'
assert (yield from redisB.get('key2')) == b'123'
assert (yield from redisB.get('key3')) == b'123'
assert (yield from redisA.get('key1')) == b'123'
assert (yield from redisA.get('key2')) == b'123'
assert (yield from redisA.get('key3')) == b'123'

assert (yield from redisA.set('key1', 'val'))
assert (yield from redisA.set('key2', 'val'))
assert (yield from redisA.set('key3', 'val'))
ok = yield from redisA.migrate_keys(
'localhost', serverB.tcp_address.port,
('key1', 'key2', 'key3', 'non-existing-key'),
dest_db=0, timeout=1000, replace=True)
assert ok is True

assert (yield from redisB.get('key1')) == b'val'
assert (yield from redisB.get('key2')) == b'val'
assert (yield from redisB.get('key3')) == b'val'
assert (yield from redisA.get('key1')) is None
assert (yield from redisA.get('key2')) is None
assert (yield from redisA.get('key3')) is None


@pytest.mark.run_loop
def test_migrate__exceptions(create_redis, loop, server, serverB):
redisA = yield from create_redis(server.tcp_address)
redisB = yield from create_redis(serverB.tcp_address, db=2)

yield from add(redisA, 'my-key', 123)

yield from redisB.delete('my-key')
assert (yield from redisA.exists('my-key'))
assert not (yield from redisB.exists('my-key'))

fut1 = redisB.debug_sleep(2)
fut2 = redisA.migrate('localhost', serverB.tcp_address.port,
'my-key', dest_db=2, timeout=10)
yield from fut1
with pytest.raises_regex(ReplyError, "IOERR .* timeout .*"):
assert not (yield from fut2)


@pytest.redis_version(
3, 0, 6, reason="MIGRATE…KEYS available since Redis 3.0.6")
@pytest.mark.skipif(
sys.platform == 'win32', reason="Seems to be unavailable in win32 build")
@pytest.mark.run_loop
def test_migrate_keys__errors(redis):
with pytest.raises_regex(TypeError, "host .* str"):
yield from redis.migrate_keys(None, 1234, 'key', 1, 23)
with pytest.raises_regex(TypeError, "keys .* list or tuple"):
yield from redis.migrate_keys('host', '1234', None, 1, 123)
with pytest.raises_regex(TypeError, "dest_db .* int"):
yield from redis.migrate_keys('host', 123, ('key',), 1.0, 123)
with pytest.raises_regex(TypeError, "timeout .* int"):
yield from redis.migrate_keys('host', '1234', ('key',), 2, None)
with pytest.raises_regex(ValueError, "Got empty host"):
yield from redis.migrate_keys('', '123', ('key',), 1, 123)
with pytest.raises_regex(ValueError, "dest_db .* greater equal 0"):
yield from redis.migrate_keys('host', 6379, ('key',), -1, 1000)
with pytest.raises_regex(ValueError, "timeout .* greater equal 0"):
yield from redis.migrate_keys('host', 6379, ('key',), 1, -1000)
with pytest.raises_regex(ValueError, "keys .* empty"):
yield from redis.migrate_keys('host', '1234', (), 2, 123)


@pytest.mark.run_loop
def test_move(redis):
yield from add(redis, 'my-key', 123)
Expand Down