Skip to content

Commit

Permalink
Zarr V2: add read/update support for 'shuffle' filter
Browse files Browse the repository at this point in the history
  • Loading branch information
rouault committed Feb 19, 2025
1 parent fec933e commit 000cced
Show file tree
Hide file tree
Showing 5 changed files with 256 additions and 12 deletions.
13 changes: 12 additions & 1 deletion autotest/gdrivers/data/zarr/generate_test_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import numpy as np
import zarr
from numcodecs import LZ4, LZMA, Blosc, GZip, Zlib, Zstd
from numcodecs import LZ4, LZMA, Blosc, GZip, Shuffle, Zlib, Zstd

os.chdir(os.path.dirname(__file__))

Expand Down Expand Up @@ -69,6 +69,17 @@
)
z[:] = [1, 2]

# Fixture for the 'shuffle' filter: a 1-D array of two uint16 values stored
# as a single chunk, with no compressor, so that the only transformation
# applied to the chunk bytes is the numcodecs Shuffle filter. The filter
# element size (2) matches the 2-byte item size of the "u2" dtype.
z = zarr.open(
"shuffle.zarr",
mode="w",
dtype="u2",
shape=(2,),
chunks=(2,),
compressor=None,
filters=[Shuffle(elementsize=2)],
)
# Write the two values that get stored in the array's single chunk.
z[:] = [1, 2]


z = zarr.open(
"order_f_u1.zarr",
Expand Down
19 changes: 19 additions & 0 deletions autotest/gdrivers/data/zarr/shuffle.zarr/.zarray
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"chunks": [
2
],
"compressor": null,
"dtype": "<u2",
"fill_value": 0,
"filters": [
{
"elementsize": 2,
"id": "shuffle"
}
],
"order": "C",
"shape": [
2
],
"zarr_format": 2
}
Binary file added autotest/gdrivers/data/zarr/shuffle.zarr/0
Binary file not shown.
31 changes: 31 additions & 0 deletions autotest/gdrivers/zarr_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import math
import os
import shutil
import struct
import sys

Expand Down Expand Up @@ -535,6 +536,36 @@ def test_zarr_read_compression_methods(datasetname, compressor):
assert ar.Read() == array.array("b", [1, 2])


def test_zarr_read_shuffle_filter():
    """Read the first array of data/zarr/shuffle.zarr, which is stored with
    the 'shuffle' filter, and check that the values 1 and 2 are returned."""

    dataset = gdal.OpenEx("data/zarr/shuffle.zarr", gdal.OF_MULTIDIM_RASTER)
    root_group = dataset.GetRootGroup()
    assert root_group

    first_array_name = root_group.GetMDArrayNames()[0]
    md_array = root_group.OpenMDArray(first_array_name)
    assert md_array

    values = md_array.Read()
    assert values == array.array("h", [1, 2])


def test_zarr_read_shuffle_filter_update(tmp_path):
    """Overwrite the values of a copy of the 'shuffle'-filtered array, then
    re-open the copy read-only and check that the new values are read back."""

    out_filename = tmp_path / "filter_update.zarr"
    shutil.copytree("data/zarr/shuffle.zarr", out_filename)

    # The update is done in a nested function so that every GDAL object
    # opened in update mode goes out of scope before the copy is re-opened.
    def overwrite_values():
        update_flags = gdal.OF_MULTIDIM_RASTER | gdal.OF_UPDATE
        dataset = gdal.OpenEx(out_filename, update_flags)
        root_group = dataset.GetRootGroup()
        first_array_name = root_group.GetMDArrayNames()[0]
        md_array = root_group.OpenMDArray(first_array_name)
        md_array.Write([3, 4])

    overwrite_values()

    dataset = gdal.OpenEx(out_filename, gdal.OF_MULTIDIM_RASTER)
    root_group = dataset.GetRootGroup()
    first_array_name = root_group.GetMDArrayNames()[0]
    md_array = root_group.OpenMDArray(first_array_name)
    values = md_array.Read()
    assert values == array.array("h", [3, 4])


@pytest.mark.parametrize("name", ["u1", "u2", "u4", "u8"])
def test_zarr_read_fortran_order(name):

Expand Down
205 changes: 194 additions & 11 deletions frmts/zarr/zarr_v2_array.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,181 @@ bool ZarrV2Array::AllocateWorkingBuffers(
#undef m_abyDecodedTileData
}

