Skip to content

Commit

Permalink
[GH-46]Support persistent queue for storops
Browse files Browse the repository at this point in the history
With the support of this commit, user can:

* add storops operations to a queue, and they can be executed periodically till
operation executes correctly.
* unfinished items in the queue can be restored automatically after restart.
  • Loading branch information
peter-wangxu committed Nov 17, 2016
1 parent 51c7121 commit fc0eeba
Show file tree
Hide file tree
Showing 5 changed files with 226 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ retryz>=0.1.8
cachez>=0.1.0
six>=1.9.0
bitmath>=1.3.0
pqueue>=0.1.4
4 changes: 4 additions & 0 deletions storops/lib/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@


class Resource(JsonPrinter):

def __init__(self):
self._property_cache = {}
self._parsed_resource = None
Expand Down Expand Up @@ -180,6 +181,9 @@ def _get_raw_resource(self):
return ''

def __getattr__(self, item):
# To avoid infinite loop of accessing the nonexistent property
if '_property_cache' not in self.__dict__:
raise AttributeError(item)
if item in self._property_cache:
ret = self._property_cache[item]
elif not item.startswith('_'):
Expand Down
109 changes: 109 additions & 0 deletions storops/lib/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# coding=utf-8
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import pqueue
import threading
import time
from storops.exception import StoropsException

__author__ = 'Peter Wang'
log = logging.getLogger(__name__)


class PQueue(object):
DEFAULT_INTERVAL = 300
MAX_RETRIES = 100

def __init__(self, path, interval=None):
self.path = path
self._q = pqueue.Queue(self.path)
self._interval = (
self.DEFAULT_INTERVAL if interval is None else interval)
self.started = False

def put(self, func, **kwargs):
item = {'object': func.__self__, 'method': func.__name__,
'params': kwargs}
self._q.put(item)

def get(self, block=True):
return self._q.get(block=block)

def start(self):
if not self.started:
self._run()
self.started = True
else:
log.info("PQueue[{}] had already started.".format(self.path))

def stop(self):
self._interval = 0
self.started = False

def task_done(self):
self._q.task_done()

def set_interval(self, interval):
self._interval = interval

def _run(self):
self._t = threading.Thread(target=self._run_tasks)
self._t.setDaemon(True)
self._t.start()

def re_enqueue(self, item):
"""Re-enqueue till reach max retries."""
if 'retries' in item:
retries = item['retries']
if retries >= self.MAX_RETRIES:
log.warn("Failed to execute {} after {} retries, give it "
" up.".format(item['method'], retries))
else:
retries += 1
item['retries'] = retries
self._q.put(item)
else:
item['retries'] = 1
self._q.put(item)

def _run_tasks(self):
while self._interval > 0:
log.debug("Running periodical check.")
try:
data = self._q.get_nowait()
except pqueue.Empty:
log.debug("Queue is empty now.")
data = None
if data:
method = getattr(data['object'], data['method'], None)
try:
method(**data['params'])
except Exception as ex:
log.debug("Failed to execute {}: {}, this message can be "
"safely ignored.".format(method.__name__,
ex))
if isinstance(ex, StoropsException):
# Re-enqueue since failed to execute
self.re_enqueue(data)
else:
log.error("Unexpected error occurs when executing {}:"
" {}, this job will not be executed"
" again".format(method.__name__, ex))
self._q.task_done()
time.sleep(self._interval)
log.info("{} with path {} has been "
"stopped.".format(self.__class__.__name__, self._q.path))
17 changes: 17 additions & 0 deletions storops/vnx/resource/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ def _init_file_cli(self):
self._file_username,
self._file_password)

def __getstate__(self):
d = {'ip': self._ip, 'username': self._username,
'password': self._password, 'scope': self._scope,
'sec_file': self._sec_file, 'naviseccli': self._naviseccli}
return d

def __setstate__(self, state):
self.__init__(heartbeat_interval=0, **state)

def __getinitargs__(self):
# Seems not used by pickle
return (self._ip, self._username, self._password, self._scope,
self._sec_file, None, None, self._naviseccli)

@property
@instance_cache
def _file_cli(self):
Expand Down Expand Up @@ -284,6 +298,9 @@ def create_cg(self, name, members=None):
def delete_cg(self, name):
self._delete_resource(VNXConsistencyGroup(name, self._cli))

def delete_lun(self, name):
self._delete_resource(VNXLun(name=name, cli=self._cli))

def get_disk(self, disk_index=None):
return VNXDisk.get(self._cli, disk_index)

Expand Down
95 changes: 95 additions & 0 deletions test/lib/test_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# coding=utf-8
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from __future__ import unicode_literals

import shutil
from unittest import TestCase
import tempfile
from hamcrest import assert_that, equal_to

from storops.lib import tasks
from test.vnx.cli_mock import patch_cli, t_vnx
import time


class TestPQueue(TestCase):

def setUp(self):
self.path = tempfile.mkdtemp(suffix='storops')
self.q = tasks.PQueue(self.path)

def tearDown(self):
self.q.stop()
self.q = None
time.sleep(0.5)
shutil.rmtree(self.path)

def test_queue_path(self):
assert_that(self.q.path, equal_to(self.path))

def test_put(self):
fake_vnx = t_vnx()
self.q.put(fake_vnx.delete_lun, name='l1')
self.q.task_done()

def test_get(self):
fake_vnx = t_vnx()
self.q.put(fake_vnx.delete_lun, name='l1')

pickled_item = self.q.get()
self.q.task_done()
assert_that(pickled_item['object']._ip, equal_to(fake_vnx._ip))
assert_that(pickled_item['method'], equal_to('delete_lun'))
assert_that(pickled_item['params']['name'], equal_to('l1'))

@patch_cli
def test_run_tasks(self):
self.q.set_interval(0.01)
fake_vnx = t_vnx()
self.q.put(fake_vnx.delete_lun, name='l1')
self.q.start()
time.sleep(0.2)

def test_re_enqueue(self):
fake_vnx = t_vnx()
item = {'object': fake_vnx, 'method': 'delete_lun',
'params': {'name': 'l1'}}
self.q.re_enqueue(item)
assert_that(item['retries'], equal_to(1))

def test_re_enqueue_max_retries(self):
fake_vnx = t_vnx()
item = {'object': fake_vnx, 'method': 'delete_lun', 'params': 'l1'}
for i in range(100):
self.q.re_enqueue(item)
self.q.get(block=False)
self.q.task_done()

self.q.re_enqueue(item)
assert_that(item['retries'], equal_to(100))

@patch_cli
def test_enqueue_expected_error(self):
self.q.set_interval(0.1)
fake_vnx = t_vnx()
uid = '00:00:00:00:00:00:00:00:00:00:00:00:00:00:00:01'
self.q.put(fake_vnx.delete_hba, hba_uid=uid)
self.q.start()
time.sleep(0.2)
reenqueued_item = self.q.get()
self.q.task_done()
assert_that(uid, equal_to(reenqueued_item['params']['hba_uid']))

0 comments on commit fc0eeba

Please sign in to comment.