Skip to content

Commit

Permalink
[GH-46]Support persistent queue for storops
Browse files Browse the repository at this point in the history
With the support of this commit, user can:

* add storops operations to a queue, and they can be executed periodically till
operation executes correctly.
* unfinished items in the queue can be restored automatically after restart.
  • Loading branch information
peter-wangxu committed Nov 11, 2016
1 parent 51c7121 commit bb38052
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 0 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ retryz>=0.1.8
cachez>=0.1.0
six>=1.9.0
bitmath>=1.3.0
pqueue>=0.1.3
86 changes: 86 additions & 0 deletions storops/lib/queue_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# coding=utf-8
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import pqueue
import threading
import time
from storops.exception import StoropsException

__author__ = 'Peter Wang'
LOG = logging.getLogger(__name__)


class PQueue(object):
DEFAULT_INTERVAL = 300

def __init__(self, path, interval=None):
self.path = path
self._interval = interval if interval else self.DEFAULT_INTERVAL

def init_queue(self, started=False):
self._q = pqueue.Queue(self.path)
self.started = started
if started:
self._run()
return self

def put(self, item):
return self._q.put(item)

def get(self):
return self._q.get()

def start(self):
if not self.started:
self._run()
self.started = True
else:
LOG.info("PQueue[{}] had already started.".format(self.path))

def stop(self):
self._interval = 0

def set_interval(self, interval):
self._interval = interval

def _run(self):
self._t = threading.Thread(target=self._run_tasks)
self._t.start()

def _run_tasks(self):
while True and self._interval > 0:
LOG.debug("Running periodical check.")
try:
data = self._q.get_nowait()
except pqueue.Empty:
LOG.debug("Queue is empty now.")
data = None
if data and type(data) is dict:
method = getattr(data['object'], data['method'], None)
try:
method(data['param'])
except Exception as ex:
LOG.debug("Failed to execute {}: {}, this message can be "
"safely ignored.".format(method.__name__,
ex.message))
if type(ex) is StoropsException:
# Re-enqueue since failed to execute
self._q.put(data)
self._q.task_done()
time.sleep(self._interval)
LOG.info("{} with path {} has been "
"stopped.".format(self.__class__.__name__, self._q.path))
4 changes: 4 additions & 0 deletions storops/lib/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@


class Resource(JsonPrinter):

def __init__(self):
self._property_cache = {}
self._parsed_resource = None
Expand Down Expand Up @@ -180,6 +181,9 @@ def _get_raw_resource(self):
return ''

def __getattr__(self, item):
# To avoid infinite loop of accessing the nonexistent property
if '_property_cache' not in self.__dict__:
self.__dict__['_property_cache'] = {}
if item in self._property_cache:
ret = self._property_cache[item]
elif not item.startswith('_'):
Expand Down
17 changes: 17 additions & 0 deletions storops/vnx/resource/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ def _init_file_cli(self):
self._file_username,
self._file_password)

def __getstate__(self):
d = {'ip': self._ip, 'username': self._username,
'password': self._password, 'scope': self._scope,
'sec_file': self._sec_file, 'naviseccli': self._naviseccli}
return d

def __setstate__(self, state):
self.__init__(heartbeat_interval=0, **state)

def __getinitargs__(self):
# Seems not used by pickle
return (self._ip, self._username, self._password, self._scope,
self._sec_file, None, None, self._naviseccli)

@property
@instance_cache
def _file_cli(self):
Expand Down Expand Up @@ -284,6 +298,9 @@ def create_cg(self, name, members=None):
def delete_cg(self, name):
self._delete_resource(VNXConsistencyGroup(name, self._cli))

def delete_lun(self, name):
self._delete_resource(VNXLun(name=name, cli=self._cli))

def get_disk(self, disk_index=None):
return VNXDisk.get(self._cli, disk_index)

Expand Down
73 changes: 73 additions & 0 deletions test/lib/test_queue_tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# coding=utf-8
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import unicode_literals

from unittest import TestCase

from hamcrest import assert_that

from storops.lib import queue_tasks
from test.vnx.cli_mock import patch_cli, t_vnx
import time


class TestPQueue(TestCase):

def _get_queue(self, path):
return queue_tasks.PQueue(path)

def test_init_queue(self):
q = self._get_queue('/tmp/TestPQueue')
pq = q.init_queue()
assert_that(pq.path, '/tmp/TestPQueue')

def test_put(self):
q = self._get_queue('/tmp/test_put')
q.init_queue()
fake_vnx = t_vnx()
item = {b'object': fake_vnx, b'method': b'delete_lun', b'param': b'lun-1'}
q.put(item)

def test_get(self):
q = self._get_queue('/tmp/test_get')
q.init_queue()
fake_vnx = t_vnx()
item = {b'object': fake_vnx, b'method': b'delete_lun', b'param': b'lun-1'}
q.put(item)

pickled_item = q.get()
assert_that(pickled_item['object']._ip, fake_vnx._ip)
assert_that(pickled_item['method'], 'delete_lun')
assert_that(pickled_item['param'], 'lun-1')

@patch_cli
def test_run(self):
q = self._get_queue('/tmp/test_run')
q.set_interval(0.01)
q.init_queue(started=True)
q.stop()

@patch_cli
def test_run_tasks(self):
q = self._get_queue('/tmp/test_run_tasks')
q.init_queue()
q.set_interval(0.5)
fake_vnx = t_vnx()
item = {b'object': fake_vnx, b'method': b'delete_lun', b'param': b'lun-1'}
q.put(item)
q.start()
time.sleep(0.02)
q.stop()

0 comments on commit bb38052

Please sign in to comment.