Skip to content

Commit

Permalink
Extend recording concurrent publications (#1283)
Browse files Browse the repository at this point in the history
* [Java] Add support for initialTermId, termId, termOffset for concurrent publications.

* [Java] Additional test for initially unspecified publications.

* [C] Support initialTermId, termId and termOffset for concurrent publications.

* [C] Explicitly handle wrapping when calculating term count.

* [C] Ensure wrapping works correctly when calculating term count.  Reuse aeron_logbuffer_compute_position to calculate begin position.

* [C] Consolidate computation of term count into a function.  Fix additional case of wrapping on addition.

* [C] Fix missing symbol in debug build.
  • Loading branch information
mikeb01 authored Jan 21, 2022
1 parent 8fc3976 commit f2f11fa
Show file tree
Hide file tree
Showing 24 changed files with 479 additions and 87 deletions.
12 changes: 0 additions & 12 deletions aeron-archive/src/test/java/io/aeron/archive/ArchiveTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@
import io.aeron.test.Tests;
import org.agrona.IoUtil;
import org.agrona.SystemUtil;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.TestWatcher;

import java.io.File;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -143,14 +141,4 @@ public void onResponse(
Tests.await(() -> controlResponseAdapter.poll() != 0, TIMEOUT_NS);
}

public static TestWatcher newWatcher(final long seed)
{
return new TestWatcher()
{
public void testFailed(final ExtensionContext context, final Throwable cause)
{
System.err.println(context.getDisplayName() + " failed with random seed: " + seed);
}
};
}
}
2 changes: 1 addition & 1 deletion aeron-client/src/main/c/aeron_exclusive_publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ void aeron_exclusive_publication_force_close(aeron_exclusive_publication_t *publ
inline void aeron_exclusive_publication_rotate_term(aeron_exclusive_publication_t *publication)
{
const int32_t next_term_id = publication->term_id + 1;
const int32_t next_term_count = next_term_id - publication->initial_term_id;
const int32_t next_term_count = aeron_logbuffer_compute_term_count(next_term_id, publication->initial_term_id);
const size_t next_index = aeron_logbuffer_index_by_term(publication->initial_term_id, next_term_id);

publication->active_partition_index = next_index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ int aeron_logbuffer_check_page_size(uint64_t page_size)
return 0;
}

extern int64_t aeron_logbuffer_compute_term_count(int32_t term_id, int32_t initial_term_id);
extern uint64_t aeron_logbuffer_compute_log_length(uint64_t term_length, uint64_t page_size);
extern int32_t aeron_logbuffer_term_offset(int64_t raw_tail, int32_t term_length);
extern int32_t aeron_logbuffer_term_id(int64_t raw_tail);
Expand Down
18 changes: 11 additions & 7 deletions aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <string.h>
#include "protocol/aeron_udp_protocol.h"
#include "util/aeron_bitutil.h"
#include "util/aeron_math.h"
#include "concurrent/aeron_atomic.h"

#define AERON_LOGBUFFER_PARTITION_COUNT (3)
Expand Down Expand Up @@ -89,14 +90,20 @@ inline int32_t aeron_logbuffer_term_id(int64_t raw_tail)
return (int32_t)(raw_tail >> 32);
}

inline int64_t aeron_logbuffer_compute_term_count(int32_t term_id, int32_t initial_term_id)
{
return aeron_sub_wrap_i32(term_id, initial_term_id);
}

inline size_t aeron_logbuffer_index_by_position(int64_t position, size_t position_bits_to_shift)
{
return (size_t)((position >> position_bits_to_shift) % AERON_LOGBUFFER_PARTITION_COUNT);
}

inline size_t aeron_logbuffer_index_by_term(int32_t initial_term_id, int32_t active_term_id)
{
return (size_t)((active_term_id - initial_term_id) % AERON_LOGBUFFER_PARTITION_COUNT);
int64_t term_count = aeron_logbuffer_compute_term_count(active_term_id, initial_term_id);
return (size_t)(term_count % AERON_LOGBUFFER_PARTITION_COUNT);
}

inline size_t aeron_logbuffer_index_by_term_count(int64_t term_count)
Expand All @@ -107,23 +114,20 @@ inline size_t aeron_logbuffer_index_by_term_count(int64_t term_count)
inline int64_t aeron_logbuffer_compute_position(
int32_t active_term_id, int32_t term_offset, size_t position_bits_to_shift, int32_t initial_term_id)
{
int64_t term_count = active_term_id - initial_term_id;

int64_t term_count = aeron_logbuffer_compute_term_count(active_term_id, initial_term_id);
return (term_count << position_bits_to_shift) + term_offset;
}

