Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send websocket message in multiple fragments when needed. #2355

Merged
merged 2 commits into from
Sep 15, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 50 additions & 40 deletions transports/janus_websockets.c
Original file line number Diff line number Diff line change
Expand Up @@ -165,9 +165,9 @@ typedef struct janus_websockets_client {
GAsyncQueue *messages; /* Queue of outgoing messages to push */
char *incoming; /* Buffer containing the incoming message to process (in case there are fragments) */
unsigned char *buffer; /* Buffer containing the message to send */
int buflen; /* Length of the buffer (may be resized after re-allocations) */
int bufpending; /* Data an interrupted previous write couldn't send */
int bufoffset; /* Offset from where the interrupted previous write should resume */
size_t buflen; /* Length of the buffer (may be resized after re-allocations) */
size_t bufpending; /* Data an interrupted previous write couldn't send */
size_t bufoffset; /* Offset from where the interrupted previous write should resume */
volatile gint destroyed; /* Whether this libwebsockets client instance has been closed */
janus_transport_session *ts; /* Janus core-transport session */
} janus_websockets_client;
Expand Down Expand Up @@ -1057,6 +1057,9 @@ static int janus_websockets_callback_https(
return janus_websockets_callback_http(wsi, reason, user, in, len);
}

/* Use ~ 2xMTU as chunk size */
#define MESSAGE_CHUNK_SIZE 2800

