Skip to content

Commit

Permalink
Use narrower mechanisms to interrupt kqueue and epoll.
Browse files Browse the repository at this point in the history
  • Loading branch information
rmacnak committed Oct 6, 2024
1 parent 343c2f4 commit 45e1934
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 85 deletions.
66 changes: 24 additions & 42 deletions vm/message_loop_epoll.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
#include "vm/message_loop.h"

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/timerfd.h>
#include <unistd.h>

Expand All @@ -19,38 +19,20 @@

namespace psoup {

static bool SetBlockingHelper(intptr_t fd, bool blocking) {
intptr_t status;
status = fcntl(fd, F_GETFL);
if (status < 0) {
perror("fcntl(F_GETFL) failed");
return false;
}
status = blocking ? (status & ~O_NONBLOCK) : (status | O_NONBLOCK);
if (fcntl(fd, F_SETFL, status) < 0) {
perror("fcntl(F_SETFL, O_NONBLOCK) failed");
return false;
}
return true;
}

EPollMessageLoop::EPollMessageLoop(Isolate* isolate)
: MessageLoop(isolate),
mutex_(),
head_(nullptr),
tail_(nullptr),
wakeup_(0) {
int result = pipe(interrupt_fds_);
if (result != 0) {
FATAL("Failed to create pipe");
}
if (!SetBlockingHelper(interrupt_fds_[0], false)) {
FATAL("Failed to set pipe fd non-blocking\n");
event_fd_ = eventfd(0, EFD_CLOEXEC);
if (event_fd_ == -1) {
FATAL("Failed to create eventfd");
}

timer_fd_ = timerfd_create(CLOCK_MONOTONIC, TFD_CLOEXEC);
if (timer_fd_ == -1) {
FATAL("Failed to creater timer_fd");
FATAL("Failed to creater timerfd");
}

epoll_fd_ = epoll_create(64);
Expand All @@ -60,25 +42,24 @@ EPollMessageLoop::EPollMessageLoop(Isolate* isolate)

struct epoll_event event;
event.events = EPOLLIN;
event.data.fd = interrupt_fds_[0];
int status = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fds_[0], &event);
event.data.fd = event_fd_;
int status = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &event);
if (status == -1) {
FATAL("Failed to add pipe to epoll");
FATAL("Failed to add eventfd to epoll");
}

event.events = EPOLLIN;
event.data.fd = timer_fd_;
status = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &event);
if (status == -1) {
FATAL("Failed to add timer_fd to epoll");
FATAL("Failed to add timerfd to epoll");
}
}

EPollMessageLoop::~EPollMessageLoop() {
close(epoll_fd_);
close(timer_fd_);
close(interrupt_fds_[0]);
close(interrupt_fds_[1]);
close(event_fd_);
}

