Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mmap large dictionaries in patch-from mode #3486

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 134 additions & 26 deletions programs/fileio.c
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,11 @@ void FIO_setPassThroughFlag(FIO_prefs_t* const prefs, int value) {
prefs->passThrough = (value != 0);
}

/* FIO_setMMapDict() :
 * Records in `prefs->mmapDict` the dictionary memory-mapping mode
 * requested by the caller. The value is stored as given. */
void FIO_setMMapDict(FIO_prefs_t* const prefs, ZSTD_paramSwitch_e value) {
    ZSTD_paramSwitch_e const requestedMode = value;
    prefs->mmapDict = requestedMode;
}

/* FIO_ctx_t functions */

void FIO_setHasStdoutOutput(FIO_ctx_t* const fCtx, int value) {
Expand Down Expand Up @@ -660,7 +665,23 @@ FIO_openDstFile(FIO_ctx_t* fCtx, FIO_prefs_t* const prefs,
}
}

/*! FIO_createDictBuffer() :

/* FIO_getDictFileStat() :
 * Runs UTIL_stat() on `fileName`, storing the result in `*dictFileStat`,
 * and verifies that the dictionary is a regular file.
 * `dictFileStat` must not be NULL.
 * When `fileName` is NULL nothing is done and `*dictFileStat` is left untouched.
 * Raises EXM_THROW(31) when UTIL_stat() fails, EXM_THROW(32) when the file
 * is not a regular file.
 */
static void FIO_getDictFileStat(const char* fileName, stat_t* dictFileStat) {
    assert(dictFileStat != NULL);

    if (fileName != NULL) {
        /* the file must be stat-able ... */
        if (!UTIL_stat(fileName, dictFileStat)) {
            EXM_THROW(31, "Stat failed on dictionary file %s: %s", fileName, strerror(errno));
        }

        /* ... and must be a regular file */
        if (!UTIL_isRegularFileStat(dictFileStat)) {
            EXM_THROW(32, "Dictionary %s must be a regular file.", fileName);
        }
    }
}

/* FIO_createDictBuffer() :
* creates a buffer, pointed by `*bufferPtr`,
* loads `filename` content into it, up to DICTSIZE_MAX bytes.
* @return : loaded size
Expand All @@ -678,14 +699,6 @@ static size_t FIO_createDictBuffer(void** bufferPtr, const char* fileName, FIO_p

DISPLAYLEVEL(4,"Loading %s as dictionary \n", fileName);

if (!UTIL_stat(fileName, dictFileStat)) {
EXM_THROW(31, "Stat failed on dictionary file %s: %s", fileName, strerror(errno));
}

if (!UTIL_isRegularFileStat(dictFileStat)) {
EXM_THROW(32, "Dictionary %s must be a regular file.", fileName);
}

fileHandle = fopen(fileName, "rb");

if (fileHandle == NULL) {
Expand All @@ -712,6 +725,70 @@ static size_t FIO_createDictBuffer(void** bufferPtr, const char* fileName, FIO_p
return (size_t)fileSize;
}

#if (PLATFORM_POSIX_VERSION > 0)
#include <sys/mman.h>
/* FIO_mmap() :
 * Maps the first `fileSize` bytes of the open descriptor `fileHandle`
 * as a private, read-only region starting at file offset 0.
 * @return : the value returned by mmap(), unchanged.
 *           Note that mmap() reports failure with MAP_FAILED, not NULL. */
static void* FIO_mmap(size_t fileSize, int fileHandle)
{
    int const protection = PROT_READ;
    int const mapFlags = MAP_PRIVATE;
    void* const mapping = mmap(NULL, fileSize, protection, mapFlags, fileHandle, 0);
    return mapping;
}
/* FIO_munmap() :
 * Releases a region previously obtained through FIO_mmap().
 * `bufferSize` is passed to munmap() as the length of the region.
 * @return : the value returned by munmap(). */
