Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat:shared_utils #324

Open
wants to merge 2 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 16 additions & 146 deletions ovos_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,123 +11,28 @@
# limitations under the License.
#
import datetime
import re
from functools import lru_cache, wraps
from threading import Thread, Event
from time import monotonic_ns, sleep
import warnings
import kthread
from time import sleep

from ovos_utils.log import LOG


def threaded_timeout(timeout=5):
"""
Start a thread with a specified timeout. If timeout is exceeded, an
exception is raised and the thread is terminated.
Adapted from https://github.com/OpenJarbas/InGeo
@param timeout: Timeout in seconds to wait before terminating the process
"""

def deco(func):
@wraps(func)
def wrapper(*args, **kwargs):
res = [Exception(f'function [{func.__name__}] timeout '
f'[{timeout}] exceeded!')]

def func_wrapped():
try:
res[0] = func(*args, **kwargs)
except Exception as e:
res[0] = e

t = Thread(target=func_wrapped)
t.daemon = True
try:
t.start()
t.join(timeout)
except Exception as je:
raise je
ret = res[0]
if isinstance(ret, BaseException):
raise ret
return ret

return wrapper

return deco


class classproperty(property):
"""Decorator for a Class-level property.
Credit to Denis Rhyzhkov on Stackoverflow: https://stackoverflow.com/a/13624858/1280629"""

def __get__(self, owner_self, owner_cls):
return self.fget(owner_cls)


def timed_lru_cache(
_func=None, *, seconds: int = 7000, maxsize: int = 128, typed: bool = False
):
""" Extension over existing lru_cache with timeout

taken from: https://blog.soumendrak.com/cache-heavy-computation-functions-with-a-timeout-value

:param seconds: timeout value
:param maxsize: maximum size of the cache
:param typed: whether different keys for different types of cache keys
"""

def wrapper_cache(f):
# create a function wrapped with traditional lru_cache
f = lru_cache(maxsize=maxsize, typed=typed)(f)
# convert seconds to nanoseconds to set the expiry time in nanoseconds
f.delta = seconds * 10 ** 9
f.expiration = monotonic_ns() + f.delta

@wraps(f) # wraps is used to access the decorated function attributes
def wrapped_f(*args, **kwargs):
if monotonic_ns() >= f.expiration:
# if the current cache expired of the decorated function then
# clear cache for that function and set a new cache value with new expiration time
f.cache_clear()
f.expiration = monotonic_ns() + f.delta
return f(*args, **kwargs)

wrapped_f.cache_info = f.cache_info
wrapped_f.cache_clear = f.cache_clear
return wrapped_f

# To allow decorator to be used without arguments
if _func is None:
return wrapper_cache
else:
return wrapper_cache(_func)


def create_killable_daemon(target, args=(), kwargs=None, autostart=True):
"""Helper to quickly create and start a thread with daemon = True"""
t = kthread.KThread(target=target, args=args, kwargs=kwargs)
t.daemon = True
if autostart:
t.start()
return t


def create_daemon(target, args=(), kwargs=None, autostart=True):
"""Helper to quickly create and start a thread with daemon = True"""
t = Thread(target=target, args=args, kwargs=kwargs)
t.daemon = True
if autostart:
t.start()
return t
from ovos_utils.decorators import classproperty, timed_lru_cache
from ovos_utils.list_utils import flatten_list, rotate_list
from ovos_utils.log import LOG, log_deprecation
from ovos_utils.text_utils import camel_case_split
from ovos_utils.thread_utils import wait_for_exit_signal, threaded_timeout, create_killable_daemon, create_daemon


def create_loop(target, interval, args=(), kwargs=None):
"""
Helper to quickly create and start a thread with daemon = True
and repeat it every interval seconds
"""
warnings.warn(
"deprecated without replacement and will be removed in a future release.",
DeprecationWarning,
stacklevel=2,
)

log_deprecation("deprecated without replacement", "2.0.0")

def loop(*args, **kwargs):
try:
Expand All @@ -140,50 +45,15 @@ def loop(*args, **kwargs):
return create_daemon(loop, args, kwargs)


def wait_for_exit_signal():
"""Blocks until KeyboardInterrupt is received"""
try:
Event().wait()
except KeyboardInterrupt:
LOG.debug(f"Exiting on KeyboardInterrupt")


def get_handler_name(*args, **kwargs):
from ovos_utils.log import log_deprecation
log_deprecation("Import from `ovos_utils.events`", "0.1.0")
from ovos_utils.events import get_handler_name
def datestr2ts(datestr):
warnings.warn(
"Import from `ovos_utils.events`",
"deprecated without replacement and will be removed in a future release.",
DeprecationWarning,
stacklevel=2,
)
return get_handler_name(*args, **kwargs)


def camel_case_split(identifier: str) -> str:
"""Split camel case string"""
regex = '.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)'
matches = re.finditer(regex, identifier)
return ' '.join([m.group(0) for m in matches])

log_deprecation("deprecated without replacement", "2.0.0")

def rotate_list(l, n=1):
return l[n:] + l[:n]


def flatten_list(some_list, tuples=True):
_flatten = lambda l: [item for sublist in l for item in sublist]
if tuples:
while any(isinstance(x, list) or isinstance(x, tuple)
for x in some_list):
some_list = _flatten(some_list)
else:
while any(isinstance(x, list) for x in some_list):
some_list = _flatten(some_list)
return some_list


def datestr2ts(datestr):
y = int(datestr[:4])
m = int(datestr[4:6])
d = int(datestr[-2:])
Expand Down
71 changes: 71 additions & 0 deletions ovos_utils/decorators.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
from functools import lru_cache, wraps
from time import monotonic_ns
from typing import Callable, Optional, Any


