Skip to content

Commit

Permalink
Fixing tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
conniey committed Jan 7, 2020
1 parent 27614c5 commit cd404de
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ public void canReceive() {
} finally {
isActive.set(false);
producerEvents.dispose();
consumer.close();
dispose(producer, consumer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ public void setup() {
connectionOptions = new ConnectionOptions(HOSTNAME, "event-hub-path", tokenCredential,
CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, new AmqpRetryOptions(),
ProxyOptions.SYSTEM_DEFAULTS, Schedulers.parallel());
connectionProcessor = Mono.fromCallable(() -> connection).subscribeWith(new EventHubConnectionProcessor(
connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getEntityPath(),
connectionOptions.getRetry()));
connectionProcessor = Flux.<EventHubAmqpConnection>create(sink -> sink.next(connection))
.subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
connectionOptions.getEntityPath(), connectionOptions.getRetry()));

when(connection.getEndpointStates()).thenReturn(endpointProcessor);
when(connection.createReceiveLink(any(), argThat(name -> name.endsWith(PARTITION_ID)), any(EventPosition.class), any(ReceiveOptions.class)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,23 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.messaging.eventhubs.models.SendOptions;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -155,10 +161,12 @@ public void sendAllPartitions() {
Assertions.assertNotNull(partitionIds);

for (String partitionId : partitionIds) {
final EventDataBatch batch = producer.createBatch(new CreateBatchOptions().setPartitionId(partitionId)).block(TIMEOUT);
final EventDataBatch batch =
producer.createBatch(new CreateBatchOptions().setPartitionId(partitionId)).block(TIMEOUT);
Assertions.assertNotNull(batch);

Assertions.assertTrue(batch.tryAdd(TestUtils.getEvent("event", "test guid", Integer.parseInt(partitionId))));
Assertions.assertTrue(batch.tryAdd(TestUtils.getEvent("event", "test guid",
Integer.parseInt(partitionId))));

// Act & Assert
StepVerifier.create(producer.send(batch)).expectComplete().verify(TIMEOUT);
Expand Down Expand Up @@ -193,4 +201,36 @@ public void sendWithCredentials() {
dispose(client);
}
}

@Disabled("Testing long running operations and disconnections.")
@Test
void worksAfterReconnection() throws InterruptedException {
Flux.interval(Duration.ofSeconds(5))
.flatMap(position -> producer.createBatch().flatMap(batch -> {
IntStream.range(0, 3).mapToObj(number -> new EventData("Position" + position + ": " + number))
.forEach(event -> {
if (!batch.tryAdd(event)) {
logger.error("Could not add event. Size: {}. Max: {}. Content: {}",
batch.getSizeInBytes(), batch.getMaxSizeInBytes(), event.getBodyAsString());
}
});

return producer.send(batch).thenReturn(Instant.now());
}))
.onErrorContinue(error -> error instanceof AmqpException && ((AmqpException) error).isTransient(),
(error, value) -> {
System.out.println("Exception dropped: " + error.getMessage());
})
.subscribe(instant -> {
System.out.println("Sent batch at: " + instant);
}, error -> {
logger.error("Error sending batch: ", error);
}, () -> {
logger.info("Complete.");
});

System.out.println("Sleeping while performing work.");
TimeUnit.MINUTES.sleep(30);
System.out.println("Complete.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.AmqpTransportType;
import com.azure.core.amqp.ProxyOptions;
import com.azure.core.amqp.exception.AmqpErrorCondition;
Expand Down Expand Up @@ -69,8 +69,6 @@ public class EventHubProducerClientTest {
@Mock
private AmqpSendLink sendLink;
@Mock
private AmqpSession session;
@Mock
private EventHubAmqpConnection connection;
@Mock
private TokenCredential tokenCredential;
Expand All @@ -97,11 +95,13 @@ public void setup() {
ConnectionOptions connectionOptions = new ConnectionOptions(HOSTNAME, "event-hub-path", tokenCredential,
CbsAuthorizationType.SHARED_ACCESS_SIGNATURE, AmqpTransportType.AMQP_WEB_SOCKETS, retryOptions,
ProxyOptions.SYSTEM_DEFAULTS, Schedulers.parallel());
connectionProcessor = Mono.fromCallable(() -> connection).subscribeWith(new EventHubConnectionProcessor(
connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getEntityPath(),
connectionOptions.getRetry()));
connectionProcessor = Flux.<EventHubAmqpConnection>create(sink -> sink.next(connection))
.subscribeWith(new EventHubConnectionProcessor(connectionOptions.getFullyQualifiedNamespace(),
connectionOptions.getEntityPath(), connectionOptions.getRetry()));
asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions,
tracerProvider, messageSerializer, false);

when(connection.getEndpointStates()).thenReturn(Flux.create(sink -> sink.next(AmqpEndpointState.ACTIVE)));
}

@AfterEach
Expand All @@ -121,11 +121,8 @@ public void sendSingleMessage() {
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventData eventData = new EventData("hello-world".getBytes(UTF_8));

when(connection.createSession(EVENT_HUB_NAME)).thenReturn(Mono.just(session));

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(session.createProducer(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME),
eq(retryOptions.getTryTimeout()), any()))
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(sendLink));

// Act
Expand Down Expand Up @@ -153,11 +150,8 @@ public void sendStartSpanSingleMessage() {
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());
final EventData eventData = new EventData("hello-world".getBytes(UTF_8));

when(connection.createSession(EVENT_HUB_NAME)).thenReturn(Mono.just(session));

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(session.createProducer(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME),
eq(retryOptions.getTryTimeout()), any()))
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(sendLink));

when(tracer1.start(eq("EventHubs.send"), any(), eq(ProcessKind.SEND))).thenAnswer(
Expand Down Expand Up @@ -200,11 +194,8 @@ public void sendMessageRetrySpanTest() {
final List<Tracer> tracers = Collections.singletonList(tracer1);
TracerProvider tracerProvider = new TracerProvider(tracers);

when(connection.createSession(EVENT_HUB_NAME)).thenReturn(Mono.just(session));

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(session.createProducer(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME),
eq(retryOptions.getTryTimeout()), any()))
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(sendLink));

final EventHubProducerAsyncClient asyncProducer = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME,
Expand Down Expand Up @@ -253,11 +244,9 @@ public void sendMultipleMessages() {
final SendOptions options = new SendOptions().setPartitionId(partitionId);
final EventHubProducerClient producer = new EventHubProducerClient(asyncProducer, retryOptions.getTryTimeout());

when(connection.createSession(argThat(name -> name.endsWith(partitionId))))
.thenReturn(Mono.just(session));

when(session.createProducer(argThat(name -> name.startsWith("PS")), argThat(name -> name.endsWith(partitionId)),
eq(retryOptions.getTryTimeout()), any()))
// EC is the prefix they use when creating a link that sends to the service round-robin.
when(connection.createSendLink(argThat(name -> name.startsWith("PS")),
argThat(name -> name.endsWith(partitionId)), any()))
.thenReturn(Mono.just(sendLink));

// Act
Expand Down Expand Up @@ -287,11 +276,9 @@ public void createsEventDataBatch() {

final AmqpSendLink link = mock(AmqpSendLink.class);
when(link.getLinkSize()).thenReturn(Mono.just(maxLinkSize));
when(connection.createSession(EVENT_HUB_NAME)).thenReturn(Mono.just(session));

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(session.createProducer(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME),
eq(retryOptions.getTryTimeout()), any()))
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(link));

// This event is 1024 bytes when serialized.
Expand Down Expand Up @@ -328,12 +315,10 @@ public void startsMessageSpanOnEventBatch() {

final AmqpSendLink link = mock(AmqpSendLink.class);
when(link.getLinkSize()).thenReturn(Mono.just(ClientConstants.MAX_MESSAGE_LENGTH_BYTES));
when(connection.createSession(EVENT_HUB_NAME)).thenReturn(Mono.just(session));

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(session.createProducer(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME),
eq(retryOptions.getTryTimeout()), any()))
.thenReturn(Mono.just(link));
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(sendLink));

when(tracer1.start(eq("EventHubs.message"), any(), eq(ProcessKind.MESSAGE))).thenAnswer(
invocation -> {
Expand Down Expand Up @@ -364,11 +349,8 @@ public void createsEventDataBatchWithPartitionKey() {
int eventOverhead = 98;
int maxEventPayload = maxBatchSize - eventOverhead;

when(connection.createSession(EVENT_HUB_NAME)).thenReturn(Mono.just(session));

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(session.createProducer(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME),
eq(retryOptions.getTryTimeout()), any()))
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(sendLink));

// This event is 1024 bytes when serialized.
Expand Down Expand Up @@ -401,11 +383,10 @@ public void createsEventDataBatchWithPartitionId() {
int maxEventPayload = maxBatchSize - eventOverhead;

String partitionId = "my-partition-id";
when(connection.createSession(argThat(name -> name.endsWith(partitionId)))).thenReturn(Mono.just(session));

// PS is the prefix when a partition sender link is created.
when(session.createProducer(argThat(name -> name.startsWith("PS")), argThat(name -> name.endsWith(partitionId)),
eq(retryOptions.getTryTimeout()), any()))
when(connection.createSendLink(argThat(name -> name.startsWith("PS")),
argThat(name -> name.endsWith(partitionId)), any()))
.thenReturn(Mono.just(sendLink));

// This event is 1024 bytes when serialized.
Expand Down Expand Up @@ -437,13 +418,11 @@ public void payloadTooLarge() {
int eventOverhead = 24;
int maxEventPayload = maxBatchSize - eventOverhead;

when(connection.createSession(EVENT_HUB_NAME)).thenReturn(Mono.just(session));

// EC is the prefix they use when creating a link that sends to the service round-robin.
when(session.createProducer(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME),
eq(retryOptions.getTryTimeout()), any()))
when(connection.createSendLink(argThat(name -> name.startsWith("EC")), eq(EVENT_HUB_NAME), any()))
.thenReturn(Mono.just(sendLink));


// This event is 1025 bytes when serialized.
final EventData event = new EventData(new byte[maxEventPayload + 1]);

Expand Down

0 comments on commit cd404de

Please sign in to comment.