static int FIO_munmap(void* buffer, size_t bufferSize)
{
    int const unmapResult = munmap(buffer, bufferSize);
    return unmapResult;
}
/* FIO_createDictBufferMMap() :
 * POSIX variant : maps `fileName` read-only into memory instead of loading
 * it into a malloc'ed buffer. We might want to also do mapping for windows.
 * `dictFileStat` must already describe `fileName` : the size is read from it.
 * On success `*bufferPtr` points to the mapping, to be released with FIO_munmap().
 * When `fileName` is NULL, `*bufferPtr` is set to NULL.
 * @return : mapped size in bytes, or 0 when `fileName` is NULL.
 * Raises EXM_THROW(33) when the file cannot be opened,
 *        EXM_THROW(34) when it is larger than the limit
 *                      (prefs->memLimit in patch-from mode, DICTSIZE_MAX otherwise),
 *        EXM_THROW(35) when the mapping itself fails.
 */
static size_t FIO_createDictBufferMMap(void** bufferPtr, const char* fileName, FIO_prefs_t* const prefs, stat_t* dictFileStat)
{
    int fileHandle;
    U64 fileSize;

    assert(bufferPtr != NULL);
    assert(dictFileStat != NULL);
    *bufferPtr = NULL;
    if (fileName == NULL) return 0;

    DISPLAYLEVEL(4,"Loading %s as dictionary \n", fileName);

    fileHandle = open(fileName, O_RDONLY);

    if (fileHandle == -1) {
        EXM_THROW(33, "Couldn't open dictionary %s: %s", fileName, strerror(errno));
    }

    fileSize = UTIL_getFileSizeStat(dictFileStat);
    {
        size_t const dictSizeMax = prefs->patchFromMode ? prefs->memLimit : DICTSIZE_MAX;
        if (fileSize > dictSizeMax) {
            EXM_THROW(34, "Dictionary file %s is too large (> %u bytes)",
                            fileName, (unsigned)dictSizeMax);  /* avoid extreme cases */
        }
    }

    {   void* const mapping = FIO_mmap((size_t)fileSize, fileHandle);
        /* capture errno before close() gets a chance to overwrite it */
        int const mapErrno = errno;

        /* the descriptor is no longer needed once the mapping attempt is done */
        close(fileHandle);

        /* mmap() signals failure with MAP_FAILED, never NULL :
         * callers only test for NULL, so the failure must be caught here,
         * otherwise an invalid pointer would be used as dictionary content. */
        if (mapping == MAP_FAILED) {
            EXM_THROW(35, "Couldn't mmap dictionary %s: %s", fileName, strerror(mapErrno));
        }
        *bufferPtr = mapping;
    }

    return (size_t)fileSize;
}
#else
/* FIO_createDictBufferMMap() :
 * Variant compiled when PLATFORM_POSIX_VERSION is not > 0 :
 * no mapping is performed, the request is forwarded as-is to FIO_createDictBuffer().
 * @return : the size returned by FIO_createDictBuffer(). */
static size_t FIO_createDictBufferMMap(void** bufferPtr, const char* fileName, FIO_prefs_t* const prefs, stat_t* dictFileStat)
{
    size_t const loadedSize = FIO_createDictBuffer(bufferPtr, fileName, prefs, dictFileStat);
    return loadedSize;
}
/* FIO_munmap() :
 * Variant compiled when PLATFORM_POSIX_VERSION is not > 0 :
 * the buffer is released with free(); `bufferSize` is unused. */
static void FIO_munmap(void* buffer, size_t bufferSize)
{
    free(buffer);
    (void)bufferSize;   /* only meaningful for a real munmap() */
}
#endif

/* FIO_freeDict() :
 * Releases `dict->dictBuffer` with the routine matching `dict->dictBufferType` :
 * free() for FIO_mallocDict, FIO_munmap() for FIO_mmapDict.
 * Any other type value is a programming error and trips an assert. */
