Skip to content

Commit

Permalink
[GH-46]Support persistent queue for storops
Browse files Browse the repository at this point in the history
With the support of this commit, user can:

* add storops operations to a queue, and they can be executed periodically till
operation executes correctly.
* unfinished items in the queue can be restored automatically after restart.
  • Loading branch information
peter-wangxu committed Nov 22, 2016
1 parent e797cbd commit 0f561e1
Show file tree
Hide file tree
Showing 10 changed files with 278 additions and 26 deletions.
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ retryz>=0.1.8
cachez>=0.1.0
six>=1.9.0
bitmath>=1.3.0
queuelib
39 changes: 23 additions & 16 deletions storops/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ def error_code(self):
return ret


class VNXObjectNotFoundError(object):
pass


class VNXBackendError(VNXException):
message_template = "vnx backend error. {err}"

Expand Down Expand Up @@ -503,8 +507,8 @@ class NaviseccliNotAvailableError(VNXException):
" and available in path.")


class VNXObjectNotFound(VNXException):
message_template = "object is not found. {err}"
class VNXNasObjectNotFound(VNXException, VNXObjectNotFoundError):
message_template = "NAS object is not found. {err}"


class VNXSetArrayNameError(VNXException):
Expand Down Expand Up @@ -588,7 +592,7 @@ class VNXAluAlreadyAttachedError(VNXAttachAluError):


@cli_exception
class VNXAluNotFoundError(VNXAttachAluError):
class VNXAluNotFoundError(VNXAttachAluError, VNXObjectNotFoundError):
error_message = 'The ALU number specified by user is not a bound'


Expand All @@ -602,7 +606,7 @@ class VNXDetachAluError(VNXStorageGroupError):


@cli_exception
class VNXDetachAluNotFoundError(VNXDetachAluError):
class VNXDetachAluNotFoundError(VNXDetachAluError, VNXObjectNotFoundError):
error_message = 'No such Host LUN in this Storage Group'


Expand All @@ -620,7 +624,8 @@ class VNXDeleteStorageGroupError(VNXStorageGroupError):


@cli_exception
class VNXStorageGroupNotFoundError(VNXStorageGroupError):
class VNXStorageGroupNotFoundError(VNXStorageGroupError,
VNXObjectNotFoundError):
error_message = ('The group name or UID does not match any '
'storage groups for this array')

Expand Down Expand Up @@ -698,7 +703,7 @@ class VNXSnapNameInUseError(VNXSnapError):


@cli_exception
class VNXCreateSnapResourceNotFoundError(VNXSnapError):
class VNXCreateSnapResourceNotFoundError(VNXSnapError, VNXObjectNotFoundError):
error_message = 'The specified resource does not exist.'


Expand Down Expand Up @@ -752,7 +757,7 @@ class VNXLunPreparingError(VNXLunError):


@cli_exception
class VNXLunNotFoundError(VNXLunError):
class VNXLunNotFoundError(VNXLunError, VNXObjectNotFoundError):
error_message = 'Could not retrieve the specified (pool lun).'


Expand Down Expand Up @@ -821,7 +826,8 @@ class VNXConsistencyGroupNameInUseError(VNXCreateConsistencyGroupError):


@cli_exception
class VNXConsistencyGroupNotFoundError(VNXConsistencyGroupError):
class VNXConsistencyGroupNotFoundError(VNXConsistencyGroupError,
VNXObjectNotFoundError):
error_message = 'Cannot find the consistency group'


Expand All @@ -835,7 +841,7 @@ class VNXDeleteHbaError(VNXException):


@cli_exception
class VNXDeleteHbaNotFoundError(VNXException):
class VNXDeleteHbaNotFoundError(VNXException, VNXObjectNotFoundError):
error_message = 'The HBA UID specified is not known by the array'


Expand Down Expand Up @@ -869,7 +875,7 @@ class VNXUserNameInUseError(VNXSecurityException):


@cli_exception
class VNXUserNotFoundError(VNXSecurityException):
class VNXUserNotFoundError(VNXSecurityException, VNXObjectNotFoundError):
error_message = 'User does not exist'


