Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mmap large dictionaries in patch-from mode #3486

Merged
126 changes: 114 additions & 12 deletions programs/fileio.c
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,72 @@ static size_t FIO_createDictBuffer(void** bufferPtr, const char* fileName, FIO_p
return (size_t)fileSize;
}

#if (PLATFORM_POSIX_VERSION > 0)
#include <sys/mman.h>
/* FIO_mmap() :
 * Thin wrapper around mmap() : requests a private, read-only mapping of the
 * first `fileSize` bytes (offset 0) of the file open on `fileHandle`.
 * @return : whatever mmap() returns, unchanged;
 *           callers must compare the result against MAP_FAILED. */
static void* FIO_mmap(size_t fileSize, int fileHandle)
{
    void* const mapping = mmap(NULL, fileSize, PROT_READ, MAP_PRIVATE, fileHandle, 0);
    return mapping;
}
/* FIO_munmap() :
 * Thin wrapper around munmap() : releases the mapping of `bufferSize` bytes
 * starting at `buffer`.
 * @return : the munmap() status, unchanged. */
static int FIO_munmap(void* buffer, size_t bufferSize)
{
    int const status = munmap(buffer, bufferSize);
    return status;
}
/* TODO: consider adding an equivalent file-mapping implementation for Windows */
/* FIO_createDictBufferMMap() :
 * Same contract as FIO_createDictBuffer(), but the dictionary content is
 * memory-mapped read-only (via FIO_mmap()) instead of being copied into a
 * heap buffer.
 *
 * @bufferPtr    : receives the start of the mapping; set to NULL when
 *                 `fileName` is NULL. Must not be NULL.
 * @fileName     : dictionary path; NULL means "no dictionary".
 * @prefs        : supplies the size limit : `memLimit` in patch-from mode,
 *                 DICTSIZE_MAX otherwise.
 * @dictFileStat : filled with the stat information of `fileName`. Must not be NULL.
 * @return       : size of the mapping in bytes, or 0 when `fileName` is NULL.
 *
 * Every failure (stat, non-regular file, open, oversized file, mmap) is
 * reported through EXM_THROW().
 * The mapping must be released with FIO_munmapDictBuffer(), not free(). */
static size_t FIO_createDictBufferMMap(void** bufferPtr, const char* fileName, FIO_prefs_t* const prefs, stat_t* dictFileStat)
{
    int fileHandle;
    U64 fileSize;

    assert(bufferPtr != NULL);
    assert(dictFileStat != NULL);
    *bufferPtr = NULL;
    if (fileName == NULL) return 0;

    DISPLAYLEVEL(4,"Loading %s as dictionary \n", fileName);

    if (!UTIL_stat(fileName, dictFileStat)) {
        EXM_THROW(31, "Stat failed on dictionary file %s: %s", fileName, strerror(errno));
    }

    if (!UTIL_isRegularFileStat(dictFileStat)) {
        EXM_THROW(32, "Dictionary %s must be a regular file.", fileName);
    }

    fileHandle = open(fileName, O_RDONLY);

    if (fileHandle == -1) {
        EXM_THROW(33, "Couldn't open dictionary %s: %s", fileName, strerror(errno));
    }

    fileSize = UTIL_getFileSizeStat(dictFileStat);
    {
        size_t const dictSizeMax = prefs->patchFromMode ? prefs->memLimit : DICTSIZE_MAX;
        if (fileSize > dictSizeMax) {
            EXM_THROW(34, "Dictionary file %s is too large (> %u bytes)",
                            fileName,  (unsigned)dictSizeMax);   /* avoid extreme cases */
        }
    }

    *bufferPtr = FIO_mmap((size_t)fileSize, fileHandle);
    /* mmap() signals failure with MAP_FAILED, not NULL : without this check
     * the caller would receive an invalid pointer paired with a non-zero size. */
    if (*bufferPtr == MAP_FAILED) {
        int const mmapErrno = errno;   /* close() below may overwrite errno */
        *bufferPtr = NULL;
        close(fileHandle);
        EXM_THROW(35, "Couldn't mmap dictionary %s: %s", fileName, strerror(mmapErrno));
    }

    /* the mapping remains valid after its file descriptor is closed */
    close(fileHandle);
    return (size_t)fileSize;
}
/* FIO_munmapDictBuffer() :
 * Releases a dictionary buffer obtained from FIO_createDictBufferMMap()
 * by unmapping `dictBufferSize` bytes at `dictBuffer`.
 * The unmap status is intentionally discarded. */