static void FIO_freeDict(const FIO_Dict_t* dict) {
    switch (dict->dictBufferType) {
    case FIO_mallocDict:
        free(dict->dictBuffer);
        break;
    case FIO_mmapDict:
        FIO_munmap(dict->dictBuffer, dict->dictBufferSize);
        break;
    default:
        assert(0); /* Should not reach this case */
        break;
    }
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: I would add an else assert(0); at the end,
because such a case is supposed to never happen,
unless the FIO_Dict_t code is changed in the future.



/* FIO_checkFilenameCollisions() :
Expand Down Expand Up @@ -914,8 +991,7 @@ static ZSTD_outBuffer setOutBuffer(void* buf, size_t s, size_t pos)
* Compression
************************************************************************/
typedef struct {
void* dictBuffer;
size_t dictBufferSize;
FIO_Dict_t dict;
const char* dictFileName;
stat_t dictFileStat;
ZSTD_CStream* cctx;
Expand Down Expand Up @@ -961,6 +1037,8 @@ static void FIO_adjustParamsForPatchFromMode(FIO_prefs_t* const prefs,
static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
const char* dictFileName, unsigned long long const maxSrcFileSize,
int cLevel, ZSTD_compressionParameters comprParams) {
int useMMap = prefs->mmapDict == ZSTD_ps_enable;
int forceNoUseMMap = prefs->mmapDict == ZSTD_ps_disable;
cRess_t ress;
memset(&ress, 0, sizeof(ress));

Expand All @@ -970,19 +1048,30 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx",
strerror(errno));

FIO_getDictFileStat(dictFileName, &ress.dictFileStat);

/* need to update memLimit before calling createDictBuffer
* because of memLimit check inside it */
if (prefs->patchFromMode) {
U64 const dictSize = UTIL_getFileSizeStat(&ress.dictFileStat);
unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
useMMap |= dictSize > prefs->memLimit;
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, dictSize, ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
}

ress.dict.dictBufferType = (useMMap && !forceNoUseMMap) ? FIO_mmapDict : FIO_mallocDict;

if (ress.dict.dictBufferType == FIO_mallocDict) {
ress.dict.dictBufferSize = FIO_createDictBuffer(&ress.dict.dictBuffer, dictFileName, prefs, &ress.dictFileStat); /* works with dictFileName==NULL */
} else {
ress.dict.dictBufferSize = FIO_createDictBufferMMap(&ress.dict.dictBuffer, dictFileName, prefs, &ress.dictFileStat);
}
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs, &ress.dictFileStat); /* works with dictFileName==NULL */

ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());

/* Advanced parameters, including dictionary */
if (dictFileName && (ress.dictBuffer==NULL))
if (dictFileName && (ress.dict.dictBuffer==NULL))
EXM_THROW(32, "allocation error : can't create dictBuffer");
ress.dictFileName = dictFileName;

Expand Down Expand Up @@ -1032,17 +1121,17 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
#endif
/* dictionary */
if (prefs->patchFromMode) {
CHECK( ZSTD_CCtx_refPrefix(ress.cctx, ress.dictBuffer, ress.dictBufferSize) );
CHECK( ZSTD_CCtx_refPrefix(ress.cctx, ress.dict.dictBuffer, ress.dict.dictBufferSize) );
} else {
CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, ress.dictBuffer, ress.dictBufferSize) );
CHECK( ZSTD_CCtx_loadDictionary_byReference(ress.cctx, ress.dict.dictBuffer, ress.dict.dictBufferSize) );
}

return ress;
}

