Skip to content

Commit

Permalink
TLS: Implement support for write barrier.
Browse files Browse the repository at this point in the history
  • Loading branch information
oranagra authored and yossigo committed Oct 7, 2019
1 parent 5a47794 commit 6b62948
Show file tree
Hide file tree
Showing 11 changed files with 136 additions and 31 deletions.
10 changes: 3 additions & 7 deletions TLS.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,22 +57,18 @@ Connections
Connection abstraction API is mostly done and seems to hold well for hiding
implementation details between TLS and TCP.

1. Still need to implement the equivalent of AE_BARRIER. Because TLS
socket-level read/write events don't correspond to logical operations, this
should probably be done at the Read/Write handler level.

2. Multi-threading I/O is not supported. The main issue to address is the need
1. Multi-threading I/O is not supported. The main issue to address is the need
to manipulate AE based on OpenSSL return codes. We can either propagate this
out of the thread, or explore ways of further optimizing MT I/O by having
event loops that live inside the thread and borrow connections in/out.

3. Finish cleaning up the implementation. Make sure all error cases are handled
2. Finish cleaning up the implementation. Make sure all error cases are handled
and reflected into connection state, connection state validated before
certain operations, etc.
- Clean (non-errno) interface to report would-block.
- Consistent error reporting.

4. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
3. Sync IO for TLS is currently implemented in a hackish way, i.e. making the
socket blocking and configuring socket-level timeout. This means the timeout
value may not be so accurate, and there would be a lot of syscall overhead.
However I believe that getting rid of syncio completely in favor of pure
Expand Down
4 changes: 4 additions & 0 deletions src/aof.c
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,10 @@ void flushAppendOnlyFile(int force) {
* there is much to do about the whole server stopping for power problems
* or alike */

if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
usleep(server.aof_flush_sleep);
}

latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf));
latencyEndMonitor(latency);
Expand Down
2 changes: 1 addition & 1 deletion src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -2277,7 +2277,7 @@ void clusterReadHandler(connection *conn) {
* from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
if (sdslen(link->sndbuf) == 0 && msglen != 0)
connSetWriteHandler(link->conn, clusterWriteHandler); /* TODO: Handle AE_BARRIER in conns */
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);

link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);

Expand Down
32 changes: 29 additions & 3 deletions src/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,10 +194,14 @@ static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_hand
/* Register a write handler, to be called when the connection is writable.
* If NULL, the existing handler is removed.
*/
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
if (func == conn->write_handler) return C_OK;

conn->write_handler = func;
if (barrier)
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
if (!conn->write_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
else
Expand Down Expand Up @@ -247,13 +251,35 @@ static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientD
conn->conn_handler = NULL;
}

/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if WRITE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;

