Skip to content

Commit

Permalink
[C] Hoist check to receiver so it is applied only once.
Browse files Browse the repository at this point in the history
  • Loading branch information
mikeb01 committed Jan 21, 2022
1 parent 66fc409 commit 8fc3976
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 129 deletions.
42 changes: 23 additions & 19 deletions aeron-driver/src/main/c/aeron_driver_receiver.c
Original file line number Diff line number Diff line change
Expand Up @@ -168,32 +168,36 @@ int aeron_driver_receiver_do_work(void *clientd)
{
aeron_publication_image_t *image = receiver->images.array[i].image;

int send_sm_result = aeron_publication_image_send_pending_status_message(image, now_ns);
if (send_sm_result < 0)
if (NULL != image->endpoint && AERON_PUBLICATION_IMAGE_STATE_ACTIVE == image->conductor_fields.state)
{
AERON_APPEND_ERR("%s", "receiver send SM");
aeron_driver_receiver_log_error(receiver);
}
int send_sm_result = aeron_publication_image_send_pending_status_message(image, now_ns);
if (send_sm_result < 0)
{
AERON_APPEND_ERR("%s", "receiver send SM");
aeron_driver_receiver_log_error(receiver);
}

work_count += send_sm_result < 0 ? 0 : send_sm_result;
work_count += send_sm_result < 0 ? 0 : send_sm_result;

int send_nak_result = aeron_publication_image_send_pending_loss(image);
if (send_nak_result < 0)
{
AERON_APPEND_ERR("%s", "receiver send NAK");
aeron_driver_receiver_log_error(receiver);
}
int send_nak_result = aeron_publication_image_send_pending_loss(image);
if (send_nak_result < 0)
{
AERON_APPEND_ERR("%s", "receiver send NAK");
aeron_driver_receiver_log_error(receiver);
}

work_count += send_nak_result < 0 ? 0 : send_nak_result;
work_count += send_nak_result < 0 ? 0 : send_nak_result;

int initiate_rttm_result = aeron_publication_image_initiate_rttm(image, now_ns);
if (send_nak_result < 0)
{
AERON_APPEND_ERR("%s", "receiver send RTTM");
aeron_driver_receiver_log_error(receiver);
int initiate_rttm_result = aeron_publication_image_initiate_rttm(image, now_ns);
if (send_nak_result < 0)
{
AERON_APPEND_ERR("%s", "receiver send RTTM");
aeron_driver_receiver_log_error(receiver);
}

work_count += initiate_rttm_result < 0 ? 0 : initiate_rttm_result;
}

work_count += initiate_rttm_result < 0 ? 0 : initiate_rttm_result;
}

