Skip to content

Commit

Permalink
fix: misc
Browse files Browse the repository at this point in the history
  • Loading branch information
dmyersturnbull committed Nov 1, 2021
1 parent 81ca098 commit b621574
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 39 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ Tools.is_lambda(lambda: None) # True
Tools.longest(["a", "a+b"]) # "a+b" # anything with len
Tools.only([1, 2]) # error -- multiple items
Tools.first(iter([])) # None <-- better than try: next(iter(x)) except:...
Tools.trace_signals(sink=sys.stderr) # log traceback on all signals
Tools.trace_exit(sink=sys.stderr) # log traceback on exit
# lots of others
```

Expand Down
29 changes: 25 additions & 4 deletions pocketutils/core/dot_dict.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

import pickle
import sys
from copy import copy
from datetime import date, datetime
from pathlib import Path, PurePath
from typing import Any, ByteString, Callable, Mapping, Optional, Sequence
from typing import Any, ByteString, Callable, Mapping, Optional, Sequence, Iterable, Collection
from typing import Tuple as Tup
from typing import Type, TypeVar, Union

Expand Down Expand Up @@ -142,6 +143,28 @@ def to_toml(self) -> str:
"""
return toml.dumps(self._x)

def n_elements_total(self) -> int:
return len(self._all_elements())

def n_bytes_total(self) -> int:
return sum([sys.getsizeof(x) for x in self._all_elements()])

def _all_elements(self) -> Sequence[Any]:
i = []
for key, value in self._x.items():
if value is not None and isinstance(value, Mapping):
i += NestedDotDict(value)._all_elements()
elif (
value is not None
and isinstance(value, Collection)
and not isinstance(value, str)
and not isinstance(value, ByteString)
):
i += list(value)
else:
i.append(value)
return i

def leaves(self) -> Mapping[str, Any]:
"""
Gets the leaves in this tree.
Expand All @@ -151,9 +174,7 @@ def leaves(self) -> Mapping[str, Any]:
"""
mp = {}
for key, value in self._x.items():
if len(key) == 0:
raise AssertionError(f"Key is empty (value={value})")
if isinstance(value, dict):
if value is not None and isinstance(value, Mapping):
mp.update({key + "." + k: v for k, v in NestedDotDict(value).leaves().items()})
else:
mp[key] = value
Expand Down
27 changes: 22 additions & 5 deletions pocketutils/core/query_utils.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
import random
import time
from typing import Callable, Mapping, Optional
from dataclasses import dataclass
from typing import Callable, Mapping, Optional, ByteString
from urllib import request
from datetime import timedelta


def download_urllib(req: request.Request) -> bytes:
with request.urlopen(req) as q:
return q.read()


@dataclass(frozen=True, repr=True, order=True)
class TimeTaken:
query: timedelta
wait: timedelta


class QueryExecutor:
"""
A synchronous GET/POST query executor that limits the rate of requests.
Expand All @@ -19,14 +27,19 @@ def __init__(
sec_delay_min: float = 0.25,
sec_delay_max: float = 0.25,
encoding: Optional[str] = "utf-8",
querier: Optional[Callable[[request.Request], bytes]] = None,
querier: Optional[Callable[[request.Request], ByteString]] = None,
):
self._min = sec_delay_min
self._max = sec_delay_max
self._rand = random.Random() # nosec
self._encoding = encoding
self._next_at = 0
self._querier = download_urllib if querier is None else querier
self._time_taken = None

@property
def last_time_taken(self) -> TimeTaken:
return self._time_taken

def __call__(
self,
Expand All @@ -39,16 +52,20 @@ def __call__(
headers = {} if headers is None else headers
encoding = self._encoding if encoding == "-1" else encoding
now = time.monotonic()
wait_secs = self._next_at - now
if now < self._next_at:
time.sleep(self._next_at - now)
time.sleep(wait_secs)
now = time.monotonic()
req = request.Request(url=url, method=method, headers=headers)
content = self._querier(req)
if encoding is None:
data = content.decode(errors=errors)
else:
data = content.decode(encoding=encoding, errors=errors)
self._next_at = time.monotonic() + self._rand.uniform(self._min, self._max)
now_ = time.monotonic()
self._time_taken = TimeTaken(timedelta(seconds=wait_secs), timedelta(seconds=now_ - now))
self._next_at = now_ + self._rand.uniform(self._min, self._max)
return data


__all__ = ["QueryExecutor"]
__all__ = ["QueryExecutor", "TimeTaken"]
76 changes: 49 additions & 27 deletions pocketutils/misc/loguru_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@
import logging
import os
import sys
import traceback
import traceback as _traceback
from collections import deque
from dataclasses import dataclass
from functools import partialmethod
from inspect import cleandoc
from pathlib import Path
from typing import (
Expand Down Expand Up @@ -62,7 +61,7 @@
<cyan>({thread.id}){name}</cyan><bold>:</bold>
<cyan>{function}</cyan><bold>:</bold>
<cyan>{line}</cyan><bold> — </bold>
<level>{message}{{EXTRA}}</level>
<level>{message}{{EXTRA}}{{TRACEBACK}}</level>
{exception}
"""
).replace("\n", "")
Expand All @@ -76,10 +75,10 @@ class _SENTINEL:
Z = TypeVar("Z", covariant=True, bound=Logger)


def _add_traceback(record):
def log_traceback(record):
extra = record["extra"]
if extra.get("traceback", False):
extra["traceback"] = "\n" + "".join(traceback.format_stack())
extra["traceback"] = "\n" + "".join(_traceback.format_stack())
else:
extra["traceback"] = ""

Expand Down Expand Up @@ -112,10 +111,19 @@ def wrap_extended_fmt(
eq_sign: str = " ",
) -> Callable[[Mapping[str, Any]], str]:
def FMT(record: Mapping[str, Any]) -> str:
extra = sep.join([e + eq_sign + "{extra[" + e + "]}" for e in record["extra"].keys()])
extra = [e for e in record["extra"] if e != "traceback"]
if len(extra) > 0:
extra = sep.join([e + eq_sign + "{extra[" + e + "]}" for e in extra])
extra = f" [ {extra} ]"
return fmt.replace("{{EXTRA}}", extra) + os.linesep
else:
extra = ""
f = fmt.replace("{{EXTRA}}", extra)
tb = record["extra"].get("traceback", False)
if tb:
f = f.replace("{{TRACEBACK}}", "{extra[traceback]}")
else:
f = f.replace("{{TRACEBACK}}", "")
return f + os.linesep

return FMT

Expand Down Expand Up @@ -203,11 +211,11 @@ def level(self) -> str:

@property
def fmt_simplified(self):
return self.wrap_plain_fmt()
return self.wrap_extended_fmt()

@property
def fmt_built_in(self):
return self.wrap_extended_fmt()
return self.wrap_plain_fmt()

@property
def fmt_built_in_raw(self):
Expand Down Expand Up @@ -333,7 +341,7 @@ def __init__(self, logger: T = _logger):

@staticmethod
def new(t: Type[Z]) -> FancyLoguru[Z]:
ell = _logger.patch(_add_traceback)
ell = _logger.patch(log_traceback)
logger = t(ell._core, *ell._options)
return FancyLoguru[Z](logger)

Expand Down Expand Up @@ -515,6 +523,7 @@ def intercept_std(self, *, warnings: bool = True) -> __qualname__:
logging.basicConfig(handlers=[InterceptHandler()], level=0, encoding="utf-8")
if warnings:
logging.captureWarnings(True)
return self

def config_main(
self,
Expand Down Expand Up @@ -567,7 +576,9 @@ def remember(self, *, n_messages: int = 100) -> __qualname__:
extant = self.recent_messages
self._rememberer = Rememberer(n_messages)
self._rememberer.hid = self._logger.add(
self._rememberer, level="TRACE", format=self._main.fmt
self._rememberer,
level="TRACE",
format=self._defaults.fmt_simplified if self._main is None else self._main.fmt,
)
for msg in extant:
self._rememberer(msg)
Expand Down Expand Up @@ -670,23 +681,18 @@ def from_cli(
_msg_level = self._aliases.get(_msg_level.upper(), _msg_level.upper())
if not self._control_enabled:
if self._main is not None:
self._main.level = _msg_level
self._main.level = _msg_level # just set
return self
if main is _SENTINEL:
main = None
if main is None and self._main is None:
main = self._defaults.level
elif main is None:
main = self._main.level
main = self._aliases.get(main.upper(), main.upper())
if main not in self._defaults.levels_extended:
_permitted = ", ".join(
[*self._defaults.levels_extended, *self._defaults.aliases.keys()]
)
raise XValueError(
f"{main.lower()} not a permitted log level (allowed: {_permitted}", value=main
)
self.config_main(level=main)
if main is not None and main is not _SENTINEL:
main = self._aliases.get(main.upper(), main.upper())
if main not in self._defaults.levels_extended:
_permitted = ", ".join(
[*self._defaults.levels_extended, *self._defaults.aliases.keys()]
)
raise XValueError(
f"{main.lower()} not a permitted log level (allowed: {_permitted}", value=main
)
self.config_main(level=main)
self.remove_paths()
if path is not None and len(str(path)) > 0:
match = _LOGGER_ARG_PATTERN.match(str(path))
Expand Down Expand Up @@ -813,6 +819,22 @@ def extended(
FANCY_LOGURU_DEFAULTS = _Defaults()


if __name__ == "__main__":
_logger.remove(None)
lg = (
FancyLoguru.new(LoggerWithCautionAndNotice)
.set_control(False)
.set_control(True)
.config_main(fmt=FANCY_LOGURU_DEFAULTS.fmt_simplified)
.intercept_std()
)
lg.from_cli(path="nope.log.tmp")

with lg.logger.contextualize(omg="why"):
lg.logger.info("hello", traceback=True)
print(lg.recent_messages)


__all__ = [
"FancyLoguru",
"FANCY_LOGURU_DEFAULTS",
Expand Down
2 changes: 2 additions & 0 deletions pocketutils/tools/all_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from pocketutils.tools.reflection_tools import ReflectionTools
from pocketutils.tools.string_tools import StringTools
from pocketutils.tools.unit_tools import UnitTools
from pocketutils.tools.sys_tools import SystemTools


class Tools(
Expand All @@ -23,6 +24,7 @@ class Tools(
UnitTools,
LoopTools,
ReflectionTools,
SystemTools,
):
"""
A collection of utility static functions.
Expand Down
10 changes: 8 additions & 2 deletions pocketutils/tools/base_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
Tuple,
TypeVar,
Union,
ByteString,
)

from pocketutils.core.input_output import Writeable
Expand Down Expand Up @@ -187,13 +188,18 @@ def to_true_iterable(cls, s: Any) -> Iterable[Any]:
def is_true_iterable(cls, s: Any) -> bool:
"""
Returns whether ``s`` is a probably "proper" iterable.
In other words, iterable but not a string or bytes
In other words, iterable but not a string or bytes.
.. caution::
This is not fully reliable.
Types that do not define ``__iter__`` but are iterable
via ``__getitem__`` will not be included.
"""
return (
s is not None
and isinstance(s, Iterable)
and not isinstance(s, str)
and not isinstance(s, bytes)
and not isinstance(s, ByteString)
)

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ build-backend = "poetry.masonry.api"

[tool.poetry]
name = "pocketutils"
version = "0.8.3"
version = "0.8.4"
description = "Adorable little Python code for you to copy or import."
keywords = ["python", "snippets", "utils", "gists", "bioinformatics"]
authors = ["Douglas Myers-Turnbull"]
Expand Down
12 changes: 12 additions & 0 deletions tests/pocketutils/core/test_dot_dict.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ def test_string(self):
assert lines[-1] == "}"
assert lines[1] == ' "a.b": 1,'

def test_size(self):
t = NestedDotDict(dict(a=dict(b=1), b=2, c=dict(a=dict(a=3))))
assert t.n_elements_total() == 3
t = NestedDotDict(dict(a=dict(b=1), b=[1, 2, 3], c=dict(a=dict(a=3))))
assert t.n_elements_total() == 5

def test_bytes(self):
t = NestedDotDict(dict(a=dict(b=1), b=2, c=dict(a=dict(a=3))))
assert t.n_bytes_total() == 84
t = NestedDotDict(dict(a=dict(b=1), b=[1, 2, 3], c=dict(a=dict(a=3))))
assert t.n_bytes_total() == 140

def test_as_exactly(self):
t = NestedDotDict({"zoo": {"animals": "jackets"}, "what": 0.1})
assert t.exactly("zoo.animals", str) == "jackets"
Expand Down
12 changes: 12 additions & 0 deletions tests/pocketutils/tools/test_sys_tools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import pytest

from pocketutils.tools.sys_tools import SystemTools


class TestSysTools:
def test_add_signals(self):
pass


if __name__ == "__main__":
pytest.main()

0 comments on commit b621574

Please sign in to comment.