Skip to content

Commit

Permalink
minor cleanup / standardization (#84)
Browse files Browse the repository at this point in the history
* a few minor cleanups
* additional export work
* more __bytes__ and export standardization
* more load and mmap simplifications
* minor update to MMap class
* add countmin-sketch bytes test
  • Loading branch information
barrust authored Dec 28, 2021
1 parent d172fbd commit 9ccbaa2
Show file tree
Hide file tree
Showing 14 changed files with 464 additions and 357 deletions.
13 changes: 4 additions & 9 deletions probables/blooms/basebloom.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,15 +254,11 @@ def __load(
rep = self.__impt_type * self.bloom_length
self._bloom = list(unpack(rep, file.read(offset)))

def loads(self, d: ByteString) -> None:
with BytesIO(d) as f:
self.__load(f)

def _parse_footer(self, stct: Struct, d: ByteString) -> float:
tmp_data = stct.unpack_from(d)
tmp_data = stct.unpack_from(bytearray(d))
self.__est_elements = tmp_data[0]
self._els_added = tmp_data[1]
fpr = tmp_data[2]
fpr = float(tmp_data[2])
return fpr

def _load_hex(self, hex_string: str, hash_function: typing.Optional[HashFuncT] = None) -> None:
Expand Down Expand Up @@ -311,7 +307,7 @@ def export(self, file: typing.Union[Path, str, IOBase, mmap]) -> None:

if not isinstance(file, (IOBase, mmap)):
with open(file, "wb") as filepointer:
self.export(filepointer)
self.export(filepointer) # type:ignore
else:
rep = self.__impt_type * self.bloom_length
file.write(pack(rep, *self.bloom))
Expand All @@ -324,8 +320,7 @@ def export(self, file: typing.Union[Path, str, IOBase, mmap]) -> None:
)

def __bytes__(self) -> bytes:
"""Export cuckoo filter to `bytes`"""

"""Export bloom filter to `bytes`"""
with BytesIO() as f:
self.export(f)
return f.getvalue()
Expand Down
5 changes: 4 additions & 1 deletion probables/blooms/bloom.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,9 @@ def __del__(self) -> None:
"""handle if user doesn't close the on disk Bloom Filter"""
self.close()

def __bytes__(self) -> bytes:
return bytes(self._bloom)

def close(self) -> None:
"""Clean up the BloomFilterOnDisk object"""
if self.__file_pointer is not None:
Expand Down Expand Up @@ -332,7 +335,7 @@ def __load(self, filepath: str, hash_function: typing.Optional[HashFuncT] = None
self._on_disk = True
self.__filename = filepath # type: ignore

def export(self, filename: str) -> None:
def export(self, filename: str) -> None: # type: ignore
""" Export to disk if a different location
Args:
Expand Down
4 changes: 2 additions & 2 deletions probables/blooms/countingbloom.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def __str__(self) -> str:
els_added,
)

def add(self, key: KeyT, num_els: int = 1) -> int:
def add(self, key: KeyT, num_els: int = 1) -> int: # type: ignore
"""Add the key to the Counting Bloom Filter
Args:
Expand All @@ -118,7 +118,7 @@ def add(self, key: KeyT, num_els: int = 1) -> int:
hashes = self.hashes(key)
return self.add_alt(hashes, num_els)

def add_alt(self, hashes: HashResultsT, num_els: int = 1) -> int:
def add_alt(self, hashes: HashResultsT, num_els: int = 1) -> int: # type: ignore
""" Add the element represented by hashes into the Counting Bloom
Filter
Expand Down
39 changes: 28 additions & 11 deletions probables/blooms/expandingbloom.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@

import os
import typing
from io import BytesIO, IOBase
from mmap import mmap
from pathlib import Path
from struct import calcsize, pack, unpack

from ..exceptions import RotatingBloomFilterError
from ..hashes import HashFuncT, HashResultsT, KeyT
from ..utilities import is_valid_file
from ..utilities import MMap, is_valid_file
from .bloom import BloomFilter


Expand Down Expand Up @@ -65,6 +68,13 @@ def __contains__(self, key: KeyT) -> bool:
"""setup the `in` functionality"""
return self.check(key)

def __bytes__(self) -> bytes:
"""Export bloom filter to `bytes`"""

with BytesIO() as f:
self.export(f)
return f.getvalue()

@property
def expansions(self) -> int:
"""int: The number of expansions"""
Expand Down Expand Up @@ -155,17 +165,21 @@ def __check_for_growth(self):
if self._blooms[-1].elements_added >= self.__est_elements:
self.__add_bloom_filter()

def export(self, filepath: str) -> None:
def export(self, file: typing.Union[Path, str, IOBase, mmap]) -> None:
"""Export an expanding Bloom Filter, or subclass, to disk
Args:
filepath (str): The path to the file to import"""
with open(filepath, "wb") as fileobj:
if not isinstance(file, (IOBase, mmap)):
with open(file, "wb") as filepointer:
self.export(filepointer) # type:ignore
else:
filepointer = file # type:ignore
# add all the different Bloom bit arrays...
for blm in self._blooms:
rep = "Q" + "B" * blm.bloom_length
fileobj.write(pack(rep, blm.elements_added, *blm.bloom))
fileobj.write(
filepointer.write(pack(rep, blm.elements_added, *blm.bloom))
filepointer.write(
pack(
"QQQf",
len(self._blooms),
Expand All @@ -175,14 +189,17 @@ def export(self, filepath: str) -> None:
)
)

def __load(self, filename: str):
def __load(self, file: typing.Union[Path, str, IOBase, mmap]):
"""load a file"""
with open(filename, "rb") as fileobj:
if not isinstance(file, (IOBase, mmap)):
with MMap(file) as filepointer:
self.__load(filepointer)
else:
offset = calcsize("QQQf")
fileobj.seek(offset * -1, os.SEEK_END)
size, est_els, els_added, fpr = unpack("QQQf", fileobj.read(offset))
file.seek(offset * -1, os.SEEK_END)
size, est_els, els_added, fpr = unpack("QQQf", file.read(offset))

fileobj.seek(0, os.SEEK_SET)
file.seek(0, os.SEEK_SET)
# set the basic defaults
self._blooms = list()
self.__added_elements = els_added
Expand All @@ -197,7 +214,7 @@ def __load(self, filename: str):
# now we need to read in the correct number of bytes...
offset = calcsize("Q") + calcsize("B") * blm.bloom_length
rep = "Q" + "B" * blm.bloom_length
unpacked = list(unpack(rep, fileobj.read(offset)))
unpacked = list(unpack(rep, file.read(offset)))
blm._bloom = unpacked[1:]
blm.elements_added = unpacked[0]
self._blooms.append(blm)
Expand Down
62 changes: 39 additions & 23 deletions probables/countminsketch/countminsketch.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@
import math
import os
import typing
from io import BytesIO, IOBase
from mmap import mmap
from numbers import Number
from pathlib import Path
from struct import calcsize, pack, unpack

from ..constants import INT32_T_MAX, INT32_T_MIN, INT64_T_MAX, INT64_T_MIN
from ..exceptions import CountMinSketchError, InitializationError, NotSupportedError
from ..hashes import HashFuncT, HashResultsT, KeyT, default_fnv_1a
from ..utilities import is_valid_file
from ..utilities import MMap, is_valid_file


class CountMinSketch(object):
Expand Down Expand Up @@ -60,7 +63,7 @@ def __init__(
depth: typing.Optional[int] = None,
confidence: typing.Optional[float] = None,
error_rate: typing.Optional[float] = None,
filepath: typing.Optional[str] = None,
filepath: typing.Optional[typing.Union[str, Path]] = None,
hash_function: typing.Optional[HashFuncT] = None,
) -> None:
"""default initilization function"""
Expand Down Expand Up @@ -134,6 +137,12 @@ def __contains__(self, key: KeyT) -> bool:
"""setup the `in` keyword"""
return True if self.check(key) != 0 else False

def __bytes__(self) -> bytes:
"""Export countmin-sketch to `bytes`"""
with BytesIO() as f:
self.export(f)
return f.getvalue()

@property
def width(self) -> int:
"""int: The width of the count-min sketch
Expand Down Expand Up @@ -314,17 +323,20 @@ def check_alt(self, hashes: HashResultsT) -> int:
bins = self.__get_values_sorted(hashes)
return self.__query_method(bins)

def export(self, filepath: str) -> None:
def export(self, file: typing.Union[Path, str, IOBase, mmap]) -> None:
""" Export the count-min sketch to disk
Args:
filename (str): The filename to which the count-min sketch \
will be written. """
with open(filepath, "wb") as filepointer:
if not isinstance(file, (IOBase, mmap)):
with open(file, "wb") as filepointer:
self.export(filepointer) # type: ignore
else:
# write out the bins
rep = "i" * len(self._bins)
filepointer.write(pack(rep, *self._bins))
filepointer.write(pack("IIq", self.width, self.depth, self.elements_added))
file.write(pack(rep, *self._bins))
file.write(pack("IIq", self.width, self.depth, self.elements_added))

def join(self, second: "CountMinSketch") -> None:
""" Join two count-min sketchs into a single count-min sketch; the
Expand Down Expand Up @@ -369,23 +381,27 @@ def join(self, second: "CountMinSketch") -> None:
elif self.elements_added < INT64_T_MIN:
self.__elements_added = INT64_T_MIN

def __load(self, filepath: str):
def __load(self, file: typing.Union[Path, str, IOBase]):
"""load the count-min sketch from file"""
with open(filepath, "rb") as filepointer:
if not isinstance(file, (IOBase, mmap)):
file = Path(file)
with MMap(file) as filepointer:
self.__load(filepointer)
else:
offset = calcsize("IIq")
filepointer.seek(offset * -1, os.SEEK_END)
mybytes = unpack("IIq", filepointer.read(offset))
file.seek(offset * -1, os.SEEK_END)
mybytes = unpack("IIq", file.read(offset))
self.__width = mybytes[0]
self.__depth = mybytes[1]
self.__elements_added = mybytes[2]
self.__confidence = 1 - (1 / math.pow(2, self.depth))
self.__error_rate = 2 / self.width

filepointer.seek(0, os.SEEK_SET)
file.seek(0, os.SEEK_SET)
length = self.width * self.depth
rep = "i" * length
offset = calcsize(rep)
self._bins = list(unpack(rep, filepointer.read(offset)))
self._bins = list(unpack(rep, file.read(offset)))

def __get_values_sorted(self, hashes: HashResultsT) -> HashResultsT:
"""get the values sorted"""
Expand Down Expand Up @@ -540,12 +556,12 @@ def __init__(
depth: typing.Optional[int] = None,
confidence: typing.Optional[float] = None,
error_rate: typing.Optional[float] = None,
filepath: typing.Optional[str] = None,
filepath: typing.Optional[typing.Union[str, Path]] = None,
hash_function: typing.Optional[HashFuncT] = None,
) -> None:

super(HeavyHitters, self).__init__(width, depth, confidence, error_rate, filepath, hash_function)
self.__top_x = dict() # top x heavy hitters
self.__top_x = dict() # type: ignore
self.__top_x_size = 0
self.__num_hitters = num_hitters
self.__smallest = 0
Expand All @@ -572,7 +588,7 @@ def number_heavy_hitters(self) -> int:
Not settable"""
return self.__num_hitters

def add(self, key: str, num_els: int = 1) -> int:
def add(self, key: str, num_els: int = 1) -> int: # type: ignore
"""Add element to heavy hitters
Args:
Expand All @@ -585,7 +601,7 @@ def add(self, key: str, num_els: int = 1) -> int:
hashes = self.hashes(key)
return self.add_alt(key, hashes, num_els)

def add_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int:
def add_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int: # type: ignore
""" Add the element `key` represented as hashes to the HeavyHitters
object (hence the different signature on the function!)
Expand Down Expand Up @@ -643,7 +659,7 @@ def clear(self) -> None:
self.__top_x_size = 0
self.__smallest = 0

def join(self, second: "HeavyHitters"):
def join(self, second: "HeavyHitters"): # type: ignore
""" Join is not supported by HeavyHitters
Raises:
Expand All @@ -670,7 +686,7 @@ def __init__(
) -> None:
super(StreamThreshold, self).__init__(width, depth, confidence, error_rate, filepath, hash_function)
self.__threshold = threshold
self.__meets_threshold = dict()
self.__meets_threshold = dict() # type: ignore

def __str__(self) -> str:
"""stream threshold string rep"""
Expand All @@ -693,7 +709,7 @@ def clear(self) -> None:
super(StreamThreshold, self).clear()
self.__meets_threshold = dict()

def add(self, key: str, num_els: int = 1) -> int:
def add(self, key: str, num_els: int = 1) -> int: # type: ignore
"""Add the element for key into the data structure
Args:
Expand All @@ -706,7 +722,7 @@ def add(self, key: str, num_els: int = 1) -> int:
hashes = self.hashes(key)
return self.add_alt(key, hashes, num_els)

def add_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int:
def add_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int: # type: ignore
""" Add the element for key into the data structure
Args:
Expand All @@ -725,7 +741,7 @@ def add_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int:
self.__meets_threshold[key] = res
return res

def remove(self, key: str, num_els: int = 1) -> int:
def remove(self, key: str, num_els: int = 1) -> int: # type: ignore
""" Remove element 'key' from the count-min sketch
Args:
Expand All @@ -739,7 +755,7 @@ def remove(self, key: str, num_els: int = 1) -> int:
hashes = self.hashes(key)
return self.remove_alt(key, hashes, num_els)

def remove_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int:
def remove_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int: # type: ignore
""" Remove an element by using the hash representation
Args:
Expand All @@ -760,7 +776,7 @@ def remove_alt(self, key: str, hashes: HashResultsT, num_els: int = 1) -> int:
self.__meets_threshold[key] = res
return res

def join(self, second: "StreamThreshold"):
def join(self, second: "StreamThreshold"): # type: ignore
""" Join is not supported by StreamThreshold
Raises:
Expand Down
Loading

0 comments on commit 9ccbaa2

Please sign in to comment.