Skip to content

Commit

Permalink
Merge pull request #7039 from alvasw/async_persistence_truncate_befor…
Browse files Browse the repository at this point in the history
…e_write

AsyncFileWriter: Truncate before write
  • Loading branch information
alejandrogarcia83 authored Feb 25, 2024
2 parents e53fb35 + fd6b446 commit a3f5bf5
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.nio.channels.CompletionHandler;
import java.nio.file.Path;

import java.io.IOException;

import java.util.concurrent.CompletableFuture;

import lombok.Getter;
Expand All @@ -36,6 +38,18 @@ public AsyncFileChannelWriter(Path filePath, AsynchronousFileChannel fileChannel
this.fileChannel = fileChannel;
}

@Override
public CompletableFuture<Void> truncate() {
var completableFuture = new CompletableFuture<Void>();
try {
fileChannel.truncate(0);
completableFuture.complete(null);
} catch (IOException e) {
completableFuture.completeExceptionally(e);
}
return completableFuture;
}

@Override
public CompletableFuture<Integer> write(byte[] data, int offset) {
var byteBuffer = ByteBuffer.wrap(data);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.concurrent.CompletableFuture;

public interface AsyncFileWriter {
CompletableFuture<Void> truncate();

CompletableFuture<Integer> write(byte[] data, int offset);

Path getFilePath();
Expand Down
33 changes: 13 additions & 20 deletions persistence/src/main/java/bisq/persistence/AtomicFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@

import java.io.File;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -40,25 +39,19 @@ public AtomicFileWriter(Path destinationPath,
rollingFile = rollingFileWriter.getFilePath().toFile();
}

public synchronized void write(byte[] data) {
try {
CountDownLatch countDownLatch = rollingFileWriter.write(data);
boolean isSuccess = countDownLatch.await(45, TimeUnit.SECONDS);
if (!isSuccess) {
throw new AtomicFileWriteFailedException("Async atomic file write timeout triggered after 45 seconds.");
}

isSuccess = rollingFile.renameTo(activeFile);
if (!isSuccess) {
throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file.");
}

File tmpFile = activeFile;
activeFile = rollingFile;
rollingFile = tmpFile;
public synchronized CompletableFuture<Void> write(byte[] data) {
return rollingFileWriter.write(data)
.thenRunAsync(this::swapActiveAndRollingFile);
}

} catch (InterruptedException e) {
log.error("AtomicFileWriter got interrupted during write.", e);
private void swapActiveAndRollingFile() {
var isSuccess = rollingFile.renameTo(activeFile);
if (!isSuccess) {
throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file.");
}

File tmpFile = activeFile;
activeFile = rollingFile;
rollingFile = tmpFile;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

import java.nio.file.Path;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

Expand All @@ -32,35 +32,44 @@ public PersistenceFileWriter(AsyncFileWriter asyncWriter, ExecutorService writeR
this.writeRequestScheduler = writeRequestScheduler;
}

public CountDownLatch write(byte[] data) {
CountDownLatch writeFinished = new CountDownLatch(1);
scheduleAsyncWrite(data, 0, data.length, writeFinished);
return writeFinished;
public CompletableFuture<Void> write(byte[] data) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
asyncWriter.truncate()
.thenRun(() -> scheduleAsyncWrite(data, 0, data.length, completableFuture))
.exceptionally(throwable -> {
completableFuture.completeExceptionally(throwable);
return null;
});
return completableFuture;
}

public Path getFilePath() {
return asyncWriter.getFilePath();
}

private void scheduleAsyncWrite(byte[] data, int offset, int size, CountDownLatch writeFinished) {
private void scheduleAsyncWrite(byte[] data, int offset, int size, CompletableFuture<Void> completableFuture) {
asyncWriter.write(data, offset)
.thenAcceptAsync(writeUntilEndAsync(data, offset, size, writeFinished), writeRequestScheduler);
.thenAcceptAsync(writeUntilEndAsync(data, offset, size, completableFuture), writeRequestScheduler)
.exceptionally(throwable -> {
completableFuture.completeExceptionally(throwable);
return null;
});
}

private Consumer<Integer> writeUntilEndAsync(byte[] data,
int offset,
int totalBytes,
CountDownLatch writeFinished) {
CompletableFuture<Void> completableFuture) {
return writtenBytes -> {
if (writtenBytes == totalBytes) {
writeFinished.countDown();
completableFuture.complete(null);
return;
}

int remainingBytes = totalBytes - writtenBytes;
if (remainingBytes > 0) {
int newOffset = offset + writtenBytes;
scheduleAsyncWrite(data, newOffset, remainingBytes, writeFinished);
scheduleAsyncWrite(data, newOffset, remainingBytes, completableFuture);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* This file is part of Bisq.
*
* Bisq is free software: you can redistribute it and/or modify it
* under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at
* your option) any later version.
*
* Bisq is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public
* License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with Bisq. If not, see <http://www.gnu.org/licenses/>.
*/

package bisq.persistence;

import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import java.io.IOException;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.mockito.junit.jupiter.MockitoExtension;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;

@ExtendWith(MockitoExtension.class)
public class AtomicFileWriterIntegrationTests {
private static ExecutorService executorService;
private Path filePath;
private AtomicFileWriter atomicFileWriter;

@BeforeAll
static void beforeAll() {
executorService = Executors.newSingleThreadExecutor();
}

@AfterAll
static void afterAll() {
executorService.shutdownNow();
}

@BeforeEach
void setup(@TempDir Path tempDir) throws IOException {
filePath = tempDir.resolve("file");
var fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);

AsyncFileChannelWriter asyncFileChannelWriter = new AsyncFileChannelWriter(filePath, fileChannel);
PersistenceFileWriter persistenceFileWriter = new PersistenceFileWriter(asyncFileChannelWriter, executorService);
atomicFileWriter = new AtomicFileWriter(filePath, persistenceFileWriter);
}

@Test
void singleWrite() throws IOException, ExecutionException, InterruptedException, TimeoutException {
byte[] expectedData = "Hello World!".getBytes(StandardCharsets.UTF_8);
atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS);

byte[] actualData = Files.readAllBytes(filePath);
assertThat(actualData, is(expectedData));
}

@Test
void twoWritesSecondSmaller() throws IOException, ExecutionException, InterruptedException, TimeoutException {
byte[] expectedData = "Hello World!".getBytes(StandardCharsets.UTF_8);
atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS);

expectedData = "Bye!".getBytes(StandardCharsets.UTF_8);
atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS);

byte[] actualData = Files.readAllBytes(filePath);
assertThat(actualData, is(expectedData));
}

@Test
void twoWriteSecondLarger() throws IOException, ExecutionException, InterruptedException, TimeoutException {
byte[] expectedData = "Hello World!".getBytes(StandardCharsets.UTF_8);
atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS);

expectedData = "Bye! Hello World!".getBytes(StandardCharsets.UTF_8);
atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS);

byte[] actualData = Files.readAllBytes(filePath);
assertThat(actualData, is(expectedData));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,13 @@
import java.nio.file.Path;

import java.io.File;
import java.io.IOException;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
Expand All @@ -32,9 +37,10 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;

Expand All @@ -45,13 +51,10 @@ public class AtomicFileWriterTests {
private PersistenceFileWriter persistenceFileWriter;
@Mock
private File rollingFile = mock(File.class);
@Mock
private CountDownLatch countDownLatch;
private AtomicFileWriter atomicFileWriter;

@BeforeEach
void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) {
doReturn(countDownLatch).when(persistenceFileWriter).write(any());
doReturn(rollingFile).when(rollingFilePath).toFile();
doReturn(rollingFilePath).when(persistenceFileWriter).getFilePath();

Expand All @@ -60,25 +63,49 @@ void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) {
}

@Test
void triggerFileWriteTimeout() throws InterruptedException {
doReturn(false).when(countDownLatch).await(anyLong(), any());
assertThrows(AtomicFileWriteFailedException.class,
() -> atomicFileWriter.write(DATA));
void writeFails() throws ExecutionException, InterruptedException, TimeoutException {
var ioException = new IOException();
doReturn(CompletableFuture.failedFuture(ioException))
.when(persistenceFileWriter).write(any());

CountDownLatch exceptionTriggeredLatch = new CountDownLatch(1);
atomicFileWriter.write(DATA)
.exceptionally(throwable -> {
assertThat(throwable.getCause(), is(ioException));
exceptionTriggeredLatch.countDown();
return null;
})
.get(30, TimeUnit.SECONDS);

assertThat(exceptionTriggeredLatch.getCount(), is(0L));
}

@Test
void renameFailure() throws InterruptedException {
doReturn(true).when(countDownLatch).await(anyLong(), any());
void renameFailure() throws InterruptedException, ExecutionException, TimeoutException {
doReturn(CompletableFuture.<Void>completedFuture(null))
.when(persistenceFileWriter).write(any());

doReturn(false).when(rollingFile).renameTo(any());

assertThrows(AtomicFileWriteFailedException.class,
() -> atomicFileWriter.write(DATA));
CountDownLatch exceptionTriggeredLatch = new CountDownLatch(1);
atomicFileWriter.write(DATA)
.exceptionally(throwable -> {
assertThat(throwable.getCause(), instanceOf(AtomicFileWriteFailedException.class));
exceptionTriggeredLatch.countDown();
return null;
})
.get(30, TimeUnit.SECONDS);

assertThat(exceptionTriggeredLatch.getCount(), is(0L));
}

@Test
void write() throws InterruptedException {
doReturn(true).when(countDownLatch).await(anyLong(), any());
void write() throws InterruptedException, ExecutionException, TimeoutException {
doReturn(CompletableFuture.<Void>completedFuture(null))
.when(persistenceFileWriter).write(any());

doReturn(true).when(rollingFile).renameTo(any());
atomicFileWriter.write(DATA);

atomicFileWriter.write(DATA).get(30, TimeUnit.SECONDS);
}
}
Loading

0 comments on commit a3f5bf5

Please sign in to comment.