for (int last_index = (int)receiver->pending_setups.length - 1, i = last_index; i >= 0; i--)
Expand Down
212 changes: 102 additions & 110 deletions aeron-driver/src/main/c/aeron_publication_image.c
Original file line number Diff line number Diff line change
Expand Up @@ -545,129 +545,124 @@ int aeron_publication_image_send_pending_status_message(aeron_publication_image_
{
int work_count = 0;

if (NULL != image->endpoint && AERON_PUBLICATION_IMAGE_STATE_ACTIVE == image->conductor_fields.state)
int64_t change_number;
AERON_GET_VOLATILE(change_number, image->end_sm_change);

if (change_number != image->last_sm_change_number ||
(now_ns > (image->time_of_last_sm_ns + image->sm_timeout_ns)))
{
int64_t change_number;
AERON_GET_VOLATILE(change_number, image->end_sm_change);
const int64_t sm_position = image->next_sm_position;
const int32_t receiver_window_length = image->next_sm_receiver_window_length;

if (change_number != image->last_sm_change_number ||
(now_ns > (image->time_of_last_sm_ns + image->sm_timeout_ns)))
{
const int64_t sm_position = image->next_sm_position;
const int32_t receiver_window_length = image->next_sm_receiver_window_length;
aeron_acquire();

aeron_acquire();
if (change_number == image->begin_sm_change)
{
const int32_t term_id = aeron_logbuffer_compute_term_id_from_position(
sm_position, image->position_bits_to_shift, image->initial_term_id);
const int32_t term_offset = (int32_t)(sm_position & image->term_length_mask);

if (change_number == image->begin_sm_change)
for (size_t i = 0, len = image->connections.length; i < len; i++)
{
const int32_t term_id = aeron_logbuffer_compute_term_id_from_position(
sm_position, image->position_bits_to_shift, image->initial_term_id);
const int32_t term_offset = (int32_t)(sm_position & image->term_length_mask);
aeron_publication_image_connection_t *connection = &image->connections.array[i];

for (size_t i = 0, len = image->connections.length; i < len; i++)
if (aeron_publication_image_connection_is_alive(connection, now_ns))
{
aeron_publication_image_connection_t *connection = &image->connections.array[i];
int send_sm_result = aeron_receive_channel_endpoint_send_sm(
image->endpoint,
connection->control_addr,
image->stream_id,
image->session_id,
term_id,
term_offset,
receiver_window_length,
0);

if (aeron_publication_image_connection_is_alive(connection, now_ns))
if (send_sm_result < 0)
{
int send_sm_result = aeron_receive_channel_endpoint_send_sm(
image->endpoint,
connection->control_addr,
image->stream_id,
image->session_id,
term_id,
term_offset,
receiver_window_length,
0);

if (send_sm_result < 0)
{
work_count = send_sm_result;
break;
}

work_count++;
aeron_counter_ordered_increment(image->status_messages_sent_counter, 1);
work_count = send_sm_result;
break;
}

work_count++;
aeron_counter_ordered_increment(image->status_messages_sent_counter, 1);
}
}

image->last_sm_position = sm_position;
image->last_overrun_threshold = sm_position + image->max_receiver_window_length;
image->last_sm_change_number = change_number;
image->time_of_last_sm_ns = now_ns;
image->last_sm_position = sm_position;
image->last_overrun_threshold = sm_position + image->max_receiver_window_length;
image->last_sm_change_number = change_number;
image->time_of_last_sm_ns = now_ns;

aeron_update_active_transport_count(image, now_ns);
}
aeron_update_active_transport_count(image, now_ns);
}
}


return work_count;
}

int aeron_publication_image_send_pending_loss(aeron_publication_image_t *image)
{
int work_count = 0;

if (NULL != image->endpoint && AERON_PUBLICATION_IMAGE_STATE_ACTIVE == image->conductor_fields.state)
{
int64_t change_number;
AERON_GET_VOLATILE(change_number, image->end_loss_change);
int64_t change_number;
AERON_GET_VOLATILE(change_number, image->end_loss_change);

if (change_number != image->last_loss_change_number)
{
const int32_t term_id = image->loss_term_id;
const int32_t term_offset = image->loss_term_offset;
const int32_t length = (int32_t)image->loss_length;
if (change_number != image->last_loss_change_number)
{
const int32_t term_id = image->loss_term_id;
const int32_t term_offset = image->loss_term_offset;
const int32_t length = (int32_t)image->loss_length;

aeron_acquire();
aeron_acquire();

if (change_number == image->begin_loss_change)
if (change_number == image->begin_loss_change)
{
if (image->conductor_fields.is_reliable)
{
if (image->conductor_fields.is_reliable)
const int64_t now_ns = aeron_clock_cached_nano_time(image->cached_clock);

for (size_t i = 0, len = image->connections.length; i < len; i++)
{
const int64_t now_ns = aeron_clock_cached_nano_time(image->cached_clock);
aeron_publication_image_connection_t *connection = &image->connections.array[i];

for (size_t i = 0, len = image->connections.length; i < len; i++)
if (aeron_publication_image_connection_is_alive(connection, now_ns))
{
aeron_publication_image_connection_t *connection = &image->connections.array[i];
int send_nak_result = aeron_receive_channel_endpoint_send_nak(
image->endpoint,
connection->control_addr,
image->stream_id,
image->session_id,
term_id,
term_offset,
length);

if (aeron_publication_image_connection_is_alive(connection, now_ns))
if (send_nak_result < 0)
{
int send_nak_result = aeron_receive_channel_endpoint_send_nak(
image->endpoint,
connection->control_addr,
image->stream_id,
image->session_id,
term_id,
term_offset,
length);

if (send_nak_result < 0)
{
work_count = send_nak_result;
break;
}

work_count++;
aeron_counter_ordered_increment(image->nak_messages_sent_counter, 1);
work_count = send_nak_result;
break;
}
}
}
else
{
const size_t index = aeron_logbuffer_index_by_term(image->initial_term_id, term_id);
uint8_t *buffer = image->mapped_raw_log.term_buffers[index].addr;

if (aeron_term_gap_filler_try_fill_gap(image->log_meta_data, buffer, term_id, term_offset, length))
{
aeron_counter_ordered_increment(image->loss_gap_fills_counter, 1);
work_count++;
aeron_counter_ordered_increment(image->nak_messages_sent_counter, 1);
}
}
}
else
{
const size_t index = aeron_logbuffer_index_by_term(image->initial_term_id, term_id);
uint8_t *buffer = image->mapped_raw_log.term_buffers[index].addr;

work_count = 1;
if (aeron_term_gap_filler_try_fill_gap(image->log_meta_data, buffer, term_id, term_offset, length))
{
aeron_counter_ordered_increment(image->loss_gap_fills_counter, 1);
}

image->last_loss_change_number = change_number;
work_count = 1;
}

image->last_loss_change_number = change_number;
}
}

Expand All @@ -678,37 +673,34 @@ int aeron_publication_image_initiate_rttm(aeron_publication_image_t *image, int6
{
int work_count = 0;

if (NULL != image->endpoint && AERON_PUBLICATION_IMAGE_STATE_ACTIVE == image->conductor_fields.state)
if (image->congestion_control->should_measure_rtt(image->congestion_control->state, now_ns))
{
if (image->congestion_control->should_measure_rtt(image->congestion_control->state, now_ns))
for (size_t i = 0, len = image->connections.length; i < len; i++)
{
for (size_t i = 0, len = image->connections.length; i < len; i++)
{
aeron_publication_image_connection_t *connection = &image->connections.array[i];
aeron_publication_image_connection_t *connection = &image->connections.array[i];

if (aeron_publication_image_connection_is_alive(connection, now_ns))
if (aeron_publication_image_connection_is_alive(connection, now_ns))
{
int send_rttm_result = aeron_receive_channel_endpoint_send_rttm(
image->endpoint,
connection->control_addr,
image->stream_id,
image->session_id,
now_ns,
0,
true);

if (send_rttm_result < 0)
{
int send_rttm_result = aeron_receive_channel_endpoint_send_rttm(
image->endpoint,
connection->control_addr,
image->stream_id,
image->session_id,
now_ns,
0,
true);

if (send_rttm_result < 0)
{
work_count = send_rttm_result;
break;
}
else
{
image->congestion_control->on_rttm_sent(image->congestion_control->state, now_ns);
}

work_count++;
work_count = send_rttm_result;
break;
}
else
{
image->congestion_control->on_rttm_sent(image->congestion_control->state, now_ns);
}

work_count++;
}
}
}
Expand Down

0 comments on commit 8fc3976

Please sign in to comment.