static void FIO_freeCResources(const cRess_t* const ress)
{
free(ress->dictBuffer);
FIO_freeDict(&(ress->dict));
AIO_WritePool_free(ress->writeCtx);
AIO_ReadPool_free(ress->readCtx);
ZSTD_freeCStream(ress->cctx); /* never fails */
Expand Down Expand Up @@ -2043,43 +2132,60 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
* Decompression
***************************************************************************/
/* dRess_t :
 * resources used by the decompression functions */
typedef struct {
    FIO_Dict_t      dict;       /* dictionary buffer, released by FIO_freeDict() */
    ZSTD_DStream*   dctx;       /* decompression stream */
    WritePoolCtx_t* writeCtx;   /* write pool, created with AIO_WritePool_create() */
    ReadPoolCtx_t*  readCtx;    /* read pool, created with AIO_ReadPool_create() */
} dRess_t;

/* FIO_createDResources() :
 * Builds the resources needed for decompression : a ZSTD_DStream,
 * the dictionary buffer, and the read / write pools.
 *
 * The dictionary is memory-mapped when `prefs->mmapDict` is ZSTD_ps_enable,
 * or, in patch-from mode, when its size exceeds `prefs->memLimit` as it stands
 * before FIO_adjustMemLimitForPatchFromMode() is applied.
 * ZSTD_ps_disable always selects the malloc'ed buffer.
 *
 * The dictionary is attached by reference (prefix in patch-from mode),
 * so the buffer is kept in `ress.dict` instead of being freed here.
 * @return : the populated resources, to be released with FIO_freeDResources().
 * Raises EXM_THROW(60) when the ZSTD_DStream cannot be created.
 */
static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
{
    int useMMap = prefs->mmapDict == ZSTD_ps_enable;
    int forceNoUseMMap = prefs->mmapDict == ZSTD_ps_disable;
    stat_t statbuf;
    dRess_t ress;
    memset(&ress, 0, sizeof(ress));
    /* FIO_getDictFileStat() leaves statbuf untouched when dictFileName==NULL :
     * make sure it never holds indeterminate content */
    memset(&statbuf, 0, sizeof(statbuf));

    FIO_getDictFileStat(dictFileName, &statbuf);

    if (prefs->patchFromMode){
        U64 const dictSize = UTIL_getFileSizeStat(&statbuf);
        /* compare against memLimit before it gets adjusted below */
        useMMap |= dictSize > prefs->memLimit;
        FIO_adjustMemLimitForPatchFromMode(prefs, dictSize, 0 /* just use the dict size */);
    }

    /* Allocation */
    ress.dict.dictBufferType = (useMMap && !forceNoUseMMap) ? FIO_mmapDict : FIO_mallocDict;
    ress.dctx = ZSTD_createDStream();
    if (ress.dctx==NULL)
        EXM_THROW(60, "Error: %s : can't create ZSTD_DStream", strerror(errno));
    CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
    CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));

    /* dictionary */
    {   if (ress.dict.dictBufferType == FIO_mallocDict) {
            ress.dict.dictBufferSize = FIO_createDictBuffer(&ress.dict.dictBuffer, dictFileName, prefs, &statbuf);
        } else {
            ress.dict.dictBufferSize = FIO_createDictBufferMMap(&ress.dict.dictBuffer, dictFileName, prefs, &statbuf);
        }

        CHECK(ZSTD_DCtx_reset(ress.dctx, ZSTD_reset_session_only) );

        if (prefs->patchFromMode){
            CHECK(ZSTD_DCtx_refPrefix(ress.dctx, ress.dict.dictBuffer, ress.dict.dictBufferSize));
        } else {
            CHECK(ZSTD_DCtx_loadDictionary_byReference(ress.dctx, ress.dict.dictBuffer, ress.dict.dictBufferSize));
        }
    }

    ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
    ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_DStreamInSize());

    return ress;
}

static void FIO_freeDResources(dRess_t ress)
{
FIO_freeDict(&(ress.dict));
CHECK( ZSTD_freeDStream(ress.dctx) );
AIO_WritePool_free(ress.writeCtx);
AIO_ReadPool_free(ress.readCtx);
Expand Down Expand Up @@ -2655,6 +2761,8 @@ int FIO_decompressFilename(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs,

int const decodingError = FIO_decompressSrcFile(fCtx, prefs, ress, dstFileName, srcFileName);



FIO_freeDResources(ress);
return decodingError;
}
Expand Down
1 change: 1 addition & 0 deletions programs/fileio.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value);
void FIO_displayCompressionParameters(const FIO_prefs_t* prefs);
void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, int value);
void FIO_setPassThroughFlag(FIO_prefs_t* const prefs, int value);
void FIO_setMMapDict(FIO_prefs_t* const prefs, ZSTD_paramSwitch_e value);