Expand Down Expand Up @@ -917,7 +923,7 @@ class VNXPoolDestroyingError(VNXDeletePoolError):


@cli_exception
class VNXPoolNotFoundError(VNXPoolError):
class VNXPoolNotFoundError(VNXPoolError, VNXObjectNotFoundError):
error_message = ['The (Storagepool) may not exist',
'was not found in any provider']

Expand Down Expand Up @@ -947,7 +953,7 @@ class VNXMirrorAlreadyMirroredError(VNXMirrorException):


@cli_exception
class VNXMirrorImageNotFoundError(VNXMirrorException):
class VNXMirrorImageNotFoundError(VNXMirrorException, VNXObjectNotFoundError):
error_message = 'Image not found'


Expand Down Expand Up @@ -982,7 +988,7 @@ class VNXMirrorFeatureNotAvailableError(VNXMirrorException):


@cli_exception
class VNXMirrorNotFoundError(VNXMirrorException):
class VNXMirrorNotFoundError(VNXMirrorException, VNXObjectNotFoundError):
error_message = 'Mirror not found'


Expand All @@ -1002,7 +1008,7 @@ class VNXGateWayError(VNXException):


@cli_exception
class VNXVirtualPortNotFoundError(VNXPortError):
class VNXVirtualPortNotFoundError(VNXPortError, VNXObjectNotFoundError):
error_message = 'Request failed. Specified virtual port not found.'


Expand Down Expand Up @@ -1035,7 +1041,7 @@ class VNXFsExistedError(VNXFsError):


@xmlapi_exception
class VNXFsNotFoundError(VNXFsError):
class VNXFsNotFoundError(VNXFsError, VNXObjectNotFoundError):
error_code = 18522112101


Expand All @@ -1053,7 +1059,8 @@ class VNXMoverInterfaceError(VNXException):


@xmlapi_exception
class VNXMoverInterfaceNotFoundError(VNXMoverInterfaceError):
class VNXMoverInterfaceNotFoundError(VNXMoverInterfaceError,
VNXObjectNotFoundError):
error_code = 13691781134


Expand Down
4 changes: 4 additions & 0 deletions storops/lib/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@


class Resource(JsonPrinter):

def __init__(self):
self._property_cache = {}
self._parsed_resource = None
Expand Down Expand Up @@ -180,6 +181,9 @@ def _get_raw_resource(self):
return ''

def __getattr__(self, item):
# To avoid infinite loop of accessing the nonexistent property
if '_property_cache' not in self.__dict__:
raise AttributeError(item)
if item in self._property_cache:
ret = self._property_cache[item]
elif not item.startswith('_'):
Expand Down
115 changes: 115 additions & 0 deletions storops/lib/tasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
# coding=utf-8
# Copyright (c) 2016 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import queuelib
import pickle
import threading
import time
from storops.exception import StoropsException
from storops.exception import VNXObjectNotFoundError

__author__ = 'Peter Wang'
log = logging.getLogger(__name__)


class PQueue(object):
DEFAULT_INTERVAL = 300
MAX_RETRIES = 100

def __init__(self, path, interval=None):
self.path = path
self._q = queuelib.FifoDiskQueue(self.path)
self._interval = (
self.DEFAULT_INTERVAL if interval is None else interval)
self.started = False

def put(self, func, **kwargs):
item = {'object': func.__self__, 'method': func.__name__,
'params': kwargs}
self._q.push(self._dumps(item))

def get(self):
item = self._q.pop()
return self._loads(item) if item else None

def start(self):
if not self.started:
self._run()
self.started = True
else:
log.info("PQueue[{}] had already started.".format(self.path))

def stop(self):
self._interval = 0
self.started = False

def _dumps(self, obj):
return pickle.dumps(obj)

def _loads(self, pickle_bytes):
return pickle.loads(pickle_bytes)

def set_interval(self, interval):
self._interval = interval

def _run(self):
self._t = threading.Thread(target=self._run_tasks)
self._t.setDaemon(True)
self._t.start()