inline int64_t aeron_logbuffer_compute_term_begin_position(
int32_t active_term_id, size_t position_bits_to_shift, int32_t initial_term_id)
{
int64_t term_count = active_term_id - initial_term_id;

return (term_count << position_bits_to_shift);
return aeron_logbuffer_compute_position(active_term_id, 0, position_bits_to_shift, initial_term_id);
}

inline int32_t aeron_logbuffer_compute_term_id_from_position(
int64_t position, size_t position_bits_to_shift, int32_t initial_term_id)
{
return (int32_t)(position >> position_bits_to_shift) + initial_term_id;
return aeron_add_wrap_i32((int32_t)(position >> position_bits_to_shift), initial_term_id);
}

inline int32_t aeron_logbuffer_compute_term_offset_from_position(int64_t position, size_t position_bits_to_shift)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1563,9 +1563,14 @@ public Boolean spiesSimulateConnection()
*/
public ChannelUriStringBuilder initialPosition(final long position, final int initialTermId, final int termLength)
{
if (position < 0 || 0 != (position & (FRAME_ALIGNMENT - 1)))
if (position < 0)
{
throw new IllegalArgumentException("invalid position: " + position);
throw new IllegalArgumentException("invalid position=" + position + " < 0");
}
if (0 != (position & (FRAME_ALIGNMENT - 1)))
{
throw new IllegalArgumentException(
"invalid position=" + position + " does not have frame alignment=" + FRAME_ALIGNMENT);
}

final int bitsToShift = LogBufferDescriptor.positionBitsToShift(termLength);
Expand Down
70 changes: 59 additions & 11 deletions aeron-driver/src/main/c/aeron_driver_conductor.c
Original file line number Diff line number Diff line change
Expand Up @@ -729,38 +729,74 @@ static int aeron_driver_conductor_speculate_next_session_id(
int aeron_confirm_publication_match(
const aeron_driver_uri_publication_params_t *params,
const int32_t existing_session_id,
const aeron_logbuffer_metadata_t *logbuffer_metadata)
const aeron_logbuffer_metadata_t *logbuffer_metadata,
const int32_t existing_initial_term_id,
const int32_t existing_term_id,
const size_t existing_term_offset)
{
if (params->has_session_id && params->session_id != existing_session_id)
{
AERON_SET_ERR(
EINVAL,
"existing publication has different session id: existing=%" PRId32 " requested=%" PRId32,
existing_session_id, params->session_id);
"existing publication has different '%s': existing=%" PRId32 " requested=%" PRId32,
AERON_URI_SESSION_ID_KEY, existing_session_id, params->session_id);

return -1;
}

if (params->mtu_length != (size_t)logbuffer_metadata->mtu_length)
if (params->has_mtu_length && params->mtu_length != (size_t)logbuffer_metadata->mtu_length)
{
AERON_SET_ERR(
EINVAL,
"existing publication has different MTU length: existing=%" PRId32 " requested=%" PRIu64,
logbuffer_metadata->mtu_length, (uint64_t)params->mtu_length);
"existing publication has different '%s': existing=%" PRId32 " requested=%" PRIu64,
AERON_URI_MTU_LENGTH_KEY, logbuffer_metadata->mtu_length, (uint64_t)params->mtu_length);

return -1;
}

if (params->term_length != (size_t)logbuffer_metadata->term_length)
if (params->has_term_length && params->term_length != (size_t)logbuffer_metadata->term_length)
{
AERON_SET_ERR(
EINVAL,
"existing publication has different term length: existing=%" PRId32 " requested=%" PRIu64,
logbuffer_metadata->term_length, (uint64_t)params->term_length);
"existing publication has different '%s': existing=%" PRId32 " requested=%" PRIu64,
AERON_URI_TERM_LENGTH_KEY, logbuffer_metadata->term_length, (uint64_t)params->term_length);

return -1;
}