/************************************************************************/
/* ZarrShuffleCompressor() */
/************************************************************************/

/** Forward byte-shuffle, matching the "shuffle" codec of numcodecs.
 *
 * The input is seen as a sequence of fixed-size elements. The output holds
 * the first byte of every element, followed by the second byte of every
 * element, and so on. The output always has the same size as the input.
 *
 * Three calling modes are handled, selected from the state of the output
 * arguments:
 * - output_data != nullptr, *output_data != nullptr and *output_size != 0:
 *   the result is written into the caller-provided buffer, which must be at
 *   least input_size bytes large.
 * - output_data == nullptr and output_size != nullptr: nothing is written;
 *   *output_size is set to the required output size.
 * - output_data != nullptr and *output_data == nullptr: a buffer of
 *   input_size bytes is allocated with VSI_MALLOC_VERBOSE() and returned in
 *   *output_data (reset to nullptr if the operation fails).
 *
 * @param input_data  Bytes to shuffle.
 * @param input_size  Size of input_data in bytes. Must be a multiple of
 *                    ELEMENTSIZE.
 * @param output_data See the calling modes above.
 * @param output_size See the calling modes above. On an unsupported
 *                    ELEMENTSIZE or a misaligned input_size it is set to 0;
 *                    on a too small output buffer it is set to input_size.
 * @param options     Name=value list. ELEMENTSIZE gives the element size in
 *                    bytes (1, 2, 4 or 8; defaults to 4).
 * @return true on success, false otherwise (an error is emitted with
 *         CPLError(), except when the allocation itself fails and
 *         VSI_MALLOC_VERBOSE() is the one reporting).
 */
static bool ZarrShuffleCompressor(const void *input_data, size_t input_size,
                                  void **output_data, size_t *output_size,
                                  CSLConstList options,
                                  void * /* compressor_user_data */)
{
    // 4 is the default of the shuffle numcodecs:
    // https://numcodecs.readthedocs.io/en/v0.10.0/shuffle.html
    const int eltSize = atoi(CSLFetchNameValueDef(options, "ELEMENTSIZE", "4"));
    // ELEMENTSIZE=1 is accepted by numcodecs, where it is a no-op. The
    // generic loop below degenerates to a plain byte copy in that case.
    if (eltSize != 1 && eltSize != 2 && eltSize != 4 && eltSize != 8)
    {
        CPLError(CE_Failure, CPLE_AppDefined,
                 "Only ELEMENTSIZE=1,2,4,8 is supported");
        if (output_size)
            *output_size = 0;
        return false;
    }
    if ((input_size % eltSize) != 0)
    {
        CPLError(CE_Failure, CPLE_AppDefined,
                 "input_size should be a multiple of ELEMENTSIZE");
        if (output_size)
            *output_size = 0;
        return false;
    }

    // Mode 1: caller-provided output buffer.
    if (output_data != nullptr && *output_data != nullptr &&
        output_size != nullptr && *output_size != 0)
    {
        if (*output_size < input_size)
        {
            CPLError(CE_Failure, CPLE_AppDefined, "Too small output size");
            *output_size = input_size;
            return false;
        }

        const size_t nElts = input_size / eltSize;
        const uint8_t *pabyIn = static_cast<const uint8_t *>(input_data);
        uint8_t *pabyOut = static_cast<uint8_t *>(*output_data);
        // Put at the front of the output buffer the first byte of each
        // element (the least significant one for little-endian data), then
        // the 2nd byte of each element, etc.
        for (size_t i = 0; i < nElts; ++i)
        {
            for (int j = 0; j < eltSize; j++)
            {
                pabyOut[j * nElts + i] = pabyIn[i * eltSize + j];
            }
        }

        *output_size = input_size;
        return true;
    }

    // Mode 2: only the output size is requested.
    if (output_data == nullptr && output_size != nullptr)
    {
        *output_size = input_size;
        return true;
    }

    // Mode 3: allocate the output buffer, then recurse into mode 1.
    if (output_data != nullptr && *output_data == nullptr &&
        output_size != nullptr)
    {
        *output_data = VSI_MALLOC_VERBOSE(input_size);
        *output_size = input_size;
        if (*output_data == nullptr)
            return false;
        bool ret = ZarrShuffleCompressor(input_data, input_size, output_data,
                                         output_size, options, nullptr);
        if (!ret)
        {
            // Do not hand a buffer with undefined content back to the caller.
            VSIFree(*output_data);
            *output_data = nullptr;
        }
        return ret;
    }

    CPLError(CE_Failure, CPLE_AppDefined, "Invalid use of API");
    return false;
}