static void FIO_munmapDictBuffer(void* dictBuffer, size_t dictBufferSize)
{
    (void)FIO_munmap(dictBuffer, dictBufferSize);
}
#else
/* FIO_createDictBufferMMap() :
 * Fallback used when the POSIX mmap path is not compiled in :
 * forwards every argument to FIO_createDictBuffer() and returns its result. */
static size_t FIO_createDictBufferMMap(void** bufferPtr, const char* fileName, FIO_prefs_t* const prefs, stat_t* dictFileStat)
{
    size_t const loadedSize = FIO_createDictBuffer(bufferPtr, fileName, prefs, dictFileStat);
    return loadedSize;
}
/* FIO_munmapDictBuffer() :
 * Fallback used when the POSIX mmap path is not compiled in :
 * the buffer is released with free(); `dictBufferSize` is unused. */
static void FIO_munmapDictBuffer(void* dictBuffer, size_t dictBufferSize)
{
    free(dictBuffer);
    (void)dictBufferSize;
}
#endif



/* FIO_checkFilenameCollisions() :
Expand Down Expand Up @@ -921,6 +987,7 @@ typedef struct {
ZSTD_CStream* cctx;
WritePoolCtx_t *writeCtx;
ReadPoolCtx_t *readCtx;
int mmapDict;
} cRess_t;

/** ZSTD_cycleLog() :
Expand Down Expand Up @@ -961,6 +1028,7 @@ static void FIO_adjustParamsForPatchFromMode(FIO_prefs_t* const prefs,
static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
const char* dictFileName, unsigned long long const maxSrcFileSize,
int cLevel, ZSTD_compressionParameters comprParams) {
int mmapDict = 0;
cRess_t ress;
memset(&ress, 0, sizeof(ress));

Expand All @@ -973,10 +1041,19 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
/* need to update memLimit before calling createDictBuffer
* because of memLimit check inside it */
if (prefs->patchFromMode) {
U64 const dictSize = UTIL_getFileSize(dictFileName);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should reorganize this so that we aren't stat-ing a file multiple times for patch-from.

unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize;
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
mmapDict = dictSize > prefs->memLimit;
FIO_adjustParamsForPatchFromMode(prefs, &comprParams, dictSize, ssSize > 0 ? ssSize : maxSrcFileSize, cLevel);
}

ress.mmapDict = mmapDict;

if (!ress.mmapDict) {
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs, &ress.dictFileStat); /* works with dictFileName==NULL */
} else {
ress.dictBufferSize = FIO_createDictBufferMMap(&ress.dictBuffer, dictFileName, prefs, &ress.dictFileStat);
}
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs, &ress.dictFileStat); /* works with dictFileName==NULL */

ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_CStreamOutSize());
ress.readCtx = AIO_ReadPool_create(prefs, ZSTD_CStreamInSize());
Expand Down Expand Up @@ -1034,15 +1111,19 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs,
if (prefs->patchFromMode) {
CHECK( ZSTD_CCtx_refPrefix(ress.cctx, ress.dictBuffer, ress.dictBufferSize) );
} else {
CHECK( ZSTD_CCtx_loadDictionary(ress.cctx, ress.dictBuffer, ress.dictBufferSize) );
CHECK( ZSTD_CCtx_loadDictionary_byReference(ress.cctx, ress.dictBuffer, ress.dictBufferSize) );
}