def re_enqueue(self, item):
"""Re-enqueue till reach max retries."""
if 'retries' in item:
retries = item['retries']
if retries >= self.MAX_RETRIES:
log.warn("Failed to execute {} after {} retries, give it "
" up.".format(item['method'], retries))
else:
retries += 1
item['retries'] = retries
self._q.push(self._dumps(item))
else:
item['retries'] = 1
self._q.push(self._dumps(item))

def _run_tasks(self):
while self._interval > 0:
log.debug("Running periodical check.")
data = self.get()
if not data:
log.debug("Queue is empty now.")
else:
method = getattr(data['object'], data['method'], None)
try:
method(**data['params'])
except Exception as ex:
log.debug("Failed to execute {}: {}, this message can be "
"safely ignored.".format(method.__name__,
ex))
if isinstance(ex, VNXObjectNotFoundError):
log.info("Object had been deleted: {}, this message "
"can be safely ignored.".format(ex))
elif isinstance(ex, StoropsException):
# Re-enqueue since failed to execute
self.re_enqueue(data)
else:
log.error("Unexpected error occurs when executing {}:"
" {}, this job will not be executed"
" again".format(method.__name__, ex))
time.sleep(self._interval)
log.info("{} with path {} has been "
"stopped.".format(self.__class__.__name__, self._q.path))
6 changes: 3 additions & 3 deletions storops/vnx/nas_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@

from storops.connection import connector
from storops.exception import VNXBackendError, VNXLockRequiredException, \
VNXObjectNotFound, VNXInvalidMoverID, VNXException, get_xmlapi_exception, \
VNXFileCredentialError
VNXNasObjectNotFound, VNXInvalidMoverID, VNXException, \
get_xmlapi_exception, VNXFileCredentialError
from storops.lib.common import Enum, check_int
from storops.lib.converter import to_int, to_hex
from storops.vnx.nas_cmd import NasCommand
Expand Down Expand Up @@ -471,7 +471,7 @@ def raise_if_err(self, desc=None):
def raise_if_no_object(self, desc=None):
if not self.objects:
msg = self.get_status_msg(desc)
raise VNXObjectNotFound(err=msg)
raise VNXNasObjectNotFound(err=msg)

def is_ok(self):
return self.status in (XmlStatus.DEBUG, XmlStatus.INFO, XmlStatus.OK)
Expand Down
7 changes: 4 additions & 3 deletions storops/vnx/resource/mirror_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# under the License.
from __future__ import unicode_literals

from storops.exception import VNXObjectNotFound, raise_if_err, \
from storops.exception import raise_if_err, \
VNXMirrorException, VNXMirrorImageNotFoundError
from storops.lib.common import check_text, instance_cache
from storops.vnx.enums import VNXMirrorViewRecoveryPolicy
Expand Down Expand Up @@ -93,8 +93,9 @@ def get_image(self, image_id):
ret = image
break
else:
raise VNXObjectNotFound('image {} not found in mirror view {}.'
.format(image_id, self._get_name()))
raise VNXMirrorImageNotFoundError(
'image {} not found in mirror view {}.'.format(
image_id, self._get_name()))
return ret

@staticmethod
Expand Down
17 changes: 17 additions & 0 deletions storops/vnx/resource/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,20 @@ def _init_file_cli(self):
self._file_username,
self._file_password)

def __getstate__(self):
d = {'ip': self._ip, 'username': self._username,
'password': self._password, 'scope': self._scope,
'sec_file': self._sec_file, 'naviseccli': self._naviseccli}
return d

def __setstate__(self, state):
self.__init__(heartbeat_interval=0, **state)

def __getinitargs__(self):
# Seems not used by pickle
return (self._ip, self._username, self._password, self._scope,
self._sec_file, None, None, self._naviseccli)

@property
@instance_cache
def _file_cli(self):
Expand Down Expand Up @@ -284,6 +298,9 @@ def create_cg(self, name, members=None):
def delete_cg(self, name):
self._delete_resource(VNXConsistencyGroup(name, self._cli))

def delete_lun(self, name):
self._delete_resource(VNXLun(name=name, cli=self._cli))

def get_disk(self, disk_index=None):
return VNXDisk.get(self._cli, disk_index)

Expand Down
Loading

0 comments on commit 0f561e1

Please sign in to comment.