/************************************************************************/
/* ZarrShuffleDecompressor() */
/************************************************************************/

/** Inverse byte-shuffle, matching the "shuffle" codec of numcodecs.
 *
 * The input is expected to hold the first byte of every element, followed by
 * the second byte of every element, and so on. The output is the sequence of
 * re-assembled fixed-size elements. The output always has the same size as
 * the input.
 *
 * Three calling modes are handled, selected from the state of the output
 * arguments:
 * - output_data != nullptr, *output_data != nullptr and *output_size != 0:
 *   the result is written into the caller-provided buffer, which must be at
 *   least input_size bytes large.
 * - output_data == nullptr and output_size != nullptr: nothing is written;
 *   *output_size is set to the required output size.
 * - output_data != nullptr and *output_data == nullptr: a buffer of
 *   input_size bytes is allocated with VSI_MALLOC_VERBOSE() and returned in
 *   *output_data (reset to nullptr if the operation fails).
 *
 * @param input_data  Shuffled bytes.
 * @param input_size  Size of input_data in bytes. Must be a multiple of
 *                    ELEMENTSIZE.
 * @param output_data See the calling modes above.
 * @param output_size See the calling modes above. On an unsupported
 *                    ELEMENTSIZE or a misaligned input_size it is set to 0;
 *                    on a too small output buffer it is set to input_size.
 * @param options     Name=value list. ELEMENTSIZE gives the element size in
 *                    bytes (1, 2, 4 or 8; defaults to 4).
 * @return true on success, false otherwise (an error is emitted with
 *         CPLError(), except when the allocation itself fails and
 *         VSI_MALLOC_VERBOSE() is the one reporting).
 */
static bool ZarrShuffleDecompressor(const void *input_data, size_t input_size,
                                    void **output_data, size_t *output_size,
                                    CSLConstList options,
                                    void * /* compressor_user_data */)
{
    // 4 is the default of the shuffle numcodecs:
    // https://numcodecs.readthedocs.io/en/v0.10.0/shuffle.html
    const int eltSize = atoi(CSLFetchNameValueDef(options, "ELEMENTSIZE", "4"));
    // ELEMENTSIZE=1 is accepted by numcodecs, where it is a no-op. The
    // generic loop below degenerates to a plain byte copy in that case.
    if (eltSize != 1 && eltSize != 2 && eltSize != 4 && eltSize != 8)
    {
        CPLError(CE_Failure, CPLE_AppDefined,
                 "Only ELEMENTSIZE=1,2,4,8 is supported");
        if (output_size)
            *output_size = 0;
        return false;
    }
    if ((input_size % eltSize) != 0)
    {
        CPLError(CE_Failure, CPLE_AppDefined,
                 "input_size should be a multiple of ELEMENTSIZE");
        if (output_size)
            *output_size = 0;
        return false;
    }

    // Mode 1: caller-provided output buffer.
    if (output_data != nullptr && *output_data != nullptr &&
        output_size != nullptr && *output_size != 0)
    {
        if (*output_size < input_size)
        {
            CPLError(CE_Failure, CPLE_AppDefined, "Too small output size");
            *output_size = input_size;
            return false;
        }

        // Byte j of element i is read from position i of the j-th run of
        // nElts bytes in the shuffled input.
        const size_t nElts = input_size / eltSize;
        const uint8_t *pabyIn = static_cast<const uint8_t *>(input_data);
        uint8_t *pabyOut = static_cast<uint8_t *>(*output_data);
        for (size_t i = 0; i < nElts; ++i)
        {
            for (int j = 0; j < eltSize; j++)
            {
                pabyOut[i * eltSize + j] = pabyIn[j * nElts + i];
            }
        }

        *output_size = input_size;
        return true;
    }

    // Mode 2: only the output size is requested.
    if (output_data == nullptr && output_size != nullptr)
    {
        *output_size = input_size;
        return true;
    }

    // Mode 3: allocate the output buffer, then recurse into mode 1.
    if (output_data != nullptr && *output_data == nullptr &&
        output_size != nullptr)
    {
        *output_data = VSI_MALLOC_VERBOSE(input_size);
        *output_size = input_size;
        if (*output_data == nullptr)
            return false;
        bool ret = ZarrShuffleDecompressor(input_data, input_size, output_data,
                                           output_size, options, nullptr);
        if (!ret)
        {
            // Do not hand a buffer with undefined content back to the caller.
            VSIFree(*output_data);
            *output_data = nullptr;
        }
        return ret;
    }

    CPLError(CE_Failure, CPLE_AppDefined, "Invalid use of API");
    return false;
}