return ress;
}

static void FIO_freeCResources(const cRess_t* const ress)
{
free(ress->dictBuffer);
if (!ress->mmapDict) {
free(ress->dictBuffer);
} else {
FIO_munmapDictBuffer(ress->dictBuffer, ress->dictBufferSize);
}
AIO_WritePool_free(ress->writeCtx);
AIO_ReadPool_free(ress->readCtx);
ZSTD_freeCStream(ress->cctx); /* never fails */
Expand Down Expand Up @@ -2043,33 +2124,49 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx,
* Decompression
***************************************************************************/
/* dRess_t :
 * Resources used for decompression; populated by FIO_createDResources()
 * and released by FIO_freeDResources(). */
typedef struct {
/* dictionary content, filled by FIO_createDictBuffer() or FIO_createDictBufferMMap() */
void* dictBuffer;
/* size value returned by the function that filled dictBuffer */
size_t dictBufferSize;
/* decompression stream, created with ZSTD_createDStream() */
ZSTD_DStream* dctx;
/* write pool, created with AIO_WritePool_create() */
WritePoolCtx_t *writeCtx;
/* read pool, released with AIO_ReadPool_free() */
ReadPoolCtx_t *readCtx;
/* non-zero when dictBuffer came from FIO_createDictBufferMMap() :
 * it must then be released with FIO_munmapDictBuffer() rather than free() */
int mmapDict;
} dRess_t;

static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName)
{
int mmapDict = 0;
dRess_t ress;
memset(&ress, 0, sizeof(ress));

if (prefs->patchFromMode)
FIO_adjustMemLimitForPatchFromMode(prefs, UTIL_getFileSize(dictFileName), 0 /* just use the dict size */);
if (prefs->patchFromMode){
U64 const dictSize = UTIL_getFileSize(dictFileName);
mmapDict = dictSize > prefs->memLimit;
FIO_adjustMemLimitForPatchFromMode(prefs, dictSize, 0 /* just use the dict size */);
}

/* Allocation */
ress.mmapDict = mmapDict;
ress.dctx = ZSTD_createDStream();
if (ress.dctx==NULL)
EXM_THROW(60, "Error: %s : can't create ZSTD_DStream", strerror(errno));
CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) );
CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag));

/* dictionary */
{ void* dictBuffer;
stat_t statbuf;
size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs, &statbuf);
CHECK( ZSTD_DCtx_reset(ress.dctx, ZSTD_reset_session_only) );
CHECK( ZSTD_DCtx_loadDictionary(ress.dctx, dictBuffer, dictBufferSize) );
free(dictBuffer);
{ stat_t statbuf;
if (!mmapDict) {
ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs, &statbuf);
} else {
ress.dictBufferSize = FIO_createDictBufferMMap(&ress.dictBuffer, dictFileName, prefs, &statbuf);
}

CHECK(ZSTD_DCtx_reset(ress.dctx, ZSTD_reset_session_only) );

if (prefs->patchFromMode){
CHECK(ZSTD_DCtx_refPrefix(ress.dctx, ress.dictBuffer, ress.dictBufferSize));
} else {
CHECK(ZSTD_DCtx_loadDictionary_byReference(ress.dctx, ress.dictBuffer, ress.dictBufferSize));
}
}

ress.writeCtx = AIO_WritePool_create(prefs, ZSTD_DStreamOutSize());
Expand All @@ -2080,6 +2177,11 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi

static void FIO_freeDResources(dRess_t ress)
{
if (!ress.mmapDict) {
daniellerozenblit marked this conversation as resolved.
Show resolved Hide resolved
free(ress.dictBuffer);
} else {
FIO_munmapDictBuffer(ress.dictBuffer, ress.dictBufferSize);
}
CHECK( ZSTD_freeDStream(ress.dctx) );
AIO_WritePool_free(ress.writeCtx);
AIO_ReadPool_free(ress.readCtx);
Expand Down