Skip to content

Commit

Permalink
logging+better concurrency+Watchdog+docker
Browse files Browse the repository at this point in the history
  • Loading branch information
MikeMirzayanov committed Sep 22, 2024
1 parent 0942513 commit 34da2f8
Show file tree
Hide file tree
Showing 14 changed files with 483 additions and 110 deletions.
13 changes: 5 additions & 8 deletions java/riorita/src/main/java/com/codeforces/riorita/Riorita.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -34,7 +35,7 @@ public class Riorita {
private final String hostAndPort;
private String keyPrefix = "";
private final boolean reconnect;
private AtomicInteger connectionOperationCount = new AtomicInteger();
private final AtomicInteger connectionOperationCount = new AtomicInteger();

public Riorita(String host, int port) {
this(host, port, true);
Expand Down Expand Up @@ -160,15 +161,15 @@ private <T> T runOperation(Operation<T> operation, int size) throws IOException
logger.warn("Can't process operation.", e);
exception = e;
try {
Thread.sleep(iteration * 100);
Thread.sleep(iteration * 100L);
} catch (InterruptedException ignored) {
// No operations.
}
reconnectQuietly();
}
} else {
try {
Thread.sleep(iteration * 100);
Thread.sleep(iteration * 100L);
} catch (InterruptedException ignored) {
// No operations.
}
Expand Down Expand Up @@ -266,11 +267,7 @@ private long nextRequestId() {
}

private byte[] getStringBytes(String s) {
try {
return s.getBytes("UTF-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("Can't find UTF-8 {" + this + "}.");
}
return s.getBytes(StandardCharsets.UTF_8);
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package com.codeforces.riorita.watchdog;

import com.codeforces.riorita.Riorita;

import java.io.IOException;
import java.util.Random;
import java.util.concurrent.*;

public class Watchdog {
private static final Random RANDOM = new Random(getRandomSeed());
private static final ExecutorService executorService = Executors.newSingleThreadExecutor();

private static long getRandomSeed() {
return Runtime.getRuntime().freeMemory()
+ Runtime.getRuntime().maxMemory()
+ Runtime.getRuntime().totalMemory()
+ System.currentTimeMillis()
+ System.nanoTime()
+ Thread.currentThread().getId()
+ Watchdog.class.hashCode();
}

public static void main(String[] args) {
try {
if (args.length < 1) {
System.out.println("Usage: java -jar watchdog.jar <hostPort>"
+ " [chunkSize=1024] [chunkCount=10] [totalTimoutSec=50]");
System.exit(1);
}

String hostPort = args[0];

int chunkSize = args.length > 1 ? Integer.parseInt(args[1]) : 1024;
int chunkCount = args.length > 2 ? Integer.parseInt(args[2]) : 10;
int totalTimeoutSec = args.length > 3 ? Integer.parseInt(args[3]) : 50;

System.out.print("Watchdog {hostPort=" + hostPort
+ ", chunkSize="
+ chunkSize + ", chunkCount=" + chunkCount
+ ", totalTimeoutSec=" + totalTimeoutSec + "}: ");

String[] hostPortItems = hostPort.split(":");
if (hostPortItems.length < 1 || hostPortItems.length > 2) {
System.out.println("Invalid hostPort: '" + hostPort + "'");
System.exit(1);
}

String host = hostPortItems[0];
int port = hostPortItems.length > 1 ? Integer.parseInt(hostPortItems[1]) : 80;

executeWithTimeout(() -> {
try {
run(host, port, chunkSize, chunkCount, totalTimeoutSec);
} catch (Exception e) {
System.out.println(e.getMessage());
System.exit(1);
}
return null;
}, totalTimeoutSec);
} catch (Exception e) {
System.out.println(e.getMessage());
System.exit(1);
} finally {
executorService.shutdown();
}
}

private static void run(String host, int port,
int chunkSize, int chunkCount, int totalTimeoutSec) throws IOException {
Riorita riorita = new Riorita(host, port);

long startTimeMillis = System.currentTimeMillis();

byte[] chunk = new byte[chunkSize];
for (int i = 0; i < chunkCount; i++) {
long currentTimeMillis = System.currentTimeMillis();

if (!riorita.ping()) {
System.out.println("Failed to ping the server");
System.exit(1);
}

// Check if total timeout has been reached
if (currentTimeMillis - startTimeMillis > TimeUnit.SECONDS.toMillis(totalTimeoutSec)) {
System.out.println("Timed out after " + totalTimeoutSec + " sec");
System.exit(1);
}

if (riorita.get(getRandomKey()) != null) {
System.out.println("Failed to get chunk for non-existing key");
System.exit(1);
}

RANDOM.nextBytes(chunk);
String randomKey = getRandomKey();
System.out.println(randomKey);

// Perform the `put` operation with timeout
riorita.put(randomKey, chunk);

// Perform the `get` operation with timeout
byte[] gotBytes = riorita.get(randomKey);
if (gotBytes == null || gotBytes.length != chunkSize) {
System.out.println("Failed to get chunk for key: '" + randomKey + "'");
System.exit(1);
}

// Validate the received chunk
for (int j = 0; j < chunkSize; j++) {
if (gotBytes[j] != chunk[j]) {
System.out.println("Sent and got chunks are different for key: '"
+ randomKey + "' [pos=" + j + "]");
System.exit(1);
}
}

// Perform the `delete` operation with timeout
riorita.delete(randomKey);
}

long endTimeMillis = System.currentTimeMillis();
System.out.println("Done in " + (endTimeMillis - startTimeMillis) + " ms");
}

@SuppressWarnings("UnusedReturnValue")
private static <T> T executeWithTimeout(Callable<T> task, int timeoutSec) {
Future<T> future = executorService.submit(task);
try {
return future.get(timeoutSec, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("Operation timed out after " + timeoutSec + " seconds.");
System.exit(1);
} catch (InterruptedException | ExecutionException e) {
System.out.println("Error during operation: " + e.getMessage());
System.exit(1);
}
return null; // This line will never be reached because System.exit will terminate the program
}

private static String getRandomKey() {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 20; i++) {
sb.append((char) ('a' + RANDOM.nextInt(26)));
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,15 @@

import java.io.IOException;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

public class RioritaBenchmark {
private static final Random TEST_RANDOM = new Random(System.currentTimeMillis());
private static final int PORT = 8100;

private static String getRandomString(int length) {
StringBuilder result = new StringBuilder(length);
Expand All @@ -23,7 +29,7 @@ private static byte[] getRandomBytes(int length) {
return bytes;
}

private static void validate(Riorita riorita, long total, long size) throws IOException {
private static void validate(long total, long size) {
long iterations = total / size;

System.out.println("Validation: doing " + iterations + " iterations for size " + size + ".");
Expand All @@ -38,41 +44,86 @@ private static void validate(Riorita riorita, long total, long size) throws IOEx

long start = System.currentTimeMillis();

for (int i = 0; i < iterations; i++) {
if (i % 10000 == 0) System.out.println("Done " + i + " in " + (System.currentTimeMillis() - start) + " ms.");

String key = keysList.get(TEST_RANDOM.nextInt(keysList.size()));

boolean has = cache.containsKey(key);
if (riorita.has(key) != has) {
throw new RuntimeException("Invalid has.");
}

byte[] result = riorita.get(key);
//noinspection DoubleNegation
if ((result != null) != has) {
throw new RuntimeException("Invalid get (has): (result != null)=" + (result != null) + ", has=" + has + ".");
}

if (has) {
if (!Arrays.equals(result, cache.get(key))) {
throw new RuntimeException("Invalid get.");
int NUM_THREADS = 16;
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
List<Future<?>> futures = new ArrayList<>();
AtomicLong progressCounter = new AtomicLong(0);
long iterationsPerThread = iterations / NUM_THREADS;

// Submit tasks for each thread
for (int t = 0; t < NUM_THREADS; t++) {
futures.add(executor.submit(() -> {
Riorita riorita = new Riorita("localhost", 8100);

try {
for (int i = 0; i < iterationsPerThread; i++) {
long currentProgress = progressCounter.incrementAndGet();

// Log progress every 10,000 iterations
if (currentProgress % 10000 == 0) {
System.out.println("Done " + currentProgress + " in " + (System.currentTimeMillis() - start) + " ms.");
}

String key = keysList.get(TEST_RANDOM.nextInt(keysList.size()));

{
boolean rioritaHas = riorita.has(key);
boolean cacheHas = cache.containsKey(key);
if (rioritaHas != cacheHas) {
//throw new RuntimeException("Invalid has: key.length=" + key.length() + ", key=" + key + ".");
System.out.println("ri: " + rioritaHas + ", ca: " + cacheHas);
}
}

byte[] result = riorita.get(key);
byte[] expected = cache.get(key);

//noinspection DoubleNegation
if ((result != null) != (expected != null)) {
throw new RuntimeException("Invalid get (has): (result != null)=" + (result != null)
+ ", has=" + (expected != null) + ".");
}

if (result != null) {
if (!Arrays.equals(result, cache.get(key))) {
throw new RuntimeException("Invalid get.");
}
}

// Perform extra gets for stress testing
for (int j = 0; j < 5; j++) {
riorita.get(keysList.get(TEST_RANDOM.nextInt(keysList.size())));
}

// Generate random data and put it into riorita and cache
result = getRandomString((int) size).getBytes();
riorita.put(key, result);
cache.put(key, result);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}));
}

for (int j = 0; j < 5; j++) {
riorita.get(keysList.get(TEST_RANDOM.nextInt(keysList.size())));
// Wait for all tasks to finish
for (Future<?> future : futures) {
try {
future.get(); // This will block until the task is complete
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

result = getRandomString((int) size).getBytes();
riorita.put(key, result);
cache.put(key, result);
}

// Shutdown the executor
executor.shutdown();

System.out.println("Completed in " + (System.currentTimeMillis() - start) + " ms.");
}

private static void test(Riorita riorita, long total, long size) throws IOException {
private static void test(long total, long size) throws IOException {
Riorita riorita = new Riorita("localhost", PORT);

long iterations = total / size;

System.out.println("Testing: doing " + iterations + " iterations for size " + size + ".");
Expand Down Expand Up @@ -118,18 +169,16 @@ private static void test(Riorita riorita, long total, long size) throws IOExcept
}

public static void main(String[] args) throws IOException {
Riorita riorita = new Riorita("localhost", 8024);

if (args[0].startsWith("val")) {
long total = Long.parseLong(args[1]);
long size = Long.parseLong(args[2]);
validate(riorita, total, size);
validate(total, size);
}

if (args[0].startsWith("test")) {
long total = Long.parseLong(args[1]);
long size = Long.parseLong(args[2]);
test(riorita, total, size);
test(total, size);
}
}
}
39 changes: 39 additions & 0 deletions src/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
FROM alpine:3.18.4

RUN apk update && apk add --no-cache bash \
bash-doc \
bash-completion \
build-base \
git \
linux-headers \
make \
cmake \
zlib-dev \
bzip2-dev \
lz4-dev \
gflags-dev \
snappy-dev \
zstd-dev \
g++ \
boost-system \
boost-thread \
boost-filesystem \
boost-program_options \
leveldb-dev \
rocksdb-dev

RUN apk update && apk add --no-cache boost-dev

RUN apk update && apk add --no-cache valgrind

RUN mkdir /src
COPY *.cpp *.h /src/
RUN cd /src && g++ -std=c++20 -Wall -Wextra -Wconversion -DHAS_ROCKSDB -DHAS_LEVELDB -Ofast -o riorita riorita.cpp protocol.cpp compact.cpp storage.cpp cache.cpp -lboost_system -lboost_thread -lboost_filesystem -lboost_program_options -lpthread -lleveldb -lsnappy -lrocksdb

RUN mkdir /riorita
WORKDIR /riorita

COPY riorita.sh /
EXPOSE 8100

ENTRYPOINT ["/riorita.sh"]
Loading

0 comments on commit 34da2f8

Please sign in to comment.