// File-local descriptor wrapping ZarrShuffleCompressor() as a CCT_FILTER
// with id "shuffle". ZarrV2Array::FlushDirtyTile() uses it directly when a
// filter id is "shuffle", instead of looking the id up with
// CPLGetCompressor().
static const CPLCompressor gShuffleCompressor = {
/* nStructVersion = */ 1,
/* pszId = */ "shuffle", CCT_FILTER,
/* papszMetadata = */ nullptr, ZarrShuffleCompressor,
/* user_data = */ nullptr};

// File-local descriptor wrapping ZarrShuffleDecompressor() as a CCT_FILTER
// with id "shuffle". ZarrV2Array::LoadTileData() uses it directly when a
// filter id is "shuffle", instead of looking the id up with
// CPLGetDecompressor().
static const CPLCompressor gShuffleDecompressor = {
/* nStructVersion = */ 1,
/* pszId = */ "shuffle",
CCT_FILTER,
/* papszMetadata = */ nullptr,
ZarrShuffleDecompressor,
/* user_data = */ nullptr};

/************************************************************************/
/* ZarrV2Array::LoadTileData() */
/************************************************************************/
Expand Down Expand Up @@ -563,7 +738,9 @@ bool ZarrV2Array::LoadTileData(const uint64_t *tileIndices, bool bUseMutex,
const auto &oFilter = m_oFiltersArray[i];
const auto osFilterId = oFilter["id"].ToString();
const auto psFilterDecompressor =
CPLGetDecompressor(osFilterId.c_str());
EQUAL(osFilterId.c_str(), "shuffle")
? &gShuffleDecompressor
: CPLGetDecompressor(osFilterId.c_str());
CPLAssert(psFilterDecompressor);

CPLStringList aosOptions;
Expand Down Expand Up @@ -846,7 +1023,10 @@ bool ZarrV2Array::FlushDirtyTile() const
for (const auto &oFilter : m_oFiltersArray)
{
const auto osFilterId = oFilter["id"].ToString();
const auto psFilterCompressor = CPLGetCompressor(osFilterId.c_str());
const auto psFilterCompressor =
EQUAL(osFilterId.c_str(), "shuffle")
? &gShuffleCompressor
: CPLGetCompressor(osFilterId.c_str());
CPLAssert(psFilterCompressor);

CPLStringList aosOptions;
Expand Down Expand Up @@ -1865,16 +2045,19 @@ ZarrV2Group::LoadArray(const std::string &osArrayName,
CPLError(CE_Failure, CPLE_AppDefined, "Missing filter id");
return nullptr;
}
const auto psFilterCompressor =
CPLGetCompressor(osFilterId.c_str());
const auto psFilterDecompressor =
CPLGetDecompressor(osFilterId.c_str());
if (psFilterCompressor == nullptr ||
psFilterDecompressor == nullptr)
if (!EQUAL(osFilterId.c_str(), "shuffle"))
{
CPLError(CE_Failure, CPLE_AppDefined, "Filter %s not handled",
osFilterId.c_str());
return nullptr;
const auto psFilterCompressor =
CPLGetCompressor(osFilterId.c_str());
const auto psFilterDecompressor =
CPLGetDecompressor(osFilterId.c_str());
if (psFilterCompressor == nullptr ||
psFilterDecompressor == nullptr)
{
CPLError(CE_Failure, CPLE_AppDefined,
"Filter %s not handled", osFilterId.c_str());
return nullptr;
}
}
}
}
Expand Down

0 comments on commit 000cced

Please sign in to comment.