Skip to content

Commit

Permalink
Use dlmalloc to manage shared memory (#15)
Browse files Browse the repository at this point in the history
* Use dlmalloc to manage shared memory

* add stresstest
  • Loading branch information
rshin authored and robertnishihara committed Sep 10, 2016
1 parent 04737f3 commit d52bf7d
Show file tree
Hide file tree
Showing 12 changed files with 6,496 additions and 53 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
build/*
*~
*.pyc
2 changes: 1 addition & 1 deletion .travis/check-git-clang-format-output.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ else
base_commit="$TRAVIS_BRANCH"
echo "Running clang-format against branch $base_commit, with hash $(git rev-parse $base_commit)"
fi
output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff)"
output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff --exclude ^third_party/)"
if [ "$output" == "no modified files to format" ] || [ "$output" == "clang-format did not modify any files" ] ; then
echo "clang-format passed."
exit 0
Expand Down
7 changes: 6 additions & 1 deletion .travis/git-clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ def main():
default_extensions),
help=('comma-separated list of file extensions to format, '
'excluding the period and case-insensitive')),
p.add_argument('--exclude', help='Exclude files matching this regex.')
p.add_argument('-f', '--force', action='store_true',
help='allow changes to unstaged files')
p.add_argument('-p', '--patch', action='store_true',
Expand Down Expand Up @@ -125,10 +126,14 @@ def main():
if opts.verbose >= 1:
ignored_files = set(changed_lines)
filter_by_extension(changed_lines, opts.extensions.lower().split(','))
if opts.exclude:
for filename in changed_lines.keys():
if re.match(opts.exclude, filename):
del changed_lines[filename]
if opts.verbose >= 1:
ignored_files.difference_update(changed_lines)
if ignored_files:
print 'Ignoring changes in the following files (wrong extension):'
print 'Ignoring changes in the following files:'
for filename in ignored_files:
print ' ', filename
if changed_lines:
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -I.
BUILD = build

all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example

clean:
rm -r $(BUILD)/*

$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/event_loop.c src/fling.h src/fling.c
$(CC) $(CFLAGS) src/plasma_store.c src/event_loop.c src/fling.c -o $(BUILD)/plasma_store
$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/event_loop.c src/fling.h src/fling.c src/malloc.c src/malloc.h third_party/dlmalloc.c
$(CC) $(CFLAGS) src/plasma_store.c src/event_loop.c src/fling.c src/malloc.c -o $(BUILD)/plasma_store

$(BUILD)/plasma_manager: src/plasma_manager.c src/event_loop.h src/event_loop.c src/plasma.h src/plasma_client.c src/fling.h src/fling.c
$(CC) $(CFLAGS) src/plasma_manager.c src/event_loop.c src/plasma_client.c src/fling.c -o $(BUILD)/plasma_manager
Expand Down
1 change: 1 addition & 0 deletions src/fling.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ int send_fd(int conn, int fd, const char *payload, int size) {
struct msghdr msg;
struct iovec iov;
char buf[CMSG_SPACE(sizeof(int))];
memset(&buf, 0, CMSG_SPACE(sizeof(int)));

init_msg(&msg, &iov, buf, sizeof(buf));

Expand Down
123 changes: 123 additions & 0 deletions src/malloc.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#include <assert.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>

#include "plasma.h"
#include "uthash.h"

void *fake_mmap(size_t);
int fake_munmap(void *, size_t);

#define MMAP(s) fake_mmap(s)
#define MUNMAP(a, s) fake_munmap(a, s)
#define DIRECT_MMAP(s) fake_mmap(s)
#define DIRECT_MUNMAP(a, s) fake_munmap(a, s)
#define USE_DL_PREFIX
#define HAVE_MORECORE 0

#include "third_party/dlmalloc.c"

#undef MMAP
#undef MUNMAP
#undef DIRECT_MMAP
#undef DIRECT_MUNMAP
#undef USE_DL_PREFIX
#undef HAVE_MORECORE

struct mmap_record {
int fd;
void *pointer;
int64_t size;
UT_hash_handle hh_fd;
UT_hash_handle hh_pointer;
};

struct mmap_record *records_by_fd = NULL;
struct mmap_record *records_by_pointer = NULL;

/* Create a buffer. This is creating a temporary file and then
* immediately unlinking it so we do not leave traces in the system. */
int create_buffer(int64_t size) {
static char template[] = "/tmp/plasmaXXXXXX";
char file_name[32];
strncpy(file_name, template, 32);
int fd = mkstemp(file_name);
if (fd < 0)
return -1;
FILE *file = fdopen(fd, "a+");
if (!file) {
close(fd);
return -1;
}
if (unlink(file_name) != 0) {
LOG_ERR("unlink error");
return -1;
}
if (ftruncate(fd, (off_t) size) != 0) {
LOG_ERR("ftruncate error");
return -1;
}
return fd;
}

void *fake_mmap(size_t size) {
// Add sizeof(size_t) so that the returned pointer is deliberately not
// page-aligned. This ensures that the segments of memory returned by
// fake_mmap are never contiguous.
int fd = create_buffer(size + sizeof(size_t));
void *pointer = mmap(NULL, size + sizeof(size_t), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (pointer == MAP_FAILED) {
return pointer;
}
pointer += sizeof(size_t);

struct mmap_record *record = malloc(sizeof(struct mmap_record));
record->fd = fd;
record->pointer = pointer;
record->size = size;
HASH_ADD(hh_fd, records_by_fd, fd, sizeof(fd), record);
HASH_ADD(hh_pointer, records_by_pointer, pointer, sizeof(pointer), record);

LOG_DEBUG("%p = fake_mmap(%lu)", pointer, size);
return pointer;
}

int fake_munmap(void *addr, size_t size) {
LOG_DEBUG("fake_munmap(%p, %lu)", addr, size);

struct mmap_record *record;

addr -= sizeof(size_t);

This comment has been minimized.

Copy link
@mehrdadn

mehrdadn Nov 13, 2016

Contributor

This line is illegal (can't subtract from void *)

HASH_FIND(hh_pointer, records_by_pointer, &addr, sizeof(addr), record);
assert(record != NULL);
close(record->fd);

HASH_DELETE(hh_fd, records_by_fd, record);
HASH_DELETE(hh_pointer, records_by_pointer, record);

return munmap(addr, size + sizeof(size_t));
}

void get_malloc_mapinfo(void *addr,
int *fd,
int64_t *map_size,
ptrdiff_t *offset) {
struct mmap_record *record;
// TODO(rshin): Implement a more efficient search through records_by_fd.
for (record = records_by_fd; record != NULL; record = record->hh_fd.next) {
if (addr >= record->pointer && addr < record->pointer + record->size) {
*fd = record->fd;
*map_size = record->size;
*offset = addr - record->pointer;

This comment has been minimized.

Copy link
@mehrdadn

mehrdadn Nov 13, 2016

Contributor

This line is illegal (can't subtract from void *)

return;
}
}
*fd = -1;
*map_size = 0;
*offset = 0;
}
9 changes: 9 additions & 0 deletions src/malloc.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#ifndef MALLOC_H
#define MALLOC_H

void get_malloc_mapinfo(void *addr,
int *fd,
int64_t *map_length,
ptrdiff_t *offset);

#endif // MALLOC_H
15 changes: 11 additions & 4 deletions src/plasma.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <inttypes.h>
#include <stdio.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>

#ifdef NDEBUG
Expand All @@ -13,13 +14,17 @@
fprintf(stderr, "[DEBUG] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#endif

#ifdef PLASMA_LOGGIN_ON
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#else
#define LOG_INFO(M, ...)
#endif

#define LOG_ERR(M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, \
errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__)

#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)

typedef struct {
int64_t size;
int64_t create_time;
Expand Down Expand Up @@ -59,7 +64,9 @@ enum plasma_reply_type {

typedef struct {
int type;
int64_t size;
ptrdiff_t offset;
int64_t map_size;
int64_t object_size;
} plasma_reply;

typedef struct {
Expand Down
19 changes: 13 additions & 6 deletions src/plasma_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@ void plasma_create(int conn, plasma_id object_id, int64_t size, void **data) {
plasma_reply reply;
int fd = recv_fd(conn, (char *) &reply, sizeof(plasma_reply));
assert(reply.type == PLASMA_OBJECT);
assert(reply.size == size);
*data = mmap(NULL, reply.size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
assert(reply.object_size == size);
*data =
mmap(NULL, reply.map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0) +
reply.offset;
if (*data == MAP_FAILED) {
LOG_ERR("mmap failed");
exit(-1);
}
close(fd);
}

void plasma_get(int conn, plasma_id object_id, int64_t *size, void **data) {
Expand All @@ -51,12 +54,14 @@ void plasma_get(int conn, plasma_id object_id, int64_t *size, void **data) {
fd = new_fd;
}
assert(reply.type == PLASMA_OBJECT);
*data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0);
*data =
mmap(NULL, reply.map_size, PROT_READ, MAP_SHARED, fd, 0) + reply.offset;
if (*data == MAP_FAILED) {
LOG_ERR("mmap failed");
exit(-1);
}
*size = reply.size;
close(fd);
*size = reply.object_size;
}

void plasma_seal(int fd, plasma_id object_id) {
Expand Down Expand Up @@ -116,8 +121,10 @@ int plasma_manager_connect(const char *ip_addr, int port) {

int r = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
if (r < 0) {
LOG_ERR("could not establish connection to manager with id %s:%d",
&ip_addr[0], port);
LOG_ERR(
"could not establish connection to manager with id %s:%d (probably ran "
"out of ports)",
&ip_addr[0], port);
exit(-1);
}
return fd;
Expand Down
Loading

0 comments on commit d52bf7d

Please sign in to comment.