Skip to content

Commit

Permalink
Direct TCP: address read-my-writes test failure (#6201)
Browse files Browse the repository at this point in the history
* Diagnostic improvements based on recent changes to 2.6.X

* Tweaked diagnostic messages

* Tweaks for diagnostics

* Tweaks for cloud test runs

* Tweaks for performance test run

* Tidied two files and tweaked ReadMyWriteWorkflow for performace test run on cloud

* Tidied some code and tweaked DEBUG messages for performance test run

* Changes for verifying cross-partition query fix on azure cloud

* Changes for verifying cross-partition query fix on azure cloud

* Switched from DEBUG to INFO-level logging

* Tweaked benchmark logger settings

* Added a newline
  • Loading branch information
David Noble authored Nov 7, 2019
1 parent 987324a commit b4029c2
Show file tree
Hide file tree
Showing 26 changed files with 502 additions and 320 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -160,8 +161,10 @@ protected String getDocumentLink(Document doc) {
protected abstract void performWorkload(BaseSubscriber<T> baseSubscriber, long i) throws Exception;

private boolean shouldContinue(long startTimeMillis, long iterationCount) {

Duration maxDurationTime = configuration.getMaxRunningTimeDuration();
int maxNumberOfOperations = configuration.getNumberOfOperations();

if (maxDurationTime == null) {
return iterationCount < maxNumberOfOperations;
}
Expand All @@ -181,16 +184,18 @@ void run() throws Exception {

successMeter = metricsRegistry.meter("#Successful Operations");
failureMeter = metricsRegistry.meter("#Unsuccessful Operations");

if (configuration.getOperationType() == Configuration.Operation.ReadLatency
|| configuration.getOperationType() == Configuration.Operation.WriteLatency)
|| configuration.getOperationType() == Configuration.Operation.WriteLatency) {
latency = metricsRegistry.timer("Latency");
}

reporter.start(configuration.getPrintingInterval(), TimeUnit.SECONDS);

long startTime = System.currentTimeMillis();

AtomicLong count = new AtomicLong(0);
long i;

for ( i = 0; shouldContinue(startTime, i); i++) {

BaseSubscriber<T> baseSubscriber = new BaseSubscriber<T>() {
Expand All @@ -201,7 +206,12 @@ protected void hookOnSubscribe(Subscription subscription) {

@Override
protected void hookOnNext(T value) {
logger.debug("hookOnNext: {}, count:{}", value, count.get());
}

@Override
protected void hookOnCancel() {
this.hookOnError(new CancellationException());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ protected void performWorkload(BaseSubscriber<FeedResponse<Document>> baseSubscr
} else {
throw new IllegalArgumentException("Unsupported Operation: " + configuration.getOperationType());
}
concurrencyControlSemaphore.acquire();

concurrencyControlSemaphore.acquire();
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public static void main(String[] args) throws Exception {
throw new RuntimeException(cfg.getOperationType() + " is not supported");
}

LOGGER.info("Starting {}", cfg.getOperationType());
benchmark.run();
benchmark.shutdown();

Expand All @@ -73,6 +74,8 @@ public static void main(String[] args) throws Exception {
System.err.println("INVALID Usage: " + e.getMessage());
System.err.println("Try '-help' for more information.");
throw e;
} finally {
System.exit(0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,19 @@ protected void init() {

@Override
protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i) throws Exception {

Flux<Document> obs;
boolean readyMyWrite = RandomUtils.nextBoolean();

if (readyMyWrite) {

// will do a write and immediately upon success will either
// do a point read
// or single partition query
// or cross partition query to find the write.

int j = Math.toIntExact(Math.floorMod(i, 3));

switch (j) {
case 0:
// write a random document to cosmodb and update the cache.
Expand All @@ -78,7 +83,7 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
"couldn't find my write in a single partition query!"))));
break;
case 2:
// write a random document to cosmodb and update the cache.
// write a random document to cosmosdb and update the cache.
// then try to query for the document which just was written
obs = writeDocument()
.flatMap(d -> xPartitionQuery(generateQuery(d))
Expand All @@ -90,12 +95,15 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
throw new IllegalStateException();
}
} else {

// will either do
// a write
// a point read for a in memory cached document
// a point read for a in memory cached document4
// or single partition query for a in memory cached document
// or cross partition query for a in memory cached document

int j = Math.toIntExact(Math.floorMod(i, 4));

switch (j) {
case 0:
// write a random document to cosmosdb and update the cache
Expand Down Expand Up @@ -124,8 +132,14 @@ protected void performWorkload(BaseSubscriber<Document> baseSubscriber, long i)
}

concurrencyControlSemaphore.acquire();
logger.debug("concurrencyControlSemaphore: {}", concurrencyControlSemaphore);

obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
try {
obs.subscribeOn(Schedulers.parallel()).subscribe(baseSubscriber);
} catch (Throwable error) {
concurrencyControlSemaphore.release();
logger.error("subscription failed due to ", error);
}
}

private void populateCache() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,29 @@
# this is the log4j configuration for tests
# This is the log4j configuration for benchmarks

# Set root logger level to DEBUG and its only appender to A1.
log4j.rootLogger=INFO, A1
log4j.rootLogger=INFO, Console

log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd=WARN
log4j.category.io.netty=INFO
log4j.category.io.reactivex=INFO
# A1 is set to be a ConsoleAppender.
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.category.com.azure.cosmos=INFO
log4j.category.com.azure.cosmos.benchmark=INFO
log4j.category.com.azure.cosmos.internal=INFO
log4j.category.com.aure.cosmos.internal.caches=INFO
log4j.category.com.aure.cosmos.internal.changefeed=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd=INFO
log4j.category.com.azure.cosmos.internal.http=INFO
log4j.category.com.azure.cosmos.internal.query=INFO
log4j.category.com.azure.cosmos.internal.query.aggregation=INFO
log4j.category.com.azure.cosmos.internal.query.metrics=INFO
log4j.category.com.azure.cosmos.internal.query.orderbyquery=INFO
log4j.category.com.azure.cosmos.internal.routing=INFO

# A1 uses PatternLayout.
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n
log4j.category.com.azure.cosmos.internal.directconnectivity.RntbdTransportClient=INFO
log4j.category.com.azure.cosmos.internal.directconnectivity.rntbd.RntbdRequestManager=INFO

log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d %5X{pid} [%t] %-5p %c - %m%n

log4j.appender.LogFile=org.apache.log4j.FileAppender
log4j.appender.LogFile.File=${azure.cosmos.logger.directory}/azure-cosmos-benchmark.log
log4j.appender.LogFile.layout=org.apache.log4j.PatternLayout
log4j.appender.LogFile.layout.ConversionPattern=[%d][%p][${azure.cosmos.hostname}][thread:%t][logger:%c] %m%n
Loading

0 comments on commit b4029c2

Please sign in to comment.