diff --git a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTests.java b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTests.java index 6ac0912b91..fcebae6656 100644 --- a/aeron-archive/src/test/java/io/aeron/archive/ArchiveTests.java +++ b/aeron-archive/src/test/java/io/aeron/archive/ArchiveTests.java @@ -21,8 +21,6 @@ import io.aeron.test.Tests; import org.agrona.IoUtil; import org.agrona.SystemUtil; -import org.junit.jupiter.api.extension.ExtensionContext; -import org.junit.jupiter.api.extension.TestWatcher; import java.io.File; import java.util.concurrent.TimeUnit; @@ -143,14 +141,4 @@ public void onResponse( Tests.await(() -> controlResponseAdapter.poll() != 0, TIMEOUT_NS); } - public static TestWatcher newWatcher(final long seed) - { - return new TestWatcher() - { - public void testFailed(final ExtensionContext context, final Throwable cause) - { - System.err.println(context.getDisplayName() + " failed with random seed: " + seed); - } - }; - } } diff --git a/aeron-client/src/main/c/aeron_exclusive_publication.h b/aeron-client/src/main/c/aeron_exclusive_publication.h index cc0dbdd1a0..8d7a285c17 100644 --- a/aeron-client/src/main/c/aeron_exclusive_publication.h +++ b/aeron-client/src/main/c/aeron_exclusive_publication.h @@ -82,7 +82,7 @@ void aeron_exclusive_publication_force_close(aeron_exclusive_publication_t *publ inline void aeron_exclusive_publication_rotate_term(aeron_exclusive_publication_t *publication) { const int32_t next_term_id = publication->term_id + 1; - const int32_t next_term_count = next_term_id - publication->initial_term_id; + const int32_t next_term_count = aeron_logbuffer_compute_term_count(next_term_id, publication->initial_term_id); const size_t next_index = aeron_logbuffer_index_by_term(publication->initial_term_id, next_term_id); publication->active_partition_index = next_index; diff --git a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c index 38ed33896c..512024cd6f 100644 --- a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c +++ b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.c @@ -83,6 +83,7 @@ int aeron_logbuffer_check_page_size(uint64_t page_size) return 0; } +extern int64_t aeron_logbuffer_compute_term_count(int32_t term_id, int32_t initial_term_id); extern uint64_t aeron_logbuffer_compute_log_length(uint64_t term_length, uint64_t page_size); extern int32_t aeron_logbuffer_term_offset(int64_t raw_tail, int32_t term_length); extern int32_t aeron_logbuffer_term_id(int64_t raw_tail); diff --git a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h index 96a78f9895..a5d82ecc06 100644 --- a/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h +++ b/aeron-client/src/main/c/concurrent/aeron_logbuffer_descriptor.h @@ -21,6 +21,7 @@ #include #include "protocol/aeron_udp_protocol.h" #include "util/aeron_bitutil.h" +#include "util/aeron_math.h" #include "concurrent/aeron_atomic.h" #define AERON_LOGBUFFER_PARTITION_COUNT (3) @@ -89,6 +90,11 @@ inline int32_t aeron_logbuffer_term_id(int64_t raw_tail) return (int32_t)(raw_tail >> 32); } +inline int64_t aeron_logbuffer_compute_term_count(int32_t term_id, int32_t initial_term_id) +{ + return aeron_sub_wrap_i32(term_id, initial_term_id); +} + inline size_t aeron_logbuffer_index_by_position(int64_t position, size_t position_bits_to_shift) { return (size_t)((position >> position_bits_to_shift) % AERON_LOGBUFFER_PARTITION_COUNT); @@ -96,7 +102,8 @@ inline size_t aeron_logbuffer_index_by_position(int64_t position, size_t positio inline size_t aeron_logbuffer_index_by_term(int32_t initial_term_id, int32_t active_term_id) { - return (size_t)((active_term_id - initial_term_id) % AERON_LOGBUFFER_PARTITION_COUNT); + int64_t term_count = aeron_logbuffer_compute_term_count(active_term_id, initial_term_id); + return (size_t)(term_count % AERON_LOGBUFFER_PARTITION_COUNT); } inline size_t aeron_logbuffer_index_by_term_count(int64_t term_count) @@ -107,23 +114,20 @@ inline size_t aeron_logbuffer_index_by_term_count(int64_t term_count) inline int64_t aeron_logbuffer_compute_position( int32_t active_term_id, int32_t term_offset, size_t position_bits_to_shift, int32_t initial_term_id) { - int64_t term_count = active_term_id - initial_term_id; - + int64_t term_count = aeron_logbuffer_compute_term_count(active_term_id, initial_term_id); return (term_count << position_bits_to_shift) + term_offset; } inline int64_t aeron_logbuffer_compute_term_begin_position( int32_t active_term_id, size_t position_bits_to_shift, int32_t initial_term_id) { - int64_t term_count = active_term_id - initial_term_id; - - return (term_count << position_bits_to_shift); + return aeron_logbuffer_compute_position(active_term_id, 0, position_bits_to_shift, initial_term_id); } inline int32_t aeron_logbuffer_compute_term_id_from_position( int64_t position, size_t position_bits_to_shift, int32_t initial_term_id) { - return (int32_t)(position >> position_bits_to_shift) + initial_term_id; + return aeron_add_wrap_i32((int32_t)(position >> position_bits_to_shift), initial_term_id); } inline int32_t aeron_logbuffer_compute_term_offset_from_position(int64_t position, size_t position_bits_to_shift) diff --git a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java index d3b1fa49ad..2b50a36787 100644 --- a/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java +++ b/aeron-client/src/main/java/io/aeron/ChannelUriStringBuilder.java @@ -1563,9 +1563,14 @@ public Boolean spiesSimulateConnection() */ public ChannelUriStringBuilder initialPosition(final long position, final int initialTermId, final int termLength) { - if (position < 0 || 0 != (position & (FRAME_ALIGNMENT - 1))) + if (position < 0) { - throw new IllegalArgumentException("invalid position: " + position); + throw new IllegalArgumentException("invalid position=" + position + " < 0"); + } + if (0 != (position & (FRAME_ALIGNMENT - 1))) + { + throw new IllegalArgumentException( + "invalid position=" + position + " does not have frame alignment=" + FRAME_ALIGNMENT); } final int bitsToShift = LogBufferDescriptor.positionBitsToShift(termLength); diff --git a/aeron-driver/src/main/c/aeron_driver_conductor.c b/aeron-driver/src/main/c/aeron_driver_conductor.c index 0ca6ed8dab..04058841db 100644 --- a/aeron-driver/src/main/c/aeron_driver_conductor.c +++ b/aeron-driver/src/main/c/aeron_driver_conductor.c @@ -729,38 +729,74 @@ static int aeron_driver_conductor_speculate_next_session_id( int aeron_confirm_publication_match( const aeron_driver_uri_publication_params_t *params, const int32_t existing_session_id, - const aeron_logbuffer_metadata_t *logbuffer_metadata) + const aeron_logbuffer_metadata_t *logbuffer_metadata, + const int32_t existing_initial_term_id, + const int32_t existing_term_id, + const size_t existing_term_offset) { if (params->has_session_id && params->session_id != existing_session_id) { AERON_SET_ERR( EINVAL, - "existing publication has different session id: existing=%" PRId32 " requested=%" PRId32, - existing_session_id, params->session_id); + "existing publication has different '%s': existing=%" PRId32 " requested=%" PRId32, + AERON_URI_SESSION_ID_KEY, existing_session_id, params->session_id); return -1; } - if (params->mtu_length != (size_t)logbuffer_metadata->mtu_length) + if (params->has_mtu_length && params->mtu_length != (size_t)logbuffer_metadata->mtu_length) { AERON_SET_ERR( EINVAL, - "existing publication has different MTU length: existing=%" PRId32 " requested=%" PRIu64, - logbuffer_metadata->mtu_length, (uint64_t)params->mtu_length); + "existing publication has different '%s': existing=%" PRId32 " requested=%" PRIu64, + AERON_URI_MTU_LENGTH_KEY, logbuffer_metadata->mtu_length, (uint64_t)params->mtu_length); return -1; } - if (params->term_length != (size_t)logbuffer_metadata->term_length) + if (params->has_term_length && params->term_length != (size_t)logbuffer_metadata->term_length) { AERON_SET_ERR( EINVAL, - "existing publication has different term length: existing=%" PRId32 " requested=%" PRIu64, - logbuffer_metadata->term_length, (uint64_t)params->term_length); + "existing publication has different '%s': existing=%" PRId32 " requested=%" PRIu64, + AERON_URI_TERM_LENGTH_KEY, logbuffer_metadata->term_length, (uint64_t)params->term_length); return -1; } + if (params->has_position) + { + if (params->initial_term_id != existing_initial_term_id) + { + AERON_SET_ERR( + EINVAL, + "existing publication has different '%s': existing=%" PRId32 " requested=%" PRId32, + AERON_URI_INITIAL_TERM_ID_KEY, existing_initial_term_id, params->initial_term_id); + + return -1; + } + + if (params->term_id != existing_term_id) + { + AERON_SET_ERR( + EINVAL, + "existing publication has different '%s': existing=%" PRId32 " requested=%" PRId32, + AERON_URI_TERM_ID_KEY, existing_term_id, params->term_id); + + return -1; + } + + if (params->term_offset != existing_term_offset) + { + AERON_SET_ERR( + EINVAL, + "existing publication has different '%s': existing=%" PRId64 " requested=%" PRIu64, + AERON_URI_TERM_OFFSET_KEY, (uint64_t)existing_term_offset, (uint64_t)params->term_offset); + + return -1; + } + } + return 0; } @@ -1371,7 +1407,13 @@ aeron_ipc_publication_t *aeron_driver_conductor_get_or_add_ipc_publication( if (!is_exclusive && NULL != publication) { - if (aeron_confirm_publication_match(params, publication->session_id, publication->log_meta_data) < 0) + if (aeron_confirm_publication_match( + params, + publication->session_id, + publication->log_meta_data, + publication->initial_term_id, + publication->starting_term_id, + publication->starting_term_offset) < 0) { return NULL; } @@ -1547,7 +1589,13 @@ aeron_network_publication_t *aeron_driver_conductor_get_or_add_network_publicati return NULL; } - if (0 != aeron_confirm_publication_match(params, publication->session_id, publication->log_meta_data)) + if (0 != aeron_confirm_publication_match( + params, + publication->session_id, + publication->log_meta_data, + publication->initial_term_id, + publication->starting_term_id, + publication->starting_term_offset)) { return NULL; } diff --git a/aeron-driver/src/main/c/aeron_ipc_publication.c b/aeron-driver/src/main/c/aeron_ipc_publication.c index f13c3c0cae..bb23ff72c4 100644 --- a/aeron-driver/src/main/c/aeron_ipc_publication.c +++ b/aeron-driver/src/main/c/aeron_ipc_publication.c @@ -101,7 +101,7 @@ int aeron_ipc_publication_create( if (params->has_position) { int64_t term_id = params->term_id; - int32_t term_count = params->term_id - initial_term_id; + int32_t term_count = aeron_logbuffer_compute_term_count(params->term_id, initial_term_id); size_t active_index = aeron_logbuffer_index_by_term_count(term_count); _pub->log_meta_data->term_tail_counters[active_index] = @@ -163,6 +163,8 @@ int aeron_ipc_publication_create( _pub->pub_pos_position.counter_id = pub_pos_position->counter_id; _pub->pub_pos_position.value_addr = pub_pos_position->value_addr; _pub->initial_term_id = initial_term_id; + _pub->starting_term_id = params->has_position ? params->term_id : initial_term_id; + _pub->starting_term_offset = params->has_position ? params->term_offset : 0; _pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length); _pub->term_window_length = (int64_t)aeron_producer_window_length( context->ipc_publication_window_length, params->term_length); diff --git a/aeron-driver/src/main/c/aeron_ipc_publication.h b/aeron-driver/src/main/c/aeron_ipc_publication.h index 0dd535142b..cf4001707c 100644 --- a/aeron-driver/src/main/c/aeron_ipc_publication.h +++ b/aeron-driver/src/main/c/aeron_ipc_publication.h @@ -65,6 +65,8 @@ typedef struct aeron_ipc_publication_stct int32_t session_id; int32_t stream_id; int32_t initial_term_id; + int32_t starting_term_id; + size_t starting_term_offset; size_t log_file_name_length; size_t position_bits_to_shift; bool is_exclusive; diff --git a/aeron-driver/src/main/c/aeron_loss_detector.c b/aeron-driver/src/main/c/aeron_loss_detector.c index 9b06bafd6b..f75a6237d6 100755 --- a/aeron-driver/src/main/c/aeron_loss_detector.c +++ b/aeron-driver/src/main/c/aeron_loss_detector.c @@ -60,7 +60,7 @@ int32_t aeron_loss_detector_scan( const int32_t rebuild_term_count = (int32_t)(rebuild_position >> position_bits_to_shift); const int32_t hwm_term_count = (int32_t)(hwm_position >> position_bits_to_shift); - const int32_t rebuild_term_id = initial_term_id + rebuild_term_count; + const int32_t rebuild_term_id = aeron_add_wrap_i32(initial_term_id, rebuild_term_count); const int32_t hwm_term_offset = (int32_t)(hwm_position & term_length_mask); const int32_t limit_offset = rebuild_term_count == hwm_term_count ? hwm_term_offset : (int32_t)(term_length_mask + 1); diff --git a/aeron-driver/src/main/c/aeron_network_publication.c b/aeron-driver/src/main/c/aeron_network_publication.c index 2af571b0f3..32b761c1d9 100644 --- a/aeron-driver/src/main/c/aeron_network_publication.c +++ b/aeron-driver/src/main/c/aeron_network_publication.c @@ -122,7 +122,7 @@ int aeron_network_publication_create( if (params->has_position) { int64_t term_id = params->term_id; - int32_t term_count = params->term_id - initial_term_id; + int32_t term_count = aeron_logbuffer_compute_term_count(params->term_id, initial_term_id); size_t active_index = aeron_logbuffer_index_by_term_count(term_count); _pub->log_meta_data->term_tail_counters[active_index] = @@ -197,6 +197,8 @@ int aeron_network_publication_create( _pub->snd_bpe_counter.value_addr = snd_bpe_counter->value_addr; _pub->tag = params->entity_tag; _pub->initial_term_id = initial_term_id; + _pub->starting_term_id = params->has_position ? params->term_id : initial_term_id; + _pub->starting_term_offset = params->has_position ? params->term_offset : 0; _pub->term_buffer_length = _pub->log_meta_data->term_length; _pub->term_length_mask = (int32_t)params->term_length - 1; _pub->position_bits_to_shift = (size_t)aeron_number_of_trailing_zeroes((int32_t)params->term_length); diff --git a/aeron-driver/src/main/c/aeron_network_publication.h b/aeron-driver/src/main/c/aeron_network_publication.h index c1af6a5786..bd8dde8661 100644 --- a/aeron-driver/src/main/c/aeron_network_publication.h +++ b/aeron-driver/src/main/c/aeron_network_publication.h @@ -85,7 +85,9 @@ typedef struct aeron_network_publication_stct int32_t session_id; int32_t stream_id; int32_t initial_term_id; + int32_t starting_term_id; int32_t term_length_mask; + size_t starting_term_offset; size_t log_file_name_length; size_t position_bits_to_shift; size_t mtu_length; diff --git a/aeron-driver/src/main/c/uri/aeron_driver_uri.c b/aeron-driver/src/main/c/uri/aeron_driver_uri.c index 488f543175..11207df5c9 100644 --- a/aeron-driver/src/main/c/uri/aeron_driver_uri.c +++ b/aeron-driver/src/main/c/uri/aeron_driver_uri.c @@ -42,6 +42,7 @@ int aeron_uri_get_term_length_param(aeron_uri_params_t *uri_params, aeron_driver } params->term_length = value; + params->has_term_length = true; } return 0; @@ -67,6 +68,7 @@ int aeron_uri_get_mtu_length_param(aeron_uri_params_t *uri_params, aeron_driver_ } params->mtu_length = value; + params->has_mtu_length = true; } return 0; @@ -158,7 +160,9 @@ int aeron_diver_uri_publication_params( params->linger_timeout_ns = context->publication_linger_timeout_ns; params->term_length = AERON_URI_IPC == uri->type ? context->ipc_term_buffer_length : context->term_buffer_length; + params->has_term_length = false; params->mtu_length = AERON_URI_IPC == uri->type ? context->ipc_mtu_length : context->mtu_length; + params->has_mtu_length = false; params->initial_term_id = 0; params->term_offset = 0; params->term_id = 0; @@ -235,16 +239,6 @@ int aeron_diver_uri_publication_params( { char *end_ptr = NULL; - if (!is_exclusive) - { - AERON_SET_ERR( - EINVAL, - "params: %s %s %s are not supported for concurrent publications", - AERON_URI_INITIAL_TERM_ID_KEY, - AERON_URI_TERM_ID_KEY, - AERON_URI_TERM_OFFSET_KEY); - return -1; - } if (count < 3) { AERON_SET_ERR( diff --git a/aeron-driver/src/main/c/uri/aeron_driver_uri.h b/aeron-driver/src/main/c/uri/aeron_driver_uri.h index 37b62a32b5..20efb0ccee 100644 --- a/aeron-driver/src/main/c/uri/aeron_driver_uri.h +++ b/aeron-driver/src/main/c/uri/aeron_driver_uri.h @@ -27,7 +27,9 @@ typedef struct aeron_driver_uri_publication_params_stct bool is_sparse; bool signal_eos; bool spies_simulate_connection; + bool has_mtu_length; size_t mtu_length; + bool has_term_length; size_t term_length; size_t term_offset; int32_t initial_term_id; diff --git a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java index 292e0c0d3e..e9ac1289eb 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java +++ b/aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java @@ -476,7 +476,16 @@ void onAddNetworkPublication( } else { - confirmMatch(channelUri, params, publication.rawLog(), publication.sessionId(), publication.channel()); + confirmMatch( + channelUri, + params, + publication.rawLog(), + publication.sessionId(), + publication.channel(), + publication.initialTermId(), + publication.startingTermId(), + publication.startingTermOffset()); + validateSpiesSimulateConnection( params, publication.spiesSimulateConnection(), channel, publication.channel()); } @@ -674,7 +683,15 @@ void onAddIpcPublication( } else { - confirmMatch(channelUri, params, publication.rawLog(), publication.sessionId(), publication.channel()); + confirmMatch( + channelUri, + params, + publication.rawLog(), + publication.sessionId(), + publication.channel(), + publication.initialTermId(), + publication.startingTermId(), + publication.startingTermOffset()); } publicationLinks.add(new PublicationLink(correlationId, getOrAddClient(clientId), publication)); @@ -1797,7 +1814,8 @@ private IpcPublication addIpcPublication( publisherLimit, rawLog, Configuration.producerWindowLength(params.termLength, ctx.ipcPublicationTermWindowLength()), - isExclusive); + isExclusive, + params); ipcPublications.add(publication); activeSessionSet.add(new SessionKey(sessionId, streamId, IPC_MEDIA)); diff --git a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java index c1ed60f6b4..d0addc3880 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/IpcPublication.java @@ -59,6 +59,8 @@ enum State private final int termWindowLength; private final int positionBitsToShift; private final int initialTermId; + private final int startingTermId; + private final int startingTermOffset; private final ErrorHandler errorHandler; private long tripLimit; private long consumerPosition; @@ -89,7 +91,8 @@ enum State final Position publisherLimit, final RawLog rawLog, final int termWindowLength, - final boolean isExclusive) + final boolean isExclusive, + final PublicationParams params) { this.registrationId = registrationId; this.channel = channel; @@ -98,7 +101,9 @@ enum State this.streamId = streamId; this.isExclusive = isExclusive; this.termBuffers = rawLog.termBuffers(); - this.initialTermId = initialTermId(rawLog.metaData()); + this.initialTermId = LogBufferDescriptor.initialTermId(rawLog.metaData()); + this.startingTermId = params.hasPosition ? params.termId : initialTermId; + this.startingTermOffset = params.hasPosition ? params.termOffset : 0; this.errorHandler = ctx.errorHandler(); final int termLength = rawLog.termLength(); @@ -166,6 +171,21 @@ boolean isExclusive() return isExclusive; } + int initialTermId() + { + return initialTermId; + } + + int startingTermId() + { + return startingTermId; + } + + int startingTermOffset() + { + return startingTermOffset; + } + RawLog rawLog() { return rawLog; diff --git a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java index fde55dbcd4..648181e93d 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java +++ b/aeron-driver/src/main/java/io/aeron/driver/NetworkPublication.java @@ -115,6 +115,8 @@ enum State private final long tag; private final int positionBitsToShift; private final int initialTermId; + private final int startingTermId; + private final int startingTermOffset; private final int termBufferLength; private final int termLengthMask; private final int mtuLength; @@ -200,6 +202,8 @@ enum State this.spiesSimulateConnection = params.spiesSimulateConnection; this.signalEos = params.signalEos; this.isExclusive = isExclusive; + this.startingTermId = params.hasPosition ? params.termId : initialTermId; + this.startingTermOffset = params.hasPosition ? params.termOffset : 0; metaDataBuffer = rawLog.metaData(); setupBuffer = threadLocals.setupBuffer(); @@ -568,6 +572,21 @@ boolean spiesSimulateConnection() return spiesSimulateConnection; } + int initialTermId() + { + return initialTermId; + } + + int startingTermId() + { + return startingTermId; + } + + int startingTermOffset() + { + return startingTermOffset; + } + boolean isAcceptingSubscriptions() { return State.ACTIVE == state || diff --git a/aeron-driver/src/main/java/io/aeron/driver/PublicationParams.java b/aeron-driver/src/main/java/io/aeron/driver/PublicationParams.java index 08ec317fa8..2dc5c86e03 100644 --- a/aeron-driver/src/main/java/io/aeron/driver/PublicationParams.java +++ b/aeron-driver/src/main/java/io/aeron/driver/PublicationParams.java @@ -76,12 +76,6 @@ static PublicationParams getPublicationParams( if (count > 0) { - if (!isExclusive) - { - throw new IllegalArgumentException("params: " + INITIAL_TERM_ID_PARAM_NAME + " " + TERM_ID_PARAM_NAME + - " " + TERM_OFFSET_PARAM_NAME + " are not supported for concurrent publications: channel=" + - channelUri); - } if (count < 3) { throw new IllegalArgumentException("params must be used as a complete set: " + @@ -206,33 +200,87 @@ static void validateMtuLength( } } + private static String formatMatchError( + final String paramName, + final String existingValue, + final String newValue, + final String existingChannelUri, + final String newChannelUri) + { + return "existing publication has different '" + paramName + "': existing=" + + existingValue + " requested=" + newValue + " existingChannel=" + existingChannelUri + + " channel=" + newChannelUri; + } + static void confirmMatch( final ChannelUri channelUri, final PublicationParams params, final RawLog rawLog, final int existingSessionId, - final String existingChannel) + final String existingChannel, + final int existingInitialTermId, + final int existingTermId, + final int existingTermOffset) { final int mtuLength = LogBufferDescriptor.mtuLength(rawLog.metaData()); if (channelUri.containsKey(MTU_LENGTH_PARAM_NAME) && mtuLength != params.mtuLength) { - throw new IllegalStateException("existing publication has different MTU length: existing=" + - mtuLength + " requested=" + params.mtuLength + " existingChannel=" + existingChannel + - " channel=" + channelUri); + throw new IllegalStateException(formatMatchError( + MTU_LENGTH_PARAM_NAME, + String.valueOf(mtuLength), + String.valueOf(params.mtuLength), + existingChannel, + channelUri.toString())); } if (channelUri.containsKey(TERM_LENGTH_PARAM_NAME) && rawLog.termLength() != params.termLength) { - throw new IllegalStateException("existing publication has different term length: existing=" + - rawLog.termLength() + " requested=" + params.termLength + " existingChannel=" + existingChannel + - " channel=" + channelUri); + throw new IllegalStateException(formatMatchError( + TERM_LENGTH_PARAM_NAME, + String.valueOf(rawLog.termLength()), + String.valueOf(params.termLength), + existingChannel, + channelUri.toString())); } if (channelUri.containsKey(SESSION_ID_PARAM_NAME) && params.sessionId != existingSessionId) { - throw new IllegalStateException("existing publication has different session id: existing=" + - existingSessionId + " requested=" + params.sessionId + " existingChannel=" + existingChannel + - " channel=" + channelUri); + throw new IllegalStateException(formatMatchError( + SESSION_ID_PARAM_NAME, + String.valueOf(existingSessionId), + String.valueOf(params.sessionId), + existingChannel, + channelUri.toString())); + } + + if (channelUri.containsKey(INITIAL_TERM_ID_PARAM_NAME) && params.initialTermId != existingInitialTermId) + { + throw new IllegalStateException(formatMatchError( + INITIAL_TERM_ID_PARAM_NAME, + String.valueOf(existingInitialTermId), + String.valueOf(params.initialTermId), + existingChannel, + channelUri.toString())); + } + + if (channelUri.containsKey(TERM_ID_PARAM_NAME) && params.termId != existingTermId) + { + throw new IllegalStateException(formatMatchError( + TERM_ID_PARAM_NAME, + String.valueOf(existingTermId), + String.valueOf(params.termId), + existingChannel, + channelUri.toString())); + } + + if (channelUri.containsKey(TERM_OFFSET_PARAM_NAME) && params.termOffset != existingTermOffset) + { + throw new IllegalStateException(formatMatchError( + TERM_OFFSET_PARAM_NAME, + String.valueOf(existingTermOffset), + String.valueOf(params.termOffset), + existingChannel, + channelUri.toString())); } } diff --git a/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java b/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java index 2cf867832f..ef9a792eeb 100644 --- a/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java +++ b/aeron-driver/src/test/java/io/aeron/driver/UntetheredSubscriptionTest.java @@ -66,7 +66,8 @@ public void before() publisherLimit, rawLog, TERM_WINDOW_LENGTH, - true); + true, + new PublicationParams()); } @Test diff --git a/aeron-system-tests/src/test/java/io/aeron/SpecifiedPositionPublicationTest.java b/aeron-system-tests/src/test/java/io/aeron/SpecifiedPositionPublicationTest.java index 5a4b29f5dd..39d0132783 100644 --- a/aeron-system-tests/src/test/java/io/aeron/SpecifiedPositionPublicationTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/SpecifiedPositionPublicationTest.java @@ -18,41 +18,217 @@ import io.aeron.driver.MediaDriver; import io.aeron.driver.ThreadingMode; import io.aeron.exceptions.RegistrationException; +import io.aeron.logbuffer.FragmentHandler; +import io.aeron.logbuffer.FrameDescriptor; import io.aeron.logbuffer.LogBufferDescriptor; +import io.aeron.protocol.DataHeaderFlyweight; +import io.aeron.test.InterruptAfter; +import io.aeron.test.RandomWatcher; import io.aeron.test.SystemTestWatcher; +import io.aeron.test.Tests; import io.aeron.test.driver.TestMediaDriver; -import org.agrona.ErrorHandler; -import org.junit.jupiter.api.Test; +import org.agrona.DirectBuffer; +import org.agrona.concurrent.UnsafeBuffer; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import java.util.function.Function; + +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.Mockito.mock; class SpecifiedPositionPublicationTest { @RegisterExtension final SystemTestWatcher testWatcher = new SystemTestWatcher(); + @RegisterExtension + final RandomWatcher randomWatcher = new RandomWatcher(); - @Test - void shouldRejectSpecifiedPositionForConcurrentPublications() + @InterruptAfter(5) + @ParameterizedTest + @CsvSource({ + CommonContext.IPC_CHANNEL + ",true", + "aeron:udp?endpoint=localhost:24325,true", + CommonContext.IPC_CHANNEL + ",false", + "aeron:udp?endpoint=localhost:24325,false" + }) + void shouldStartAtSpecifiedPositionForPublications(final String initialUri, final boolean exclusive) { - final ErrorHandler mockErrorHandler = mock(ErrorHandler.class); final MediaDriver.Context context = new MediaDriver.Context() - .errorHandler(mockErrorHandler) .dirDeleteOnStart(true) .ipcPublicationTermWindowLength(LogBufferDescriptor.TERM_MIN_LENGTH) .threadingMode(ThreadingMode.SHARED); + final DirectBuffer msg = new UnsafeBuffer(new byte[64]); + + final int termLength = 1 << 16; + final int initialTermId = randomWatcher.random().nextInt(); + final int activeTermId = initialTermId + randomWatcher.random().nextInt(Integer.MAX_VALUE); + final int positionBitsToShift = LogBufferDescriptor.positionBitsToShift(termLength); + final int termOffset = randomWatcher.random().nextInt(termLength) & -FrameDescriptor.FRAME_ALIGNMENT; + final long startPosition = LogBufferDescriptor.computePosition( + activeTermId, + termOffset, + positionBitsToShift, + initialTermId); + final long nextPosition = startPosition + DataHeaderFlyweight.HEADER_LENGTH + msg.capacity(); + + final String channel = new ChannelUriStringBuilder(initialUri) + .initialPosition(startPosition, initialTermId, termLength) + .build(); + + final int streamId = 1001; + final Function publicationSupplier = exclusive ? + (a) -> a.addExclusivePublication(channel, streamId) : (a) -> a.addPublication(channel, streamId); try ( TestMediaDriver mediaDriver = TestMediaDriver.launch(context, testWatcher); - Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName()))) + Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName())); + Subscription subscription = aeron.addSubscription(initialUri, streamId); + Publication publication = publicationSupplier.apply(aeron)) + { + Tests.awaitConnected(subscription); + Tests.awaitConnected(publication); + + assertEquals(startPosition, publication.position()); + + Tests.await(() -> publication.offer(msg) > 0); + assertEquals(nextPosition, publication.position()); + + final FragmentHandler fragmentHandler = + (buffer, offset, length, header) -> assertEquals(nextPosition, header.position()); + + Tests.await(() -> subscription.poll(fragmentHandler, 1) == 1); + } + finally { - final String channel = new ChannelUriStringBuilder() - .media("ipc") - .initialPosition(1024, -873648623, 65536) + context.deleteDirectory(); + } + } + + @InterruptAfter(5) + @ParameterizedTest + @CsvSource({ + CommonContext.IPC_CHANNEL, + "aeron:udp?endpoint=localhost:24325" + }) + void shouldValidateSpecifiedPositionForConcurrentPublications(final String initialUri) + { + final MediaDriver.Context context = new MediaDriver.Context() + .dirDeleteOnStart(true) + .ipcPublicationTermWindowLength(LogBufferDescriptor.TERM_MIN_LENGTH) + .threadingMode(ThreadingMode.SHARED); + final DirectBuffer msg = new UnsafeBuffer(new byte[64]); + + final int termLength = 1 << 16; + final int initialTermId = randomWatcher.random().nextInt(); + final int activeTermId = initialTermId + randomWatcher.random().nextInt(Integer.MAX_VALUE); + final int positionBitsToShift = LogBufferDescriptor.positionBitsToShift(termLength); + final int termOffset = randomWatcher.random().nextInt(termLength) & -FrameDescriptor.FRAME_ALIGNMENT; + final long startPosition = LogBufferDescriptor.computePosition( + activeTermId, + termOffset, + positionBitsToShift, + initialTermId); + final long positionMsg1 = startPosition + DataHeaderFlyweight.HEADER_LENGTH + msg.capacity(); + final long positionMsg2 = positionMsg1 + DataHeaderFlyweight.HEADER_LENGTH + msg.capacity(); + final long positionMsg3 = positionMsg2 + DataHeaderFlyweight.HEADER_LENGTH + msg.capacity(); + + final String channel = new ChannelUriStringBuilder(initialUri) + .initialPosition(startPosition, initialTermId, termLength) + .build(); + + final String invalidPositionUri = new ChannelUriStringBuilder(initialUri) + .initialPosition(startPosition + FrameDescriptor.FRAME_ALIGNMENT, initialTermId, termLength) + .build(); + final String invalidInitialTermIdUri = new ChannelUriStringBuilder(initialUri) + .initialPosition(startPosition, initialTermId + 1, termLength) + .build(); + final String invalidTermLengthUri = new ChannelUriStringBuilder(initialUri) + .initialPosition(startPosition, initialTermId, termLength << 1) + .build(); + + final int streamId = 1001; + + try ( + TestMediaDriver mediaDriver = TestMediaDriver.launch(context, testWatcher); + Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName())); + Subscription subscription = aeron.addSubscription(initialUri, streamId); + Publication publication = aeron.addPublication(channel, streamId)) + { + Tests.awaitConnected(subscription); + Tests.awaitConnected(publication); + + assertEquals(startPosition, publication.position()); + + Tests.await(() -> publication.offer(msg) > 0); + assertEquals(positionMsg1, publication.position()); + + Tests.await(() -> subscription.poll((buffer, offset, length, header) -> {}, 1) == 1); + + try (Publication publication2 = aeron.addPublication(channel, streamId)) + { + assertEquals(positionMsg1, publication2.position()); + Tests.await(() -> publication.offer(msg) > 0); + assertEquals(positionMsg2, publication2.position()); + final FragmentHandler fragmentHandler = + (buffer, offset, length, header) -> assertEquals(positionMsg2, header.position()); + Tests.await(() -> subscription.poll(fragmentHandler, 1) == 1); + } + + try (Publication publication3 = aeron.addPublication(initialUri, streamId)) + { + assertEquals(positionMsg2, publication3.position()); + Tests.await(() -> publication.offer(msg) > 0); + assertEquals(positionMsg3, publication3.position()); + final FragmentHandler fragmentHandler = + (buffer, offset, length, header) -> assertEquals(positionMsg3, header.position()); + Tests.await(() -> subscription.poll(fragmentHandler, 1) == 1); + } + + assertThrows(RegistrationException.class, () -> aeron.addPublication(invalidPositionUri, streamId)); + assertThrows(RegistrationException.class, () -> aeron.addPublication(invalidInitialTermIdUri, streamId)); + assertThrows(RegistrationException.class, () -> aeron.addPublication(invalidTermLengthUri, streamId)); + } + finally + { + context.deleteDirectory(); + } + } + + @InterruptAfter(5) + @ParameterizedTest + @CsvSource({ + CommonContext.IPC_CHANNEL, + "aeron:udp?endpoint=localhost:24325" + }) + void shouldValidateSpecifiedPositionForConcurrentPublicationsInitiallyUnspecified(final String initialUri) + { + final MediaDriver.Context context = new MediaDriver.Context() + .dirDeleteOnStart(true) + .ipcPublicationTermWindowLength(LogBufferDescriptor.TERM_MIN_LENGTH) + .threadingMode(ThreadingMode.SHARED); + + final int streamId = 1001; + + try ( + TestMediaDriver mediaDriver = TestMediaDriver.launch(context, testWatcher); + Aeron aeron = Aeron.connect(new Aeron.Context().aeronDirectoryName(mediaDriver.aeronDirectoryName())); + Subscription subscription = aeron.addSubscription(initialUri, streamId); + Publication publication = aeron.addPublication(initialUri, streamId)) + { + Tests.awaitConnected(subscription); + Tests.awaitConnected(publication); + + final String channel = new ChannelUriStringBuilder(initialUri) + .initialPosition(publication.position(), publication.initialTermId(), publication.termBufferLength()) .build(); - assertThrows(RegistrationException.class, () -> aeron.addPublication(channel, 1001)); + try (Publication publication2 = aeron.addPublication(channel, streamId)) + { + assertEquals(publication.position(), publication2.position()); + assertEquals(publication.initialTermId(), publication2.initialTermId()); + } } finally { diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveDeleteAndRestartTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveDeleteAndRestartTest.java index 6724eb77d2..944396df52 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveDeleteAndRestartTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveDeleteAndRestartTest.java @@ -52,7 +52,7 @@ public class ArchiveDeleteAndRestartTest private final long seed = System.nanoTime(); @RegisterExtension - public final TestWatcher randomSeedWatcher = ArchiveTests.newWatcher(seed); + public final TestWatcher randomSeedWatcher = Tests.seedWatcher(seed); @RegisterExtension public final SystemTestWatcher systemTestWatcher = new SystemTestWatcher(); diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java index 341859e990..9a26b39714 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ArchiveTest.java @@ -81,7 +81,7 @@ private static Stream threadingModes() private final long seed = System.nanoTime(); @RegisterExtension - public final TestWatcher randomSeedWatcher = ArchiveTests.newWatcher(seed); + public final TestWatcher randomSeedWatcher = Tests.seedWatcher(seed); @RegisterExtension public final SystemTestWatcher systemTestWatcher = new SystemTestWatcher(); diff --git a/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java b/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java index e6eb4a1d54..f3440c699e 100644 --- a/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java +++ b/aeron-system-tests/src/test/java/io/aeron/archive/ExtendRecordingTest.java @@ -37,9 +37,10 @@ import org.agrona.concurrent.status.CountersReader; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.InOrder; import org.mockito.Mockito; @@ -141,9 +142,15 @@ void after() CloseHelper.closeAll(aeronArchive, aeron, archive, driver); } - @Test + private interface PublicationFactory + { + Publication create(Aeron aeron, String uri, int streamId); + } + @InterruptAfter(10) - void shouldExtendRecordingAndReplay() + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void shouldExtendRecordingAndReplay(final boolean exclusive) { final long controlSessionId = aeronArchive.controlSessionId(); final int messageCount = 10; @@ -153,7 +160,10 @@ void shouldExtendRecordingAndReplay() final long stopTwo; final long recordingId; - try (Publication publication = aeron.addPublication(RECORDED_CHANNEL, RECORDED_STREAM_ID); + final PublicationFactory publicationFactory = + exclusive ? Aeron::addExclusivePublication : Aeron::addPublication; + + try (Publication publication = publicationFactory.create(aeron, RECORDED_CHANNEL, RECORDED_STREAM_ID); Subscription subscription = aeron.addSubscription(RECORDED_CHANNEL, RECORDED_STREAM_ID)) { @@ -195,7 +205,7 @@ void shouldExtendRecordingAndReplay() .build(); try (Subscription subscription = Tests.reAddSubscription(aeron, EXTEND_CHANNEL, RECORDED_STREAM_ID); - Publication publication = aeron.addExclusivePublication(publicationExtendChannel, RECORDED_STREAM_ID)) + Publication publication = publicationFactory.create(aeron, publicationExtendChannel, RECORDED_STREAM_ID)) { subscriptionIdTwo = aeronArchive.extendRecording(recordingId, EXTEND_CHANNEL, RECORDED_STREAM_ID, LOCAL); pollForRecordingSignal(aeronArchive); diff --git a/aeron-test-support/src/main/java/io/aeron/test/RandomWatcher.java b/aeron-test-support/src/main/java/io/aeron/test/RandomWatcher.java new file mode 100644 index 0000000000..d1a07e8609 --- /dev/null +++ b/aeron-test-support/src/main/java/io/aeron/test/RandomWatcher.java @@ -0,0 +1,37 @@ +/* + * Copyright 2014-2022 Real Logic Limited. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.aeron.test; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.TestWatcher; + +import java.util.Random; + +public class RandomWatcher implements TestWatcher +{ + private final long seed = System.nanoTime(); + private final Random random = new Random(); + + public void testFailed(final ExtensionContext context, final Throwable cause) + { + System.err.println(context.getDisplayName() + " failed with random seed: " + seed); + } + + public Random random() + { + return random; + } +} diff --git a/aeron-test-support/src/main/java/io/aeron/test/Tests.java b/aeron-test-support/src/main/java/io/aeron/test/Tests.java index 9a3c05d248..82f08c3d90 100644 --- a/aeron-test-support/src/main/java/io/aeron/test/Tests.java +++ b/aeron-test-support/src/main/java/io/aeron/test/Tests.java @@ -27,6 +27,8 @@ import org.agrona.concurrent.YieldingIdleStrategy; import org.agrona.concurrent.status.AtomicCounter; import org.agrona.concurrent.status.CountersReader; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.api.extension.TestWatcher; import javax.management.InstanceNotFoundException; import javax.management.MBeanServer; @@ -668,4 +670,15 @@ public static void dumpCollectedLogs(final String filename) LangUtil.rethrowUnchecked(ex); } } + + public static TestWatcher seedWatcher(final long seed) + { + return new TestWatcher() + { + public void testFailed(final ExtensionContext context, final Throwable cause) + { + System.err.println(context.getDisplayName() + " failed with random seed: " + seed); + } + }; + } }