/* FIO_ctx_t functions */
void FIO_setNbFilesTotal(FIO_ctx_t* const fCtx, int value);
Expand Down
9 changes: 9 additions & 0 deletions programs/fileio_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ typedef struct FIO_prefs_s {
int contentSize;
int allowBlockDevices;
int passThrough;
ZSTD_paramSwitch_e mmapDict;
} FIO_prefs_t;

/* FIO_dictBufferType_t :
 * tells which routine must release a dictionary buffer */
typedef enum {
    FIO_mallocDict,   /* buffer is released with free() */
    FIO_mmapDict      /* buffer is released with FIO_munmap() */
} FIO_dictBufferType_t;

/* FIO_Dict_t :
 * a dictionary buffer together with the information needed to release it */
typedef struct {
    void*                dictBuffer;       /* start of the dictionary content */
    size_t               dictBufferSize;   /* size of dictBuffer, in bytes */
    FIO_dictBufferType_t dictBufferType;   /* how dictBuffer must be released */
} FIO_Dict_t;

#endif /* FILEIO_TYPES_HEADER */
5 changes: 5 additions & 0 deletions programs/zstdcli.c
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ static void usage_advanced(const char* programName)

DISPLAYOUT("\n");
DISPLAYOUT(" --format=zstd Compress files to the `.zst` format. [Default]\n");
/* each help entry ends with '\n' so that the next option starts on its own line */
DISPLAYOUT(" --mmap-dict Memory-map dictionary file rather than mallocing and loading all at once\n");
#ifdef ZSTD_GZCOMPRESS
DISPLAYOUT(" --format=gzip Compress files to the `.gz` format.\n");
#endif
Expand Down Expand Up @@ -851,6 +852,7 @@ int main(int argCount, const char* argv[])
ultra=0,
contentSize=1,
removeSrcFile=0;
ZSTD_paramSwitch_e mmapDict=ZSTD_ps_auto;
ZSTD_paramSwitch_e useRowMatchFinder = ZSTD_ps_auto;
FIO_compressionType_t cType = FIO_zstdCompression;
unsigned nbWorkers = 0;
Expand Down Expand Up @@ -984,6 +986,8 @@ int main(int argCount, const char* argv[])
if (longCommandWArg(&argument, "--adapt=")) { adapt = 1; if (!parseAdaptParameters(argument, &adaptMin, &adaptMax)) { badusage(programName); CLEAN_RETURN(1); } continue; }
if (!strcmp(argument, "--single-thread")) { nbWorkers = 0; singleThread = 1; continue; }
if (!strcmp(argument, "--format=zstd")) { suffix = ZSTD_EXTENSION; cType = FIO_zstdCompression; continue; }
if (!strcmp(argument, "--mmap-dict")) { mmapDict = ZSTD_ps_enable; continue; }
if (!strcmp(argument, "--no-mmap-dict")) { mmapDict = ZSTD_ps_disable; continue; }
#ifdef ZSTD_GZCOMPRESS
if (!strcmp(argument, "--format=gzip")) { suffix = GZ_EXTENSION; cType = FIO_gzipCompression; continue; }
if (exeNameMatch(programName, ZSTD_GZ)) { /* behave like gzip */
Expand Down Expand Up @@ -1526,6 +1530,7 @@ int main(int argCount, const char* argv[])
FIO_setNotificationLevel(g_displayLevel);
FIO_setAllowBlockDevices(prefs, allowBlockDevices);
FIO_setPatchFromMode(prefs, patchFromDictFileName != NULL);
FIO_setMMapDict(prefs, mmapDict);
if (memLimit == 0) {
if (compressionParams.windowLog == 0) {
memLimit = (U32)1 << g_defaultMaxWindowLog;
Expand Down