if (params->has_position)
{
if (params->initial_term_id != existing_initial_term_id)
{
AERON_SET_ERR(
EINVAL,
"existing publication has different '%s': existing=%" PRId32 " requested=%" PRId32,
AERON_URI_INITIAL_TERM_ID_KEY, existing_initial_term_id, params->initial_term_id);

return -1;
}

if (params->term_id != existing_term_id)
{
AERON_SET_ERR(
EINVAL,
"existing publication has different '%s': existing=%" PRId32 " requested=%" PRId32,
AERON_URI_TERM_ID_KEY, existing_term_id, params->term_id);

return -1;
}

if (params->term_offset != existing_term_offset)
{
AERON_SET_ERR(
EINVAL,
"existing publication has different '%s': existing=%" PRId64 " requested=%" PRIu64,
AERON_URI_TERM_OFFSET_KEY, (uint64_t)existing_term_offset, (uint64_t)params->term_offset);

return -1;
}
}

return 0;
}

Expand Down Expand Up @@ -1371,7 +1407,13 @@ aeron_ipc_publication_t *aeron_driver_conductor_get_or_add_ipc_publication(

if (!is_exclusive && NULL != publication)
{
if (aeron_confirm_publication_match(params, publication->session_id, publication->log_meta_data) < 0)
if (aeron_confirm_publication_match(
params,
publication->session_id,
publication->log_meta_data,
publication->initial_term_id,
publication->starting_term_id,
publication->starting_term_offset) < 0)
{
return NULL;
}
Expand Down Expand Up @@ -1547,7 +1589,13 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati
return NULL;
}

if (0 != aeron_confirm_publication_match(params, publication->session_id, publication->log_meta_data))
if (0 != aeron_confirm_publication_match(
params,
publication->session_id,
publication->log_meta_data,
publication->initial_term_id,
publication->starting_term_id,
publication->starting_term_offset))
{
return NULL;
}
Expand Down
4 changes: 3 additions & 1 deletion aeron-driver/src/main/c/aeron_ipc_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ int aeron_ipc_publication_create(
if (params->has_position)
{
int64_t term_id = params->term_id;
int32_t term_count = params->term_id - initial_term_id;
int32_t term_count = aeron_logbuffer_compute_term_count(params->term_id, initial_term_id);
size_t active_index = aeron_logbuffer_index_by_term_count(term_count);

_pub->log_meta_data->term_tail_counters[active_index] =
Expand Down Expand Up @@ -163,6 +163,8 @@ int aeron_ipc_publication_create(
_pub->pub_pos_position.counter_id = pub_pos_position->counter_id;
_pub->pub_pos_position.value_addr = pub_pos_position->value_addr;
_pub->initial_term_id = initial_term_id;
_pub->starting_term_id = params->has_position ? params->term_id : initial_term_id;
_pub->starting_term_offset = params->has_position ? params->term_offset : 0;
_pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length);
_pub->term_window_length = (int64_t)aeron_producer_window_length(
context->ipc_publication_window_length, params->term_length);
Expand Down
2 changes: 2 additions & 0 deletions aeron-driver/src/main/c/aeron_ipc_publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ typedef struct aeron_ipc_publication_stct
int32_t session_id;
int32_t stream_id;
int32_t initial_term_id;
int32_t starting_term_id;
size_t starting_term_offset;
size_t log_file_name_length;
size_t position_bits_to_shift;
bool is_exclusive;
Expand Down
2 changes: 1 addition & 1 deletion aeron-driver/src/main/c/aeron_loss_detector.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ int32_t aeron_loss_detector_scan(
const int32_t rebuild_term_count = (int32_t)(rebuild_position >> position_bits_to_shift);
const int32_t hwm_term_count = (int32_t)(hwm_position >> position_bits_to_shift);

const int32_t rebuild_term_id = initial_term_id + rebuild_term_count;
const int32_t rebuild_term_id = aeron_add_wrap_i32(initial_term_id, rebuild_term_count);
const int32_t hwm_term_offset = (int32_t)(hwm_position & term_length_mask);
const int32_t limit_offset = rebuild_term_count == hwm_term_count ?
hwm_term_offset : (int32_t)(term_length_mask + 1);
Expand Down
4 changes: 3 additions & 1 deletion aeron-driver/src/main/c/aeron_network_publication.c
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ int aeron_network_publication_create(
if (params->has_position)
{
int64_t term_id = params->term_id;
int32_t term_count = params->term_id - initial_term_id;
int32_t term_count = aeron_logbuffer_compute_term_count(params->term_id, initial_term_id);
size_t active_index = aeron_logbuffer_index_by_term_count(term_count);

_pub->log_meta_data->term_tail_counters[active_index] =
Expand Down Expand Up @@ -197,6 +197,8 @@ int aeron_network_publication_create(
_pub->snd_bpe_counter.value_addr = snd_bpe_counter->value_addr;
_pub->tag = params->entity_tag;
_pub->initial_term_id = initial_term_id;
_pub->starting_term_id = params->has_position ? params->term_id : initial_term_id;
_pub->starting_term_offset = params->has_position ? params->term_offset : 0;
_pub->term_buffer_length = _pub->log_meta_data->term_length;
_pub->term_length_mask = (int32_t)params->term_length - 1;
_pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length);
Expand Down
2 changes: 2 additions & 0 deletions aeron-driver/src/main/c/aeron_network_publication.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ typedef struct aeron_network_publication_stct
int32_t session_id;
int32_t stream_id;
int32_t initial_term_id;
int32_t starting_term_id;
int32_t term_length_mask;
size_t starting_term_offset;
size_t log_file_name_length;
size_t position_bits_to_shift;
size_t mtu_length;
Expand Down
14 changes: 4 additions & 10 deletions aeron-driver/src/main/c/uri/aeron_driver_uri.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ int aeron_uri_get_term_length_param(aeron_uri_params_t *uri_params, aeron_driver
}

params->term_length = value;
params->has_term_length = true;
}

return 0;
Expand All @@ -67,6 +68,7 @@ int aeron_uri_get_mtu_length_param(aeron_uri_params_t *uri_params, aeron_driver_
}

params->mtu_length = value;
params->has_mtu_length = true;
}

return 0;
Expand Down Expand Up @@ -158,7 +160,9 @@ int aeron_diver_uri_publication_params(

params->linger_timeout_ns = context->publication_linger_timeout_ns;
params->term_length = AERON_URI_IPC == uri->type ? context->ipc_term_buffer_length : context->term_buffer_length;
params->has_term_length = false;
params->mtu_length = AERON_URI_IPC == uri->type ? context->ipc_mtu_length : context->mtu_length;
params->has_mtu_length = false;
params->initial_term_id = 0;
params->term_offset = 0;
params->term_id = 0;
Expand Down Expand Up @@ -235,16 +239,6 @@ int aeron_diver_uri_publication_params(
{
char *end_ptr = NULL;

if (!is_exclusive)
{
AERON_SET_ERR(
EINVAL,
"params: %s %s %s are not supported for concurrent publications",
AERON_URI_INITIAL_TERM_ID_KEY,
AERON_URI_TERM_ID_KEY,
AERON_URI_TERM_OFFSET_KEY);
return -1;
}
if (count < 3)
{
AERON_SET_ERR(
Expand Down
2 changes: 2 additions & 0 deletions aeron-driver/src/main/c/uri/aeron_driver_uri.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ typedef struct aeron_driver_uri_publication_params_stct
bool is_sparse;
bool signal_eos;
bool spies_simulate_connection;
bool has_mtu_length;
size_t mtu_length;
bool has_term_length;
size_t term_length;
size_t term_offset;
int32_t initial_term_id;
Expand Down
24 changes: 21 additions & 3 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,16 @@ void onAddNetworkPublication(
}
else
{
confirmMatch(channelUri, params, publication.rawLog(), publication.sessionId(), publication.channel());
confirmMatch(
channelUri,
params,
publication.rawLog(),
publication.sessionId(),
publication.channel(),
publication.initialTermId(),
publication.startingTermId(),
publication.startingTermOffset());

validateSpiesSimulateConnection(
params, publication.spiesSimulateConnection(), channel, publication.channel());
}
Expand Down Expand Up @@ -674,7 +683,15 @@ void onAddIpcPublication(
}
else
{
confirmMatch(channelUri, params, publication.rawLog(), publication.sessionId(), publication.channel());
confirmMatch(
channelUri,
params,
publication.rawLog(),
publication.sessionId(),
publication.channel(),
publication.initialTermId(),
publication.startingTermId(),
publication.startingTermOffset());
}

publicationLinks.add(new PublicationLink(correlationId, getOrAddClient(clientId), publication));
Expand Down Expand Up @@ -1797,7 +1814,8 @@ private IpcPublication addIpcPublication(
publisherLimit,
rawLog,
Configuration.producerWindowLength(params.termLength, ctx.ipcPublicationTermWindowLength()),
isExclusive);
isExclusive,
params);

ipcPublications.add(publication);
activeSessionSet.add(new SessionKey(sessionId, streamId, IPC_MEDIA));
Expand Down
Loading

0 comments on commit f2f11fa

Please sign in to comment.