From 4c56240ad8f8a6bcdcd33868de29899ee70f68d3 Mon Sep 17 00:00:00 2001 From: David Stansby Date: Sun, 1 Sep 2024 11:07:40 +0100 Subject: [PATCH] Replace c-blosc extension --- numcodecs/__init__.py | 2 - numcodecs/{blosc_v2.py => blosc.py} | 82 +++- numcodecs/blosc.pyx | 599 ---------------------------- numcodecs/tests/test_blosc.py | 91 +---- numcodecs/tests/test_blosc_v2.py | 286 ------------- setup.py | 78 +--- 6 files changed, 74 insertions(+), 1064 deletions(-) rename numcodecs/{blosc_v2.py => blosc.py} (71%) delete mode 100644 numcodecs/blosc.pyx delete mode 100644 numcodecs/tests/test_blosc_v2.py diff --git a/numcodecs/__init__.py b/numcodecs/__init__.py index b4b983ce6..46983124e 100644 --- a/numcodecs/__init__.py +++ b/numcodecs/__init__.py @@ -49,9 +49,7 @@ ncores = multiprocessing.cpu_count() except OSError: # pragma: no cover ncores = 1 -blosc._init() blosc.set_nthreads(min(8, ncores)) -atexit.register(blosc.destroy) from numcodecs import zstd as zstd from numcodecs.zstd import Zstd diff --git a/numcodecs/blosc_v2.py b/numcodecs/blosc.py similarity index 71% rename from numcodecs/blosc_v2.py rename to numcodecs/blosc.py index af3d063b5..bd76435cd 100644 --- a/numcodecs/blosc_v2.py +++ b/numcodecs/blosc.py @@ -18,32 +18,32 @@ """ -from numcodecs.abc import Codec import numpy as np import blosc from blosc import ( BITSHUFFLE, - SHUFFLE, - NOSHUFFLE, MAX_BUFFERSIZE, MAX_THREADS, MAX_TYPESIZE, - VERSION_STRING, + NOSHUFFLE, + SHUFFLE, VERSION_DATE, + VERSION_STRING, ) +from numcodecs.abc import Codec __all__ = [ "BITSHUFFLE", - "SHUFFLE", - "NOSHUFFLE", "MAX_BUFFERSIZE", "MAX_THREADS", "MAX_TYPESIZE", - "VERSION_STRING", + "NOSHUFFLE", + "SHUFFLE", "VERSION_DATE", - "list_compressors", + "VERSION_STRING", 'get_nthreads', + "list_compressors", ] AUTOBLOCKS = 0 @@ -52,20 +52,30 @@ def list_compressors() -> list[str]: + """Get a list of compressors supported in blosc.""" return blosc.compressor_list() def get_nthreads() -> int: + """ + Get the number of threads that Blosc uses internally for compression and + decompression. + """ nthreads = blosc.set_nthreads(1) blosc.set_nthreads(nthreads) return nthreads def set_nthreads(nthreads: int) -> None: + """ + Set the number of threads that Blosc uses internally for compression and + decompression. + """ blosc.set_nthreads(nthreads) -def cbuffer_complib(source): +def cbuffer_complib(source) -> str: + """Return the name of the compression library used to compress `source`.""" return blosc.get_clib(source) @@ -86,6 +96,32 @@ def _check_buffer_size(buf, max_buffer_size): def compress(source, cname: str, clevel: int, shuffle: int = SHUFFLE, blocksize=AUTOBLOCKS): + """ + Compress data. + + Parameters + ---------- + source : bytes-like + Data to be compressed. Can be any object supporting the buffer + protocol. + cname : bytes + Name of compression library to use. + clevel : int + Compression level. + shuffle : int + Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, + bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will + be used otherwise. The default is `SHUFFLE`. + blocksize : int + The requested size of the compressed blocks. If 0, an automatic blocksize will + be used. + + Returns + ------- + dest : bytes + Compressed data. + + """ if shuffle == AUTOSHUFFLE: if source.itemsize == 1: shuffle = BITSHUFFLE @@ -109,6 +145,23 @@ def compress(source, cname: str, clevel: int, shuffle: int = SHUFFLE, blocksize= def decompress(source, dest: np.ndarray | bytearray | None = None): + """ + Decompress data. + + Parameters + ---------- + source : bytes-like + Compressed data, including blosc header. Can be any object supporting the buffer + protocol. + dest : array-like, optional + Object to decompress into. + + Returns + ------- + dest : bytes + Object containing decompressed data. + + """ if dest is None: return blosc.decompress(source) elif isinstance(dest, np.ndarray): @@ -119,7 +172,8 @@ def decompress(source, dest: np.ndarray | bytearray | None = None): class Blosc(Codec): - """Codec providing compression using the Blosc meta-compressor. + """ + Codec providing compression using the Blosc meta-compressor. Parameters ---------- @@ -170,11 +224,5 @@ def decode(self, buf, out=None): return decompress(buf, out) def __repr__(self): - r = '%s(cname=%r, clevel=%r, shuffle=%s, blocksize=%s)' % ( - type(self).__name__, - self.cname, - self.clevel, - _shuffle_repr[self.shuffle + 1], - self.blocksize, - ) + r = f'{type(self).__name__}(cname={self.cname!r}, clevel={self.clevel!r}, shuffle={_shuffle_repr[self.shuffle + 1]}, blocksize={self.blocksize})' return r diff --git a/numcodecs/blosc.pyx b/numcodecs/blosc.pyx deleted file mode 100644 index 3caa36078..000000000 --- a/numcodecs/blosc.pyx +++ /dev/null @@ -1,599 +0,0 @@ -# cython: embedsignature=True -# cython: profile=False -# cython: linetrace=False -# cython: binding=False -# cython: language_level=3 -import threading -import multiprocessing -import os -from deprecated import deprecated - - -from cpython.buffer cimport PyBUF_ANY_CONTIGUOUS, PyBUF_WRITEABLE -from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING - - -from .compat_ext cimport Buffer -from .compat_ext import Buffer -from .compat import ensure_contiguous_ndarray -from .abc import Codec - - -cdef extern from "blosc.h": - cdef enum: - BLOSC_MAX_OVERHEAD, - BLOSC_VERSION_STRING, - BLOSC_VERSION_DATE, - BLOSC_NOSHUFFLE, - BLOSC_SHUFFLE, - BLOSC_BITSHUFFLE, - BLOSC_MAX_BUFFERSIZE, - BLOSC_MAX_THREADS, - BLOSC_MAX_TYPESIZE, - BLOSC_DOSHUFFLE, - BLOSC_DOBITSHUFFLE, - BLOSC_MEMCPYED - - void blosc_init() - void blosc_destroy() - int blosc_get_nthreads() - int blosc_set_nthreads(int nthreads) - int blosc_set_compressor(const char *compname) - void blosc_set_blocksize(size_t blocksize) - char* blosc_list_compressors() - int blosc_compress(int clevel, int doshuffle, size_t typesize, size_t nbytes, - void* src, void* dest, size_t destsize) nogil - int blosc_decompress(void *src, void *dest, size_t destsize) nogil - int blosc_getitem(void* src, int start, int nitems, void* dest) - int blosc_compname_to_compcode(const char* compname) - int blosc_compress_ctx(int clevel, int doshuffle, size_t typesize, size_t nbytes, - const void* src, void* dest, size_t destsize, - const char* compressor, size_t blocksize, - int numinternalthreads) nogil - int blosc_decompress_ctx(const void* src, void* dest, size_t destsize, - int numinternalthreads) nogil - void blosc_cbuffer_sizes(const void* cbuffer, size_t* nbytes, size_t* cbytes, - size_t* blocksize) - char* blosc_cbuffer_complib(const void* cbuffer) - void blosc_cbuffer_metainfo(const void* cbuffer, size_t* typesize, int* flags) - - -MAX_OVERHEAD = BLOSC_MAX_OVERHEAD -MAX_BUFFERSIZE = BLOSC_MAX_BUFFERSIZE -MAX_THREADS = BLOSC_MAX_THREADS -MAX_TYPESIZE = BLOSC_MAX_TYPESIZE -VERSION_STRING = BLOSC_VERSION_STRING -VERSION_DATE = BLOSC_VERSION_DATE -VERSION_STRING = VERSION_STRING.decode() -VERSION_DATE = VERSION_DATE.decode() -__version__ = VERSION_STRING -NOSHUFFLE = BLOSC_NOSHUFFLE -SHUFFLE = BLOSC_SHUFFLE -BITSHUFFLE = BLOSC_BITSHUFFLE -# automatic shuffle -AUTOSHUFFLE = -1 -# automatic block size - let blosc decide -AUTOBLOCKS = 0 - -# synchronization -_MUTEX = None -_MUTEX_IS_INIT = False - -def get_mutex(): - global _MUTEX_IS_INIT, _MUTEX - if not _MUTEX_IS_INIT: - try: - mutex = multiprocessing.Lock() - except OSError: - mutex = None - except ImportError: - mutex = None - _MUTEX = mutex - _MUTEX_IS_INIT = True - return _MUTEX - -# store ID of process that first loads the module, so we can detect a fork later -_importer_pid = os.getpid() - - -def _init(): - """Initialize the Blosc library environment.""" - blosc_init() - -init = deprecated(_init) - - -def _destroy(): - """Destroy the Blosc library environment.""" - blosc_destroy() - - -destroy = deprecated(_destroy) - - -def _compname_to_compcode(cname): - """Return the compressor code associated with the compressor name. If the compressor - name is not recognized, or there is not support for it in this build, -1 is returned - instead.""" - if isinstance(cname, str): - cname = cname.encode('ascii') - return blosc_compname_to_compcode(cname) - -compname_to_compcode = deprecated(_compname_to_compcode) - - -def list_compressors(): - """Get a list of compressors supported in the current build.""" - s = blosc_list_compressors() - s = s.decode('ascii') - return s.split(',') - - -def get_nthreads(): - """Get the number of threads that Blosc uses internally for compression and - decompression.""" - return blosc_get_nthreads() - - -def set_nthreads(int nthreads): - """Set the number of threads that Blosc uses internally for compression and - decompression.""" - return blosc_set_nthreads(nthreads) - - -def _cbuffer_sizes(source): - """Return information about a compressed buffer, namely the number of uncompressed - bytes (`nbytes`) and compressed (`cbytes`). It also returns the `blocksize` (which - is used internally for doing the compression by blocks). - - Returns - ------- - nbytes : int - cbytes : int - blocksize : int - - """ - cdef: - Buffer buffer - size_t nbytes, cbytes, blocksize - - # obtain buffer - buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - - # determine buffer size - blosc_cbuffer_sizes(buffer.ptr, &nbytes, &cbytes, &blocksize) - - # release buffers - buffer.release() - - return nbytes, cbytes, blocksize - -cbuffer_sizes = deprecated(_cbuffer_sizes) - -def cbuffer_complib(source): - """Return the name of the compression library used to compress `source`.""" - cdef: - Buffer buffer - - # obtain buffer - buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - - # determine buffer size - complib = blosc_cbuffer_complib(buffer.ptr) - - # release buffers - buffer.release() - - complib = complib.decode('ascii') - - return complib - - -def _cbuffer_metainfo(source): - """Return some meta-information about the compressed buffer in `source`, including - the typesize, whether the shuffle or bit-shuffle filters were used, and the - whether the buffer was memcpyed. - - Returns - ------- - typesize - shuffle - memcpyed - - """ - cdef: - Buffer buffer - size_t typesize - int flags - - # obtain buffer - buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - - # determine buffer size - blosc_cbuffer_metainfo(buffer.ptr, &typesize, &flags) - - # release buffers - buffer.release() - - # decompose flags - if flags & BLOSC_DOSHUFFLE: - shuffle = SHUFFLE - elif flags & BLOSC_DOBITSHUFFLE: - shuffle = BITSHUFFLE - else: - shuffle = NOSHUFFLE - memcpyed = flags & BLOSC_MEMCPYED - - return typesize, shuffle, memcpyed - -cbuffer_metainfo = deprecated(_cbuffer_metainfo) - -def _err_bad_cname(cname): - raise ValueError('bad compressor or compressor not supported: %r; expected one of ' - '%s' % (cname, list_compressors())) - -err_bad_cname = deprecated(_err_bad_cname) - -def compress(source, char* cname, int clevel, int shuffle=SHUFFLE, - int blocksize=AUTOBLOCKS): - """Compress data. - - Parameters - ---------- - source : bytes-like - Data to be compressed. Can be any object supporting the buffer - protocol. - cname : bytes - Name of compression library to use. - clevel : int - Compression level. - shuffle : int - Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, - bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will - be used otherwise. The default is `SHUFFLE`. - blocksize : int - The requested size of the compressed blocks. If 0, an automatic blocksize will - be used. - - Returns - ------- - dest : bytes - Compressed data. - - """ - - cdef: - char *source_ptr - char *dest_ptr - Buffer source_buffer - size_t nbytes, itemsize - int cbytes - bytes dest - - # check valid cname early - cname_str = cname.decode('ascii') - if cname_str not in list_compressors(): - _err_bad_cname(cname_str) - - # setup source buffer - source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - source_ptr = source_buffer.ptr - nbytes = source_buffer.nbytes - itemsize = source_buffer.itemsize - - # determine shuffle - if shuffle == AUTOSHUFFLE: - if itemsize == 1: - shuffle = BITSHUFFLE - else: - shuffle = SHUFFLE - elif shuffle not in [NOSHUFFLE, SHUFFLE, BITSHUFFLE]: - raise ValueError('invalid shuffle argument; expected -1, 0, 1 or 2, found %r' % - shuffle) - - try: - - # setup destination - dest = PyBytes_FromStringAndSize(NULL, nbytes + BLOSC_MAX_OVERHEAD) - dest_ptr = PyBytes_AS_STRING(dest) - - # perform compression - if _get_use_threads(): - # allow blosc to use threads internally - - # N.B., we are using blosc's global context, and so we need to use a lock - # to ensure no-one else can modify the global context while we're setting it - # up and using it. - with get_mutex(): - - # set compressor - compressor_set = blosc_set_compressor(cname) - if compressor_set < 0: - # shouldn't happen if we checked against list of compressors - # already, but just in case - _err_bad_cname(cname_str) - - # set blocksize - blosc_set_blocksize(blocksize) - - # perform compression - with nogil: - cbytes = blosc_compress(clevel, shuffle, itemsize, nbytes, source_ptr, - dest_ptr, nbytes + BLOSC_MAX_OVERHEAD) - - else: - with nogil: - cbytes = blosc_compress_ctx(clevel, shuffle, itemsize, nbytes, source_ptr, - dest_ptr, nbytes + BLOSC_MAX_OVERHEAD, - cname, blocksize, 1) - - finally: - - # release buffers - source_buffer.release() - - # check compression was successful - if cbytes <= 0: - raise RuntimeError('error during blosc compression: %d' % cbytes) - - # resize after compression - dest = dest[:cbytes] - - return dest - - -def decompress(source, dest=None): - """Decompress data. - - Parameters - ---------- - source : bytes-like - Compressed data, including blosc header. Can be any object supporting the buffer - protocol. - dest : array-like, optional - Object to decompress into. - - Returns - ------- - dest : bytes - Object containing decompressed data. - - """ - cdef: - int ret - char *source_ptr - char *dest_ptr - Buffer source_buffer - Buffer dest_buffer = None - size_t nbytes, cbytes, blocksize - - # setup source buffer - source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - source_ptr = source_buffer.ptr - - # determine buffer size - blosc_cbuffer_sizes(source_ptr, &nbytes, &cbytes, &blocksize) - - # setup destination buffer - if dest is None: - # allocate memory - dest = PyBytes_FromStringAndSize(NULL, nbytes) - dest_ptr = PyBytes_AS_STRING(dest) - dest_nbytes = nbytes - else: - arr = ensure_contiguous_ndarray(dest) - dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE) - dest_ptr = dest_buffer.ptr - dest_nbytes = dest_buffer.nbytes - - try: - - # guard condition - if dest_nbytes < nbytes: - raise ValueError('destination buffer too small; expected at least %s, ' - 'got %s' % (nbytes, dest_nbytes)) - - # perform decompression - if _get_use_threads(): - # allow blosc to use threads internally - with nogil: - ret = blosc_decompress(source_ptr, dest_ptr, nbytes) - else: - with nogil: - ret = blosc_decompress_ctx(source_ptr, dest_ptr, nbytes, 1) - - finally: - - # release buffers - source_buffer.release() - if dest_buffer is not None: - dest_buffer.release() - - # handle errors - if ret <= 0: - raise RuntimeError('error during blosc decompression: %d' % ret) - - return dest - - -def _decompress_partial(source, start, nitems, dest=None): - """**Experimental** - Decompress data of only a part of a buffer. - - Parameters - ---------- - source : bytes-like - Compressed data, including blosc header. Can be any object supporting the buffer - protocol. - start: int, - Offset in item where we want to start decoding - nitems: int - Number of items we want to decode - dest : array-like, optional - Object to decompress into. - - - Returns - ------- - dest : bytes - Object containing decompressed data. - - """ - cdef: - int ret - int encoding_size - int nitems_bytes - int start_bytes - char *source_ptr - char *dest_ptr - Buffer source_buffer - Buffer dest_buffer = None - - # setup source buffer - source_buffer = Buffer(source, PyBUF_ANY_CONTIGUOUS) - source_ptr = source_buffer.ptr - - # get encoding size from source buffer header - encoding_size = source[3] - - # convert variables to handle type and encoding sizes - nitems_bytes = nitems * encoding_size - start_bytes = (start * encoding_size) - - # setup destination buffer - if dest is None: - dest = PyBytes_FromStringAndSize(NULL, nitems_bytes) - dest_ptr = PyBytes_AS_STRING(dest) - dest_nbytes = nitems_bytes - else: - arr = ensure_contiguous_ndarray(dest) - dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE) - dest_ptr = dest_buffer.ptr - dest_nbytes = dest_buffer.nbytes - - # try decompression - try: - if dest_nbytes < nitems_bytes: - raise ValueError('destination buffer too small; expected at least %s, ' - 'got %s' % (nitems_bytes, dest_nbytes)) - ret = blosc_getitem(source_ptr, start, nitems, dest_ptr) - - finally: - source_buffer.release() - if dest_buffer is not None: - dest_buffer.release() - - # ret refers to the number of bytes returned from blosc_getitem. - if ret <= 0: - raise RuntimeError('error during blosc partial decompression: %d', ret) - - return dest - -decompress_partial = deprecated(_decompress_partial) - -# set the value of this variable to True or False to override the -# default adaptive behaviour -use_threads = None - - -def _get_use_threads(): - global use_threads - proc = multiprocessing.current_process() - - # check if locks are available, and if not no threads - if not get_mutex(): - return False - - # check for fork - if proc.pid != _importer_pid: - # If this module has been imported in the parent process, and the current process - # is a fork, attempting to use blosc in multi-threaded mode will cause a - # program hang, so we force use of blosc ctx functions, i.e., no threads. - return False - - if use_threads in [True, False]: - # user has manually overridden the default behaviour - _use_threads = use_threads - - else: - # Adaptive behaviour: allow blosc to use threads if it is being called from the - # main Python thread in the main Python process, inferring that it is being run - # from within a single-threaded, single-process program; otherwise do not allow - # blosc to use threads, inferring it is being run from within a multi-threaded - # program or multi-process program - - if proc.name != 'MainProcess': - _use_threads = False - elif hasattr(threading, 'main_thread'): - _use_threads = (threading.main_thread() == threading.current_thread()) - else: - _use_threads = threading.current_thread().name == 'MainThread' - - return _use_threads - - -_shuffle_repr = ['AUTOSHUFFLE', 'NOSHUFFLE', 'SHUFFLE', 'BITSHUFFLE'] - - -class Blosc(Codec): - """Codec providing compression using the Blosc meta-compressor. - - Parameters - ---------- - cname : string, optional - A string naming one of the compression algorithms available within blosc, e.g., - 'zstd', 'blosclz', 'lz4', 'lz4hc', 'zlib' or 'snappy'. - clevel : integer, optional - An integer between 0 and 9 specifying the compression level. - shuffle : integer, optional - Either NOSHUFFLE (0), SHUFFLE (1), BITSHUFFLE (2) or AUTOSHUFFLE (-1). If AUTOSHUFFLE, - bit-shuffle will be used for buffers with itemsize 1, and byte-shuffle will - be used otherwise. The default is `SHUFFLE`. - blocksize : int - The requested size of the compressed blocks. If 0 (default), an automatic - blocksize will be used. - - See Also - -------- - numcodecs.zstd.Zstd, numcodecs.lz4.LZ4 - - """ - - codec_id = 'blosc' - NOSHUFFLE = NOSHUFFLE - SHUFFLE = SHUFFLE - BITSHUFFLE = BITSHUFFLE - AUTOSHUFFLE = AUTOSHUFFLE - max_buffer_size = 2**31 - 1 - - def __init__(self, cname='lz4', clevel=5, shuffle=SHUFFLE, blocksize=AUTOBLOCKS): - self.cname = cname - if isinstance(cname, str): - self._cname_bytes = cname.encode('ascii') - else: - self._cname_bytes = cname - self.clevel = clevel - self.shuffle = shuffle - self.blocksize = blocksize - - def encode(self, buf): - buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) - return compress(buf, self._cname_bytes, self.clevel, self.shuffle, self.blocksize) - - def decode(self, buf, out=None): - buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) - return decompress(buf, out) - - def decode_partial(self, buf, int start, int nitems, out=None): - '''**Experimental**''' - buf = ensure_contiguous_ndarray(buf, self.max_buffer_size) - return _decompress_partial(buf, start, nitems, dest=out) - - def __repr__(self): - r = '%s(cname=%r, clevel=%r, shuffle=%s, blocksize=%s)' % \ - (type(self).__name__, - self.cname, - self.clevel, - _shuffle_repr[self.shuffle + 1], - self.blocksize) - return r diff --git a/numcodecs/tests/test_blosc.py b/numcodecs/tests/test_blosc.py index 0bc140107..1f1ea8012 100644 --- a/numcodecs/tests/test_blosc.py +++ b/numcodecs/tests/test_blosc.py @@ -5,7 +5,7 @@ import pytest try: - from numcodecs import blosc + from numcodecs import blosc as blosc from numcodecs.blosc import Blosc except ImportError: # pragma: no cover pytest.skip("numcodecs.blosc not available", allow_module_level=True) @@ -15,7 +15,6 @@ check_backwards_compatibility, check_config, check_encode_decode, - check_encode_decode_partial, check_err_decode_object_buffer, check_err_encode_object_buffer, check_max_buffer_size, @@ -75,19 +74,6 @@ def test_encode_decode(array, codec): check_encode_decode(array, codec) -@pytest.mark.parametrize('codec', codecs) -@pytest.mark.parametrize( - 'array', - [ - pytest.param(x) if len(x.shape) == 1 else pytest.param(x, marks=[pytest.mark.xfail]) - for x in arrays - ], -) -def test_partial_decode(codec, array): - _skip_null(codec) - check_encode_decode_partial(array, codec) - - def test_config(): codec = Blosc(cname='zstd', clevel=3, shuffle=1) check_config(codec) @@ -117,33 +103,6 @@ def test_eq(): assert Blosc(cname='lz4') != 'foo' -def test_compress_blocksize_default(use_threads): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - # default blocksize - enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE) - _, _, blocksize = blosc._cbuffer_sizes(enc) - assert blocksize > 0 - - # explicit default blocksize - enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE, 0) - _, _, blocksize = blosc._cbuffer_sizes(enc) - assert blocksize > 0 - - -@pytest.mark.parametrize('bs', [2**7, 2**8]) -def test_compress_blocksize(use_threads, bs): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - enc = blosc.compress(arr, b'lz4', 1, Blosc.NOSHUFFLE, bs) - _, _, blocksize = blosc._cbuffer_sizes(enc) - assert blocksize == bs - - def test_compress_complib(use_threads): arr = np.arange(1000, dtype='i4') expected_complibs = { @@ -155,7 +114,7 @@ def test_compress_complib(use_threads): } blosc.use_threads = use_threads for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname.encode(), 1, Blosc.NOSHUFFLE) + enc = blosc.compress(arr, cname, 1, Blosc.NOSHUFFLE) complib = blosc.cbuffer_complib(enc) expected_complib = expected_complibs[cname] assert complib == expected_complib @@ -167,33 +126,6 @@ def test_compress_complib(use_threads): blosc.compress(arr, b'foo', 1) -@pytest.mark.parametrize('dtype', ['i1', 'i2', 'i4', 'i8']) -def test_compress_metainfo(dtype, use_threads): - arr = np.arange(1000, dtype=dtype) - for shuffle in Blosc.NOSHUFFLE, Blosc.SHUFFLE, Blosc.BITSHUFFLE: - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname.encode(), 1, shuffle) - typesize, did_shuffle, _ = blosc._cbuffer_metainfo(enc) - assert typesize == arr.dtype.itemsize - assert did_shuffle == shuffle - - -def test_compress_autoshuffle(use_threads): - arr = np.arange(8000) - for dtype in 'i1', 'i2', 'i4', 'i8', 'f2', 'f4', 'f8', 'bool', 'S10': - varr = arr.view(dtype) - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(varr, cname.encode(), 1, Blosc.AUTOSHUFFLE) - typesize, did_shuffle, _ = blosc._cbuffer_metainfo(enc) - assert typesize == varr.dtype.itemsize - if typesize == 1: - assert did_shuffle == Blosc.BITSHUFFLE - else: - assert did_shuffle == Blosc.SHUFFLE - - def test_config_blocksize(): # N.B., we want to be backwards compatible with any config where blocksize is not # explicitly stated @@ -259,17 +191,8 @@ def test_err_encode_object_buffer(): check_err_encode_object_buffer(Blosc()) -def test_decompression_error_handling(): - for codec in codecs: - _skip_null(codec) - with pytest.raises(RuntimeError): - codec.decode(bytearray()) - with pytest.raises(RuntimeError): - codec.decode(bytearray(0)) - - -def test_max_buffer_size(): - for codec in codecs: - _skip_null(codec) - assert codec.max_buffer_size == 2**31 - 1 - check_max_buffer_size(codec) +@pytest.mark.parametrize("codec", codecs) +def test_max_buffer_size(codec): + _skip_null(codec) + assert codec.max_buffer_size == 2**31 - 1 + check_max_buffer_size(codec) diff --git a/numcodecs/tests/test_blosc_v2.py b/numcodecs/tests/test_blosc_v2.py deleted file mode 100644 index 94230293c..000000000 --- a/numcodecs/tests/test_blosc_v2.py +++ /dev/null @@ -1,286 +0,0 @@ -from multiprocessing import Pool -from multiprocessing.pool import ThreadPool - - -import numpy as np -import pytest - - -try: - from numcodecs import blosc_v2 as blosc - from numcodecs.blosc_v2 import Blosc -except ImportError: # pragma: no cover - pytest.skip("numcodecs.blosc not available", allow_module_level=True) - - -from numcodecs.tests.common import ( - check_encode_decode, - check_config, - check_backwards_compatibility, - check_err_decode_object_buffer, - check_err_encode_object_buffer, - check_max_buffer_size, -) - - -codecs = [ - Blosc(shuffle=Blosc.SHUFFLE), - Blosc(clevel=0, shuffle=Blosc.SHUFFLE), - Blosc(cname='lz4', shuffle=Blosc.SHUFFLE), - Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE), - Blosc(cname='lz4', clevel=5, shuffle=Blosc.SHUFFLE), - Blosc(cname='lz4', clevel=9, shuffle=Blosc.BITSHUFFLE), - Blosc(cname='zlib', clevel=1, shuffle=0), - Blosc(cname='zstd', clevel=1, shuffle=1), - Blosc(cname='blosclz', clevel=1, shuffle=2), - None, # was snappy - Blosc(shuffle=Blosc.SHUFFLE, blocksize=0), - Blosc(shuffle=Blosc.SHUFFLE, blocksize=2**8), - Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=2**8), -] - - -# mix of dtypes: integer, float, bool, string -# mix of shapes: 1D, 2D, 3D -# mix of orders: C, F -arrays = [ - np.arange(1000, dtype='i4'), - np.linspace(1000, 1001, 1000, dtype='f8'), - np.random.normal(loc=1000, scale=1, size=(100, 10)), - np.random.randint(0, 2, size=1000, dtype=bool).reshape(100, 10, order='F'), - np.random.choice([b'a', b'bb', b'ccc'], size=1000).reshape(10, 10, 10), - np.random.randint(0, 2**60, size=1000, dtype='u8').view('M8[ns]'), - np.random.randint(0, 2**60, size=1000, dtype='u8').view('m8[ns]'), - np.random.randint(0, 2**25, size=1000, dtype='u8').view('M8[m]'), - np.random.randint(0, 2**25, size=1000, dtype='u8').view('m8[m]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[ns]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[ns]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[m]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[m]'), -] - - -def _skip_null(codec): - if codec is None: - pytest.skip("codec has been removed") - - -@pytest.fixture(scope='module', params=[True, False, None]) -def use_threads(request): - return request.param - - -@pytest.mark.parametrize('array', arrays) -@pytest.mark.parametrize('codec', codecs) -def test_encode_decode(array, codec): - _skip_null(codec) - check_encode_decode(array, codec) - - -""" -@pytest.mark.parametrize('codec', codecs) -@pytest.mark.parametrize( - 'array', - [ - pytest.param(x) if len(x.shape) == 1 else pytest.param(x, marks=[pytest.mark.xfail]) - for x in arrays - ], -) -def test_partial_decode(codec, array): - _skip_null(codec) - check_encode_decode_partial(array, codec) -""" - - -def test_config(): - codec = Blosc(cname='zstd', clevel=3, shuffle=1) - check_config(codec) - codec = Blosc(cname='lz4', clevel=1, shuffle=2, blocksize=2**8) - check_config(codec) - - -def test_repr(): - expect = "Blosc(cname='zstd', clevel=3, shuffle=SHUFFLE, blocksize=0)" - actual = repr(Blosc(cname='zstd', clevel=3, shuffle=Blosc.SHUFFLE, blocksize=0)) - assert expect == actual - expect = "Blosc(cname='lz4', clevel=1, shuffle=NOSHUFFLE, blocksize=256)" - actual = repr(Blosc(cname='lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=256)) - assert expect == actual - expect = "Blosc(cname='zlib', clevel=9, shuffle=BITSHUFFLE, blocksize=512)" - actual = repr(Blosc(cname='zlib', clevel=9, shuffle=Blosc.BITSHUFFLE, blocksize=512)) - assert expect == actual - expect = "Blosc(cname='blosclz', clevel=5, shuffle=AUTOSHUFFLE, blocksize=1024)" - actual = repr(Blosc(cname='blosclz', clevel=5, shuffle=Blosc.AUTOSHUFFLE, blocksize=1024)) - assert expect == actual - - -def test_eq(): - assert Blosc() == Blosc() - assert Blosc(cname='lz4') != Blosc(cname='zstd') - assert Blosc(clevel=1) != Blosc(clevel=9) - assert Blosc(cname='lz4') != 'foo' - - -@pytest.mark.skip("blosc-python has no way to get blocksize") -def test_compress_blocksize_default(use_threads): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - # default blocksize - enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize > 0 - - # explicit default blocksize - enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=0) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize > 0 - - -@pytest.mark.skip("blosc-python has no way to get cbuffer sizes") -@pytest.mark.parametrize('bs', (2**7, 2**8)) -def test_compress_blocksize(use_threads, bs): - arr = np.arange(1000, dtype='i4') - - blosc.use_threads = use_threads - - enc = blosc.compress(arr, b'lz4', clevel=1, shuffle=Blosc.NOSHUFFLE, blocksize=bs) - _, _, blocksize = blosc.cbuffer_sizes(enc) - assert blocksize == bs - - -def test_compress_complib(use_threads): - arr = np.arange(1000, dtype='i4') - expected_complibs = { - 'lz4': 'LZ4', - 'lz4hc': 'LZ4', - 'blosclz': 'BloscLZ', - 'zlib': 'Zlib', - 'zstd': 'Zstd', - } - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname, 1, Blosc.NOSHUFFLE) - complib = blosc.cbuffer_complib(enc) - expected_complib = expected_complibs[cname] - assert complib == expected_complib - with pytest.raises(ValueError): - # capitalized cname - blosc.compress(arr, b'LZ4', 1) - with pytest.raises(ValueError): - # bad cname - blosc.compress(arr, b'foo', 1) - - -@pytest.mark.skip("blosc-python has no way to get cbuffer metainfo") -@pytest.mark.parametrize('dtype', ['i1', 'i2', 'i4', 'i8']) -def test_compress_metainfo(dtype, use_threads): - arr = np.arange(1000, dtype=dtype) - for shuffle in Blosc.NOSHUFFLE, Blosc.SHUFFLE, Blosc.BITSHUFFLE: - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(arr, cname, 1, shuffle) - typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) - assert typesize == arr.dtype.itemsize - assert did_shuffle == shuffle - - -@pytest.mark.skip("blosc-python has no way to get cbuffer metainfo") -def test_compress_autoshuffle(use_threads): - arr = np.arange(8000) - for dtype in 'i1', 'i2', 'i4', 'i8', 'f2', 'f4', 'f8', 'bool', 'S10': - varr = arr.view(dtype) - blosc.use_threads = use_threads - for cname in blosc.list_compressors(): - enc = blosc.compress(varr, cname.encode(), 1, Blosc.AUTOSHUFFLE) - typesize, did_shuffle, _ = blosc.cbuffer_metainfo(enc) - assert typesize == varr.dtype.itemsize - if typesize == 1: - assert did_shuffle == Blosc.BITSHUFFLE - else: - assert did_shuffle == Blosc.SHUFFLE - - -def test_config_blocksize(): - # N.B., we want to be backwards compatible with any config where blocksize is not - # explicitly stated - - # blocksize not stated - config = dict(cname='lz4', clevel=1, shuffle=Blosc.SHUFFLE) - codec = Blosc.from_config(config) - assert codec.blocksize == 0 - - # blocksize stated - config = dict(cname='lz4', clevel=1, shuffle=Blosc.SHUFFLE, blocksize=2**8) - codec = Blosc.from_config(config) - assert codec.blocksize == 2**8 - - -def test_backwards_compatibility(): - check_backwards_compatibility(Blosc.codec_id, arrays, codecs) - - -def _encode_worker(data): - compressor = Blosc(cname='zlib', clevel=9, shuffle=Blosc.SHUFFLE) - enc = compressor.encode(data) - return enc - - -def _decode_worker(enc): - compressor = Blosc() - data = compressor.decode(enc) - return data - - -@pytest.mark.parametrize('pool', (Pool, ThreadPool)) -def test_multiprocessing(use_threads, pool): - data = np.arange(1000000) - enc = _encode_worker(data) - - pool = pool(5) - - try: - blosc.use_threads = use_threads - - # test with process pool and thread pool - - # test encoding - enc_results = pool.map(_encode_worker, [data] * 5) - assert all(len(enc) == len(e) for e in enc_results) - - # test decoding - dec_results = pool.map(_decode_worker, [enc] * 5) - assert all(data.nbytes == len(d) for d in dec_results) - - # tidy up - pool.close() - pool.join() - - finally: - blosc.use_threads = None # restore default - - -def test_err_decode_object_buffer(): - check_err_decode_object_buffer(Blosc()) - - -def test_err_encode_object_buffer(): - check_err_encode_object_buffer(Blosc()) - - -@pytest.mark.skip("blosc can decode empty data fine") -def test_decompression_error_handling(): - for codec in codecs: - _skip_null(codec) - with pytest.raises(RuntimeError): - codec.decode(bytearray()) - with pytest.raises(RuntimeError): - codec.decode(bytearray(0)) - - -@pytest.mark.parametrize("codec", codecs) -def test_max_buffer_size(codec): - _skip_null(codec) - assert codec.max_buffer_size == 2**31 - 1 - check_max_buffer_size(codec) diff --git a/setup.py b/setup.py index 6cbddf4a5..c430896a1 100644 --- a/setup.py +++ b/setup.py @@ -49,80 +49,6 @@ def error(*msg): print('[numcodecs]', *msg, **kwargs) -def blosc_extension(): - info('setting up Blosc extension') - - extra_compile_args = base_compile_args.copy() - define_macros = [] - - # setup blosc sources - blosc_sources = [f for f in glob('c-blosc/blosc/*.c') if 'avx2' not in f and 'sse2' not in f] - include_dirs = [os.path.join('c-blosc', 'blosc')] - - # add internal complibs - blosc_sources += glob('c-blosc/internal-complibs/lz4*/*.c') - blosc_sources += glob('c-blosc/internal-complibs/snappy*/*.cc') - blosc_sources += glob('c-blosc/internal-complibs/zlib*/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/common/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/compress/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/decompress/*.c') - blosc_sources += glob('c-blosc/internal-complibs/zstd*/dictBuilder/*.c') - include_dirs += [d for d in glob('c-blosc/internal-complibs/*') if os.path.isdir(d)] - include_dirs += [d for d in glob('c-blosc/internal-complibs/*/*') if os.path.isdir(d)] - include_dirs += [d for d in glob('c-blosc/internal-complibs/*/*/*') if os.path.isdir(d)] - # remove minizip because Python.h 3.8 tries to include crypt.h - include_dirs = [d for d in include_dirs if 'minizip' not in d] - define_macros += [ - ('HAVE_LZ4', 1), - # ('HAVE_SNAPPY', 1), - ('HAVE_ZLIB', 1), - ('HAVE_ZSTD', 1), - ] - # define_macros += [('CYTHON_TRACE', '1')] - - # SSE2 - if have_sse2 and not disable_sse2: - info('compiling Blosc extension with SSE2 support') - extra_compile_args.append('-DSHUFFLE_SSE2_ENABLED') - blosc_sources += [f for f in glob('c-blosc/blosc/*.c') if 'sse2' in f] - if os.name == 'nt': - define_macros += [('__SSE2__', 1)] - else: - info('compiling Blosc extension without SSE2 support') - - # AVX2 - if have_avx2 and not disable_avx2: - info('compiling Blosc extension with AVX2 support') - extra_compile_args.append('-DSHUFFLE_AVX2_ENABLED') - blosc_sources += [f for f in glob('c-blosc/blosc/*.c') if 'avx2' in f] - if os.name == 'nt': - define_macros += [('__AVX2__', 1)] - else: - info('compiling Blosc extension without AVX2 support') - - # include assembly files - if cpuinfo.platform.machine() == 'x86_64': - extra_objects = [ - S[:-1] + 'o' for S in glob("c-blosc/internal-complibs/zstd*/decompress/*amd64.S") - ] - else: - extra_objects = [] - - sources = ['numcodecs/blosc.pyx'] - - # define extension module - return [ - Extension( - 'numcodecs.blosc', - sources=sources + blosc_sources, - include_dirs=include_dirs, - define_macros=define_macros, - extra_compile_args=extra_compile_args, - extra_objects=extra_objects, - ), - ] - - def zstd_extension(): info('setting up Zstandard extension') @@ -342,8 +268,8 @@ def run(self): def run_setup(with_extensions): if with_extensions: ext_modules = ( - blosc_extension() - + zstd_extension() + # blosc_extension() + zstd_extension() + lz4_extension() + compat_extension() + shuffle_extension()