intptr_t EPollMessageLoop::AwaitSignal(intptr_t fd, intptr_t signals) {
Expand Down Expand Up @@ -139,10 +120,10 @@ void EPollMessageLoop::PostMessage(IsolateMessage* message) {
}

void EPollMessageLoop::Notify() {
uword message = 0;
ssize_t written = write(interrupt_fds_[1], &message, sizeof(message));
if (written != sizeof(message)) {
FATAL("Failed to atomically write notify message");
uint64_t value = 1;
ssize_t written = write(event_fd_, &value, sizeof(value));
if (written != sizeof(value)) {
FATAL("Failed to notify");
}
}

Expand All @@ -165,17 +146,18 @@ intptr_t EPollMessageLoop::Run() {
}
} else {
for (int i = 0; i < result; i++) {
if (events[i].data.fd == interrupt_fds_[0]) {
// Interrupt fd.
uword message = 0;
ssize_t red = read(interrupt_fds_[0], &message, sizeof(message));
if (red != sizeof(message)) {
FATAL("Failed to atomically read notify message");
if (events[i].data.fd == event_fd_) {
uint64_t value;
ssize_t red = read(event_fd_, &value, sizeof(value));
if (red != sizeof(value)) {
FATAL("Failed to read eventfd");
}
} else if (events[i].data.fd == timer_fd_) {
int64_t value;
ssize_t ignore = read(timer_fd_, &value, sizeof(value));
(void)ignore;
uint64_t value;
ssize_t red = read(timer_fd_, &value, sizeof(value));
if (red != sizeof(value)) {
FATAL("Failed to read timerfd");
}
DispatchWakeup();
} else {
intptr_t fd = events[i].data.fd;
Expand Down
2 changes: 1 addition & 1 deletion vm/message_loop_epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class EPollMessageLoop : public MessageLoop {
IsolateMessage* head_;
IsolateMessage* tail_;
int64_t wakeup_;
int interrupt_fds_[2];
int event_fd_;
int timer_fd_;
int epoll_fd_;

Expand Down
52 changes: 11 additions & 41 deletions vm/message_loop_kqueue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#include "vm/message_loop.h"

#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <sys/event.h>
#include <sys/time.h>
Expand All @@ -20,52 +19,27 @@

namespace psoup {

static bool SetBlockingHelper(intptr_t fd, bool blocking) {
intptr_t status;
status = fcntl(fd, F_GETFL);
if (status < 0) {
perror("fcntl(F_GETFL) failed");
return false;
}
status = blocking ? (status & ~O_NONBLOCK) : (status | O_NONBLOCK);
if (fcntl(fd, F_SETFL, status) < 0) {
perror("fcntl(F_SETFL, O_NONBLOCK) failed");
return false;
}
return true;
}

KQueueMessageLoop::KQueueMessageLoop(Isolate* isolate)
: MessageLoop(isolate),
mutex_(),
head_(nullptr),
tail_(nullptr),
wakeup_(0) {
int result = pipe(interrupt_fds_);
if (result != 0) {
FATAL("Failed to create pipe");
}
if (!SetBlockingHelper(interrupt_fds_[0], false)) {
FATAL("Failed to set pipe fd non-blocking\n");
}

kqueue_fd_ = kqueue();
if (kqueue_fd_ == -1) {
FATAL("Failed to create kqueue");
}

struct kevent event;
EV_SET(&event, interrupt_fds_[0], EVFILT_READ, EV_ADD, 0, 0, nullptr);
int status = kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr);
if (status == -1) {
FATAL("Failed to add pipe to kqueue");
EV_SET(&event, 0, EVFILT_USER, EV_ADD | EV_ENABLE, 0, 0, nullptr);
int result = kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr);
if (result == -1) {
FATAL("Failed to register notify event");
}
}

KQueueMessageLoop::~KQueueMessageLoop() {
close(kqueue_fd_);
close(interrupt_fds_[0]);
close(interrupt_fds_[1]);
}

intptr_t KQueueMessageLoop::AwaitSignal(intptr_t fd, intptr_t signals) {
Expand Down Expand Up @@ -124,10 +98,11 @@ void KQueueMessageLoop::PostMessage(IsolateMessage* message) {
}

void KQueueMessageLoop::Notify() {
uword message = 0;
ssize_t written = write(interrupt_fds_[1], &message, sizeof(message));
if (written != sizeof(message)) {
FATAL("Failed to atomically write notify message");
struct kevent event;
EV_SET(&event, 0, EVFILT_USER, 0, NOTE_TRIGGER, 0, nullptr);
int result = kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr);
if (result != 0) {
FATAL("Failed to notify");
}
}

Expand Down Expand Up @@ -169,13 +144,8 @@ intptr_t KQueueMessageLoop::Run() {
if ((events[i].flags & EV_ERROR) != 0) {
FATAL("kevent failed\n");
}
if (events[i].udata == nullptr) {
// Interrupt fd.
uword message = 0;
ssize_t red = read(interrupt_fds_[0], &message, sizeof(message));
if (red != sizeof(message)) {
FATAL("Failed to atomically read notify message");
}
if (events[i].filter == EVFILT_USER) {
ASSERT(events[i].udata == nullptr);
} else {
intptr_t fd = events[i].ident;
intptr_t pending = 0;
Expand Down
1 change: 0 additions & 1 deletion vm/message_loop_kqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ class KQueueMessageLoop : public MessageLoop {
IsolateMessage* head_;
IsolateMessage* tail_;
int64_t wakeup_;
int interrupt_fds_[2];
int kqueue_fd_;

DISALLOW_COPY_AND_ASSIGN(KQueueMessageLoop);
Expand Down

0 comments on commit 45e1934

Please sign in to comment.