class classproperty(property):
"""
A decorator for creating a class-level property.

Credit to Denis Rhyzhkov on Stackoverflow:
https://stackoverflow.com/a/13624858/1280629

Example:
class MyClass:
@classproperty
def my_property(cls):
return "Class-level property"
"""
def __get__(self, owner_self: Optional[object], owner_cls: type) -> Any:
return self.fget(owner_cls)


def timed_lru_cache(
_func: Optional[Callable] = None, *,
seconds: int = 7000, maxsize: int = 128, typed: bool = False
) -> Callable:
"""
A version of lru_cache with an added timeout feature. After the specified timeout (in seconds),
the cache is cleared and the function is recomputed.

taken from: https://gist.github.com/Morreski/c1d08a3afa4040815eafd3891e16b945

Args:
_func (Optional[Callable]): The function to cache, used when the decorator is called directly.
seconds (int): Timeout value in seconds. Default is 7000 seconds.
maxsize (int): Maximum size of the cache. Default is 128.
typed (bool): Whether to use different cache keys for different types of arguments. Default is False.

Returns:
Callable: A wrapped function that supports caching with a timeout.

Example:
@timed_lru_cache(seconds=3600)
def expensive_computation(arg):
# Some expensive computation here
return result
"""
def wrapper_cache(f: Callable) -> Callable:
# Create a function wrapped with traditional lru_cache
f = lru_cache(maxsize=maxsize, typed=typed)(f)
# Convert seconds to nanoseconds for cache expiration time
f.delta = seconds * 10 ** 9
f.expiration = monotonic_ns() + f.delta

@wraps(f) # wraps is used to access the decorated function's attributes
def wrapped_f(*args: Any, **kwargs: Any) -> Any:
if monotonic_ns() >= f.expiration:
# Clear the cache if expired and reset expiration time
f.cache_clear()
f.expiration = monotonic_ns() + f.delta
return f(*args, **kwargs)

wrapped_f.cache_info = f.cache_info
wrapped_f.cache_clear = f.cache_clear
return wrapped_f

# To allow decorator to be used without arguments
if _func is None:
return wrapper_cache
else:
return wrapper_cache(_func)
62 changes: 62 additions & 0 deletions ovos_utils/list_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
from typing import List, Union, Any


def rotate_list(l: List[Any], n: int = 1) -> List[Any]:
"""
Rotate the elements of a list by a given number of positions.

Args:
l (List[Any]): The list to rotate.
n (int): The number of positions to rotate the list. Default is 1.

Returns:
List[Any]: The rotated list.

Example:
rotate_list([1, 2, 3], 1) -> [2, 3, 1]
"""
return l[n:] + l[:n]


def flatten_list(some_list: List[Union[List, tuple]], tuples: bool = True) -> List:
"""
Flatten a list of lists or tuples into a single list.

Args:
some_list (List[Union[List, tuple]]): The list to flatten.
tuples (bool): Whether to flatten both lists and tuples. Default is True.

Returns:
List: The flattened list.

Example:
flatten_list([[1, 2], [3, 4]]) -> [1, 2, 3, 4]
"""
_flatten = lambda l: [item for sublist in l for item in sublist]
if tuples:
while any(isinstance(x, (list, tuple)) for x in some_list):
some_list = _flatten(some_list)
else:
while any(isinstance(x, list) for x in some_list):
some_list = _flatten(some_list)
return some_list


def deduplicate_list(seq: List[str], keep_order: bool = True) -> List[str]:
"""
Deduplicate a list while optionally maintaining the original order.

Args:
seq (List[str]): The list to deduplicate.
keep_order (bool): Whether to preserve the order of elements. Default is True.

Returns:
List[str]: The deduplicated list.

Notes:
If `keep_order` is False, the function uses a set for faster deduplication.
"""
if not keep_order:
return list(set(seq))
else:
return list(dict.fromkeys(seq))
58 changes: 58 additions & 0 deletions ovos_utils/text_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import re
import string
import unicodedata

def camel_case_split(identifier: str) -> str:
"""
Split a camel case string into words.

Args:
identifier (str): The camel case string to split.

Returns:
str: A string with words separated by spaces.
"""
regex = '.+?(?:(?<=[a-z])(?=[A-Z])|(?<=[A-Z])(?=[A-Z][a-z])|$)'
matches = re.finditer(regex, identifier)
return ' '.join([m.group(0) for m in matches])

def collapse_whitespaces(text: str) -> str:
"""
Collapse multiple consecutive whitespace characters into a single space.

Args:
text (str): The input string.

Returns:
str: The string with collapsed whitespace.
"""
return re.sub(r'\s+', ' ', text)

def rm_parentheses(text: str) -> str:
"""
Remove text enclosed in parentheses from the given string.

Args:
text (str): Input string.

Returns:
str: String with parentheses and their contents removed.
"""
return re.sub(r"\((.*?)\)", "", text).replace(" ", " ")

def remove_accents_and_punct(input_str: str) -> str:
"""
Normalize the input string by removing accents and punctuation (except for '{' and '}').

Args:
input_str (str): The input string to be processed.

Returns:
str: The processed string with accents and punctuation removed.
"""
rm_chars = [c for c in string.punctuation if c not in ("{", "}")]
# Normalize to NFD (Normalization Form Decomposed), which separates characters and diacritical marks
nfkd_form = unicodedata.normalize('NFD', input_str)
# Remove characters that are not ASCII letters or punctuation we want to keep
return ''.join([char for char in nfkd_form
if unicodedata.category(char) != 'Mn' and char not in rm_chars])
Loading
Loading