/* This callback handles Janus API requests */
static int janus_websockets_common_callback(
struct lws *wsi,
Expand Down Expand Up @@ -1184,7 +1187,7 @@ static int janus_websockets_common_callback(
/* Check if Websockets send pipe is choked */
if(lws_send_pipe_choked(wsi)) {
if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0) {
JANUS_LOG(LOG_WARN, "Websockets choked with buffer: %d, trying again\n", ws_client->bufpending);
JANUS_LOG(LOG_WARN, "Websockets choked with buffer: %zu, trying again\n", ws_client->bufpending);
lws_callback_on_writable(wsi);
} else {
gint qlen = g_async_queue_length(ws_client->messages);
Expand All @@ -1198,56 +1201,63 @@ static int janus_websockets_common_callback(
}

/* Check if we have a pending/partial write to complete first */
if(ws_client->buffer && ws_client->bufpending > 0 && ws_client->bufoffset > 0
&& !g_atomic_int_get(&ws_client->destroyed) && !g_atomic_int_get(&stopping)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These atomic checks should not be removed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added again the checks in the OR form to let the function early return.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lminiero Hi,May I asy why these atomic checks should not be removed ?
It had checked before of this code branch, isn't ?
What if ws_client->destroyed is not volicate int rather than int ?
if so, Shoud it checked here ?

JANUS_LOG(LOG_HUGE, "[%s-%p] Completing pending WebSocket write (still need to write last %d bytes)...\n",
if(ws_client->bufpending > 0 && ws_client->bufoffset > 0) {
JANUS_LOG(LOG_HUGE, "[%s-%p] Completing pending WebSocket write (still need to write last %zu bytes)...\n",
log_prefix, wsi, ws_client->bufpending);
int sent = lws_write(wsi, ws_client->buffer + ws_client->bufoffset, ws_client->bufpending, LWS_WRITE_TEXT);
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Sent %d/%d bytes\n", log_prefix, wsi, sent, ws_client->bufpending);
if(sent > -1 && sent < ws_client->bufpending) {
/* We still couldn't send everything that was left, we'll try and complete this in the next round */
ws_client->bufpending -= sent;
ws_client->bufoffset += sent;
} else {
/* Clear the pending/partial write queue */
ws_client->bufpending = 0;
ws_client->bufoffset = 0;
}
/* Done for this round, check the next response/notification later */
lws_callback_on_writable(wsi);
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
/* Shoot all the pending messages */
char *response = g_async_queue_try_pop(ws_client->messages);
if(response && !g_atomic_int_get(&ws_client->destroyed) && !g_atomic_int_get(&stopping)) {
else {
atoppi marked this conversation as resolved.
Show resolved Hide resolved
/* Shoot all the pending messages */
char* response = g_async_queue_try_pop(ws_client->messages);
atoppi marked this conversation as resolved.
Show resolved Hide resolved
if (!response) {
/* No messages found */
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
/* Gotcha! */
int buflen = LWS_PRE + strlen(response);
JANUS_LOG(LOG_HUGE, "[%s-%p] Sending WebSocket message (%zu bytes)...\n", log_prefix, wsi, strlen(response));
size_t buflen = LWS_PRE + strlen(response);
if (buflen > ws_client->buflen) {
/* We need a larger shared buffer */
JANUS_LOG(LOG_HUGE, "[%s-%p] Re-allocating to %d bytes (was %d, response is %zu bytes)\n", log_prefix, wsi, buflen, ws_client->buflen, strlen(response));
JANUS_LOG(LOG_HUGE, "[%s-%p] Re-allocating to %zu bytes (was %zu, response is %zu bytes)\n", log_prefix, wsi, buflen, ws_client->buflen, strlen(response));
ws_client->buflen = buflen;
ws_client->buffer = g_realloc(ws_client->buffer, buflen);
}
memcpy(ws_client->buffer + LWS_PRE, response, strlen(response));
JANUS_LOG(LOG_HUGE, "[%s-%p] Sending WebSocket message (%zu bytes)...\n", log_prefix, wsi, strlen(response));
int sent = lws_write(wsi, ws_client->buffer + LWS_PRE, strlen(response), LWS_WRITE_TEXT);
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Sent %d/%zu bytes\n", log_prefix, wsi, sent, strlen(response));
if(sent > -1 && sent < (int)strlen(response)) {
/* Initialize pending bytes count and buffer offset */
ws_client->bufpending = strlen(response);
ws_client->bufoffset = LWS_PRE;
/* We can get rid of the message */
free(response);
}

/* Evaluate amount of data to send according to MESSAGE_CHUNK_SIZE */
int amount = ws_client->bufpending <= MESSAGE_CHUNK_SIZE ? ws_client->bufpending : MESSAGE_CHUNK_SIZE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: why not use size_t for amount (to avoid casting later) as it seems it's never needed as an int.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also a more descriptive name like: amount_to_send or bytes_to_send might be preferable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly I've used the same name (amount) and type (int) present in the lws example.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@atoppi sure but in their case they don't need to do the casts. It's a minor point but cleaner IMO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If amount type is set to size_t the following simple test will fail

#include <stdio.h>
#include <inttypes.h>
#include <stdlib.h>
#include <limits.h>

int main(int argc, char *argv[]) {	
    size_t amount = 2400;
    /* Simulate error in lws_write */
    int sent = -1;
    if (sent < amount) {
        printf("sent < amount !\n");
        return 0;
    }
    return -1;
}

Any cast in the comparison if (sent < amount) might have a problem:

  • if we do (size_t)sent, we are casting a signed to unsigned
  • if we do (int)amount, we are casting to a lower (at least on my machine) size type (8 vs 4)

Am I missing something ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So in that example, since you are comparing to signed you should be using ssize_t for amount, but AFAICT in your patch you were only ever comparing to unsigned.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here I am comparing sent (type int, return value of lws_write) with amount.
According to lws docs about lws_write:

Return may be -1 for a fatal error needing connection close

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right so ssize_t would make sense then.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I personally don't see the point: lws_write returns int, not ssize_t, and comparing size_t and ssize_t would result in a comparison between values with different sizes anyway. Using int for that part seems reasonable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah didn't want to derail the discussion here as we know these values are capped pretty low anyway.

/* Set fragment flags */
int flags = lws_write_ws_flags(LWS_WRITE_TEXT, ws_client->bufoffset <= LWS_PRE, ws_client->bufpending <= (size_t)amount);
atoppi marked this conversation as resolved.
Show resolved Hide resolved
/* Send the fragment with proper flags */
int sent = lws_write(wsi, ws_client->buffer + ws_client->bufoffset, (size_t)amount, flags);
JANUS_LOG(LOG_HUGE, "[%s-%p] -- First=%d, Last=%d, Requested=%d bytes, Sent=%d bytes, Missing=%zu bytes\n", log_prefix, wsi, ws_client->bufoffset <= LWS_PRE, ws_client->bufpending <= (size_t)amount, amount, sent, ws_client->bufpending - amount);
if(sent < amount) {
/* Error on sending, abort operation */
JANUS_LOG(LOG_ERR, "Websocket sent only %d bytes (expected %d)\n", sent, amount);
ws_client->bufpending = 0;
ws_client->bufoffset = 0;
}
else {
atoppi marked this conversation as resolved.
Show resolved Hide resolved
/* Fragment successfully sent, update status */
ws_client->bufpending -= amount;
ws_client->bufoffset += amount;
if(ws_client->bufpending > 0) {
/* We still couldn't send everything that was left, we'll try and complete this in the next round */
/* We couldn't send everything in a single write, we'll complete this in the next round */
atoppi marked this conversation as resolved.
Show resolved Hide resolved
ws_client->bufpending = strlen(response) - sent;
ws_client->bufoffset = LWS_PRE + sent;
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Couldn't write all bytes (%d missing), setting offset %d\n",
JANUS_LOG(LOG_HUGE, "[%s-%p] -- Couldn't write all bytes (%zu missing), setting offset %zu\n",
log_prefix, wsi, ws_client->bufpending, ws_client->bufoffset);
}
/* We can get rid of the message */
free(response);
/* Done for this round, check the next response/notification later */
lws_callback_on_writable(wsi);
janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
/* Done for this round, check the next response/notification later */
lws_callback_on_writable(wsi);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be calling lws_callback_on_writable even when we successfully sent the last fragment, which shouldn't happen (it will perform one more useless loop).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have preserved the previous behavior (https://github.com/meetecho/janus-gateway/blob/master/transports/janus_websockets.c#L1246).
A quick test showed some issues when sending asyncronous messages when the writable call gets moved from that position.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ack!

janus_mutex_unlock(&ws_client->ts->mutex);
return 0;
}
return 0;
}
Expand Down