/* Handle normal I/O flows */
if ((mask & AE_READABLE) && conn->read_handler) {
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
if ((mask & AE_WRITABLE) && conn->write_handler) {
/* Fire the writable event. */
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}

static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
Expand Down
14 changes: 12 additions & 2 deletions src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ typedef enum {

#define CONN_FLAG_IN_HANDLER (1<<0) /* A handler execution is in progress */
#define CONN_FLAG_CLOSE_SCHEDULED (1<<1) /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER (1<<2) /* Write barrier requested */

typedef void (*ConnectionCallbackFunc)(struct connection *conn);

Expand All @@ -57,7 +58,7 @@ typedef struct ConnectionType {
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
Expand Down Expand Up @@ -144,7 +145,7 @@ static inline int connRead(connection *conn, void *buf, size_t buf_len) {
* If NULL, the existing handler is removed.
*/
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
return conn->type->set_write_handler(conn, func);
return conn->type->set_write_handler(conn, func, 0);
}

/* Register a read handler, to be called when the connection is readable.
Expand All @@ -154,6 +155,15 @@ static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc fu
return conn->type->set_read_handler(conn, func);
}

/* Set a write handler, and possibly enable a write barrier, this flag is
* cleared when write handler is changed or removed.
* With barroer enabled, we never fire the event if the read handler already
* fired in the same event loop iteration. Useful when you want to persist
* things to disk before sending replies, and want to do that in a group fashion. */
static inline int connSetWriteHandlerWithBarrier(connection *conn, ConnectionCallbackFunc func, int barrier) {
return conn->type->set_write_handler(conn, func, barrier);
}

static inline void connClose(connection *conn) {
conn->type->close(conn);
}
Expand Down
6 changes: 6 additions & 0 deletions src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ void debugCommand(client *c) {
"SDSLEN <key> -- Show low level SDS string info representing key and value.",
"SEGFAULT -- Crash the server with sigsegv.",
"SET-ACTIVE-EXPIRE <0|1> -- Setting it to 0 disables expiring keys in background when they are not accessed (otherwise the Redis behavior). Setting it to 1 reenables back the default.",
"AOF-FLUSH-SLEEP <microsec> -- Server will sleep before flushing the AOF, this is used for testing",
"SLEEP <seconds> -- Stop the server for <seconds>. Decimals allowed.",
"STRUCTSIZE -- Return the size of different Redis core C structures.",
"ZIPLIST <key> -- Show low level info about the ziplist encoding.",
Expand Down Expand Up @@ -595,6 +596,11 @@ NULL
{
server.active_expire_enabled = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"aof-flush-sleep") &&
c->argc == 3)
{
server.aof_flush_sleep = atoi(c->argv[2]->ptr);
addReply(c,shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr,"lua-always-replicate-commands") &&
c->argc == 3)
{
Expand Down
9 changes: 4 additions & 5 deletions src/networking.c
Original file line number Diff line number Diff line change
Expand Up @@ -1305,19 +1305,18 @@ int handleClientsWithPendingWrites(void) {
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. AE_BARRIER ensures that. */
* actual fsync of AOF to disk. the write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
ae_barrier = 1;
}
/* TODO: Handle write barriers in connection (also see tlsProcessPendingData) */
if (connSetWriteHandler(c->conn, sendReplyToClient) == C_ERR) {
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
Expand Down
11 changes: 6 additions & 5 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);

/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */
tlsProcessPendingData();
/* If tls still has pending unread data don't sleep at all. */
aeDontWait(server.el, tlsHasPendingData());

/* Call the Redis Cluster before sleep function. Note that this function
* may change the state of Redis Cluster (from ok to fail or vice versa),
* so it's a good idea to call it before serving the unblocked clients
Expand Down Expand Up @@ -2093,11 +2098,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();

/* TODO: How do i handle write barriers flag */
tlsProcessPendingData();
/* If tls already has pending unread data don't sleep at all. */
aeDontWait(server.el, tlsHasPendingData());

/* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue();

Expand Down Expand Up @@ -2286,6 +2286,7 @@ void initServerConfig(void) {
server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE;
server.aof_rewrite_base_size = 0;
server.aof_rewrite_scheduled = 0;
server.aof_flush_sleep = 0;
server.aof_last_fsync = time(NULL);
server.aof_rewrite_time_last = -1;
server.aof_rewrite_time_start = -1;
Expand Down
1 change: 1 addition & 0 deletions src/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -1192,6 +1192,7 @@ struct redisServer {
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */
Expand Down
47 changes: 39 additions & 8 deletions src/tls.c
Original file line number Diff line number Diff line change
Expand Up @@ -361,18 +361,47 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
conn->c.conn_handler = NULL;
break;
case CONN_STATE_CONNECTED:
if ((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ)) {
{
int call_read = ((mask & AE_READABLE) && conn->c.read_handler) ||
((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE));
int call_write = ((mask & AE_WRITABLE) && conn->c.write_handler) ||
((mask & AE_READABLE) && (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ));

/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if WRITE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = conn->c.flags & CONN_FLAG_WRITE_BARRIER;

if (!invert && call_read) {
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
}

/* Fire the writable event. */
if (call_write) {
conn->flags &= ~TLS_CONN_FLAG_WRITE_WANT_READ;
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
}

if ((mask & AE_WRITABLE) && (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE)) {
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
conn->flags &= ~TLS_CONN_FLAG_READ_WANT_WRITE;
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
}

if ((mask & AE_READABLE) && conn->c.read_handler) {
if (!callHandler((connection *) conn, conn->c.read_handler)) return;
/* If SSL has pending that, already read from the socket, we're at
* risk of not calling the read handler again, make sure to add it
* to a list of pending connection that should be handled anyway. */
if ((mask & AE_READABLE)) {
if (SSL_has_pending(conn->ssl)) {
if (!conn->pending_list_node) {
listAddNodeTail(pending_list, conn);
Expand All @@ -384,10 +413,8 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
}
}

if ((mask & AE_WRITABLE) && conn->c.write_handler) {
if (!callHandler((connection *) conn, conn->c.write_handler)) return;
}
break;
}
default:
break;
}
Expand Down Expand Up @@ -535,8 +562,12 @@ static const char *connTLSGetLastError(connection *conn_) {
return NULL;
}

int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
int connTLSSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
conn->write_handler = func;
if (barrier)
conn->flags |= CONN_FLAG_WRITE_BARRIER;
else
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
updateSSLEvent((tls_connection *) conn);
return C_OK;
}
Expand Down
31 changes: 31 additions & 0 deletions tests/integration/aof.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,35 @@ tags {"aof"} {
r expire x -1
}
}

start_server {overrides {appendonly {yes} appendfilename {appendonly.aof} appendfsync always}} {
test {AOF fsync always barrier issue} {
set rd [redis_deferring_client]
# Set a sleep when aof is flushed, so that we have a chance to look
# at the aof size and detect if the response of an incr command
# arrives before the data was written (and hopefully fsynced)
# We create a big reply, which will hopefully not have room in the
# socket buffers, and will install a write handler, then we sleep
# a big and issue the incr command, hoping that the last portion of
# the output buffer write, and the processing of the incr will happen
# in the same event loop cycle.
# Since the socket buffers and timing are unpredictable, we fuzz this
# test with slightly different sizes and sleeps a few times.
for {set i 0} {$i < 10} {incr i} {
r debug aof-flush-sleep 0
r del x
r setrange x [expr {int(rand()*5000000)+10000000}] x
r debug aof-flush-sleep 500000
set aof [file join [lindex [r config get dir] 1] appendonly.aof]
set size1 [file size $aof]
$rd get x
after [expr {int(rand()*30)}]
$rd incr new_value
$rd read
$rd read
set size2 [file size $aof]
assert {$size1 != $size2}
}
}
}
}

0 comments on commit 6b62948

Please sign in to comment.