From a30d4d55d6e0097b8d78a786a0435387c5c7b2fc Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Sat, 10 Feb 2024 15:30:11 +0100 Subject: [PATCH 1/4] Implement AsyncFileWriter.truncate() --- .../bisq/persistence/AsyncFileChannelWriter.java | 14 ++++++++++++++ .../java/bisq/persistence/AsyncFileWriter.java | 2 ++ 2 files changed, 16 insertions(+) diff --git a/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java b/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java index 1f9681e6dbf..36f257b5750 100644 --- a/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java +++ b/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java @@ -22,6 +22,8 @@ import java.nio.channels.CompletionHandler; import java.nio.file.Path; +import java.io.IOException; + import java.util.concurrent.CompletableFuture; import lombok.Getter; @@ -36,6 +38,18 @@ public AsyncFileChannelWriter(Path filePath, AsynchronousFileChannel fileChannel this.fileChannel = fileChannel; } + @Override + public CompletableFuture truncate() { + var completableFuture = new CompletableFuture(); + try { + fileChannel.truncate(0); + completableFuture.complete(null); + } catch (IOException e) { + completableFuture.completeExceptionally(e); + } + return completableFuture; + } + @Override public CompletableFuture write(byte[] data, int offset) { var byteBuffer = ByteBuffer.wrap(data); diff --git a/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java b/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java index 9cdfc60c4cb..359d11bd021 100644 --- a/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java @@ -22,6 +22,8 @@ import java.util.concurrent.CompletableFuture; public interface AsyncFileWriter { + CompletableFuture truncate(); + CompletableFuture write(byte[] data, int offset); Path getFilePath(); From bf575246715cdce20db1ffbae792b820e88ad479 Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Thu, 15 Feb 2024 14:41:53 +0100 Subject: [PATCH 2/4] Return CompletableFuture in PersistenceFileWriter.write --- .../bisq/persistence/AtomicFileWriter.java | 33 ++++------- .../persistence/PersistenceFileWriter.java | 24 ++++---- .../persistence/AtomicFileWriterTests.java | 59 ++++++++++++++----- .../PersistenceFileWriterTests.java | 25 ++++---- 4 files changed, 81 insertions(+), 60 deletions(-) diff --git a/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java b/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java index 3c043420adb..39f1a6e2106 100644 --- a/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java @@ -21,8 +21,7 @@ import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; @@ -40,25 +39,19 @@ public AtomicFileWriter(Path destinationPath, rollingFile = rollingFileWriter.getFilePath().toFile(); } - public synchronized void write(byte[] data) { - try { - CountDownLatch countDownLatch = rollingFileWriter.write(data); - boolean isSuccess = countDownLatch.await(45, TimeUnit.SECONDS); - if (!isSuccess) { - throw new AtomicFileWriteFailedException("Async atomic file write timeout triggered after 45 seconds."); - } - - isSuccess = rollingFile.renameTo(activeFile); - if (!isSuccess) { - throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file."); - } - - File tmpFile = activeFile; - activeFile = rollingFile; - rollingFile = tmpFile; + public synchronized CompletableFuture write(byte[] data) { + return rollingFileWriter.write(data) + .thenRunAsync(this::swapActiveAndRollingFile); + } - } catch (InterruptedException e) { - log.error("AtomicFileWriter got interrupted during write.", e); + private void swapActiveAndRollingFile() { + var isSuccess = rollingFile.renameTo(activeFile); + if (!isSuccess) { + throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file."); } + + File tmpFile = activeFile; + activeFile = rollingFile; + rollingFile = tmpFile; } } diff --git a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java index 10c0e45b2ab..7b96e218f2d 100644 --- a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java @@ -19,7 +19,7 @@ import java.nio.file.Path; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -32,35 +32,39 @@ public PersistenceFileWriter(AsyncFileWriter asyncWriter, ExecutorService writeR this.writeRequestScheduler = writeRequestScheduler; } - public CountDownLatch write(byte[] data) { - CountDownLatch writeFinished = new CountDownLatch(1); - scheduleAsyncWrite(data, 0, data.length, writeFinished); - return writeFinished; + public CompletableFuture write(byte[] data) { + CompletableFuture completableFuture = new CompletableFuture<>(); + scheduleAsyncWrite(data, 0, data.length, completableFuture); + return completableFuture; } public Path getFilePath() { return asyncWriter.getFilePath(); } - private void scheduleAsyncWrite(byte[] data, int offset, int size, CountDownLatch writeFinished) { + private void scheduleAsyncWrite(byte[] data, int offset, int size, CompletableFuture completableFuture) { asyncWriter.write(data, offset) - .thenAcceptAsync(writeUntilEndAsync(data, offset, size, writeFinished), writeRequestScheduler); + .thenAcceptAsync(writeUntilEndAsync(data, offset, size, completableFuture), writeRequestScheduler) + .exceptionally(throwable -> { + completableFuture.completeExceptionally(throwable); + return null; + }); } private Consumer writeUntilEndAsync(byte[] data, int offset, int totalBytes, - CountDownLatch writeFinished) { + CompletableFuture completableFuture) { return writtenBytes -> { if (writtenBytes == totalBytes) { - writeFinished.countDown(); + completableFuture.complete(null); return; } int remainingBytes = totalBytes - writtenBytes; if (remainingBytes > 0) { int newOffset = offset + writtenBytes; - scheduleAsyncWrite(data, newOffset, remainingBytes, writeFinished); + scheduleAsyncWrite(data, newOffset, remainingBytes, completableFuture); } }; } diff --git a/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java b/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java index d6610b8b3a5..872d9187e7a 100644 --- a/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java +++ b/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java @@ -21,8 +21,13 @@ import java.nio.file.Path; import java.io.File; +import java.io.IOException; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -32,9 +37,10 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.io.TempDir; -import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -45,13 +51,10 @@ public class AtomicFileWriterTests { private PersistenceFileWriter persistenceFileWriter; @Mock private File rollingFile = mock(File.class); - @Mock - private CountDownLatch countDownLatch; private AtomicFileWriter atomicFileWriter; @BeforeEach void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) { - doReturn(countDownLatch).when(persistenceFileWriter).write(any()); doReturn(rollingFile).when(rollingFilePath).toFile(); doReturn(rollingFilePath).when(persistenceFileWriter).getFilePath(); @@ -60,25 +63,49 @@ void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) { } @Test - void triggerFileWriteTimeout() throws InterruptedException { - doReturn(false).when(countDownLatch).await(anyLong(), any()); - assertThrows(AtomicFileWriteFailedException.class, - () -> atomicFileWriter.write(DATA)); + void writeFails() throws ExecutionException, InterruptedException, TimeoutException { + var ioException = new IOException(); + doReturn(CompletableFuture.failedFuture(ioException)) + .when(persistenceFileWriter).write(any()); + + CountDownLatch exceptionTriggeredLatch = new CountDownLatch(1); + atomicFileWriter.write(DATA) + .exceptionally(throwable -> { + assertThat(throwable.getCause(), is(ioException)); + exceptionTriggeredLatch.countDown(); + return null; + }) + .get(30, TimeUnit.SECONDS); + + assertThat(exceptionTriggeredLatch.getCount(), is(0L)); } @Test - void renameFailure() throws InterruptedException { - doReturn(true).when(countDownLatch).await(anyLong(), any()); + void renameFailure() throws InterruptedException, ExecutionException, TimeoutException { + doReturn(CompletableFuture.completedFuture(null)) + .when(persistenceFileWriter).write(any()); + doReturn(false).when(rollingFile).renameTo(any()); - assertThrows(AtomicFileWriteFailedException.class, - () -> atomicFileWriter.write(DATA)); + CountDownLatch exceptionTriggeredLatch = new CountDownLatch(1); + atomicFileWriter.write(DATA) + .exceptionally(throwable -> { + assertThat(throwable.getCause(), instanceOf(AtomicFileWriteFailedException.class)); + exceptionTriggeredLatch.countDown(); + return null; + }) + .get(30, TimeUnit.SECONDS); + + assertThat(exceptionTriggeredLatch.getCount(), is(0L)); } @Test - void write() throws InterruptedException { - doReturn(true).when(countDownLatch).await(anyLong(), any()); + void write() throws InterruptedException, ExecutionException, TimeoutException { + doReturn(CompletableFuture.completedFuture(null)) + .when(persistenceFileWriter).write(any()); + doReturn(true).when(rollingFile).renameTo(any()); - atomicFileWriter.write(DATA); + + atomicFileWriter.write(DATA).get(30, TimeUnit.SECONDS); } } diff --git a/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java b/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java index ffcd0fd3742..348f4b06a98 100644 --- a/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java +++ b/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java @@ -17,9 +17,11 @@ package bisq.persistence; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -30,8 +32,6 @@ import org.junit.jupiter.api.extension.ExtendWith; import static java.util.concurrent.CompletableFuture.completedFuture; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.CoreMatchers.is; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doReturn; @@ -57,40 +57,37 @@ static void teardown() { } @Test - void writeInOneGo() throws InterruptedException { + void writeInOneGo() throws InterruptedException, ExecutionException, TimeoutException { doReturn(completedFuture(DATA.length)) .when(asyncWriter).write(any(), anyInt()); - boolean isSuccess = fileWriter.write(DATA) - .await(30, TimeUnit.SECONDS); + fileWriter.write(DATA) + .get(30, TimeUnit.SECONDS); - assertThat(isSuccess, is(true)); verify(asyncWriter, times(1)).write(any(), anyInt()); } @Test - void writeInTwoPhases() throws InterruptedException { + void writeInTwoPhases() throws InterruptedException, ExecutionException, TimeoutException { doReturn(completedFuture(25), completedFuture(75)) .when(asyncWriter).write(any(), anyInt()); - boolean isSuccess = fileWriter.write(DATA) - .await(30, TimeUnit.SECONDS); + fileWriter.write(DATA) + .get(30, TimeUnit.SECONDS); - assertThat(isSuccess, is(true)); verify(asyncWriter, times(2)).write(any(), anyInt()); } @Test - void writeInFivePhases() throws InterruptedException { + void writeInFivePhases() throws InterruptedException, ExecutionException, TimeoutException { doReturn(completedFuture(10), completedFuture(20), completedFuture(30), completedFuture(15), completedFuture(25)) .when(asyncWriter).write(any(), anyInt()); - boolean isSuccess = fileWriter.write(DATA) - .await(30, TimeUnit.SECONDS); + fileWriter.write(DATA) + .get(30, TimeUnit.SECONDS); - assertThat(isSuccess, is(true)); verify(asyncWriter, times(5)).write(any(), anyInt()); } } From f7443b76765eb9f09ad45c466cf9c4e185ed4320 Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Thu, 15 Feb 2024 14:41:53 +0100 Subject: [PATCH 3/4] PersistenceFileWrite: Truncate file before writing --- .../persistence/PersistenceFileWriter.java | 7 ++++- .../PersistenceFileWriterTests.java | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+), 1 deletion(-) diff --git a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java index 7b96e218f2d..76b0dfcf58c 100644 --- a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java @@ -34,7 +34,12 @@ public PersistenceFileWriter(AsyncFileWriter asyncWriter, ExecutorService writeR public CompletableFuture write(byte[] data) { CompletableFuture completableFuture = new CompletableFuture<>(); - scheduleAsyncWrite(data, 0, data.length, completableFuture); + asyncWriter.truncate() + .thenRun(() -> scheduleAsyncWrite(data, 0, data.length, completableFuture)) + .exceptionally(throwable -> { + completableFuture.completeExceptionally(throwable); + return null; + }); return completableFuture; } diff --git a/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java b/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java index 348f4b06a98..4e101bb4c5a 100644 --- a/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java +++ b/persistence/src/test/java/bisq/persistence/PersistenceFileWriterTests.java @@ -17,6 +17,9 @@ package bisq.persistence; +import java.io.IOException; + +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -32,9 +35,13 @@ import org.junit.jupiter.api.extension.ExtendWith; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -56,8 +63,29 @@ static void teardown() { writeRequestScheduler.shutdownNow(); } + @Test + void failTruncation() throws ExecutionException, InterruptedException, TimeoutException { + var ioException = new IOException("Truncation failed."); + doReturn(failedFuture(ioException)) + .when(asyncWriter).truncate(); + + CountDownLatch exceptionThrownLatch = new CountDownLatch(1); + fileWriter.write(DATA) + .exceptionally(throwable -> { + assertThat(throwable.getCause(), is(ioException)); + exceptionThrownLatch.countDown(); + return null; + }) + .get(30, TimeUnit.SECONDS); + + assertThat(exceptionThrownLatch.getCount(), is(0L)); + verify(asyncWriter, times(1)).truncate(); + verify(asyncWriter, never()).write(any(), anyInt()); + } + @Test void writeInOneGo() throws InterruptedException, ExecutionException, TimeoutException { + doReturn(completedFuture(null)).when(asyncWriter).truncate(); doReturn(completedFuture(DATA.length)) .when(asyncWriter).write(any(), anyInt()); @@ -69,6 +97,7 @@ void writeInOneGo() throws InterruptedException, ExecutionException, TimeoutExce @Test void writeInTwoPhases() throws InterruptedException, ExecutionException, TimeoutException { + doReturn(completedFuture(null)).when(asyncWriter).truncate(); doReturn(completedFuture(25), completedFuture(75)) .when(asyncWriter).write(any(), anyInt()); @@ -80,6 +109,7 @@ void writeInTwoPhases() throws InterruptedException, ExecutionException, Timeout @Test void writeInFivePhases() throws InterruptedException, ExecutionException, TimeoutException { + doReturn(completedFuture(null)).when(asyncWriter).truncate(); doReturn(completedFuture(10), completedFuture(20), completedFuture(30), completedFuture(15), completedFuture(25)) From fd6b446544919e92e7d3892b3fab0fd8bb1228a7 Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Thu, 15 Feb 2024 14:41:53 +0100 Subject: [PATCH 4/4] Implement AtomicFileWriterIntegrationTests - singleWrite - twoWritesSecondSmaller - twoWriteSecondLarger --- .../AtomicFileWriterIntegrationTests.java | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) create mode 100644 persistence/src/test/java/bisq/persistence/AtomicFileWriterIntegrationTests.java diff --git a/persistence/src/test/java/bisq/persistence/AtomicFileWriterIntegrationTests.java b/persistence/src/test/java/bisq/persistence/AtomicFileWriterIntegrationTests.java new file mode 100644 index 00000000000..44e39fd9946 --- /dev/null +++ b/persistence/src/test/java/bisq/persistence/AtomicFileWriterIntegrationTests.java @@ -0,0 +1,104 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.persistence; + +import java.nio.channels.AsynchronousFileChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +import java.io.IOException; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.mockito.junit.jupiter.MockitoExtension; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.CoreMatchers.is; + +@ExtendWith(MockitoExtension.class) +public class AtomicFileWriterIntegrationTests { + private static ExecutorService executorService; + private Path filePath; + private AtomicFileWriter atomicFileWriter; + + @BeforeAll + static void beforeAll() { + executorService = Executors.newSingleThreadExecutor(); + } + + @AfterAll + static void afterAll() { + executorService.shutdownNow(); + } + + @BeforeEach + void setup(@TempDir Path tempDir) throws IOException { + filePath = tempDir.resolve("file"); + var fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); + + AsyncFileChannelWriter asyncFileChannelWriter = new AsyncFileChannelWriter(filePath, fileChannel); + PersistenceFileWriter persistenceFileWriter = new PersistenceFileWriter(asyncFileChannelWriter, executorService); + atomicFileWriter = new AtomicFileWriter(filePath, persistenceFileWriter); + } + + @Test + void singleWrite() throws IOException, ExecutionException, InterruptedException, TimeoutException { + byte[] expectedData = "Hello World!".getBytes(StandardCharsets.UTF_8); + atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS); + + byte[] actualData = Files.readAllBytes(filePath); + assertThat(actualData, is(expectedData)); + } + + @Test + void twoWritesSecondSmaller() throws IOException, ExecutionException, InterruptedException, TimeoutException { + byte[] expectedData = "Hello World!".getBytes(StandardCharsets.UTF_8); + atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS); + + expectedData = "Bye!".getBytes(StandardCharsets.UTF_8); + atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS); + + byte[] actualData = Files.readAllBytes(filePath); + assertThat(actualData, is(expectedData)); + } + + @Test + void twoWriteSecondLarger() throws IOException, ExecutionException, InterruptedException, TimeoutException { + byte[] expectedData = "Hello World!".getBytes(StandardCharsets.UTF_8); + atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS); + + expectedData = "Bye! Hello World!".getBytes(StandardCharsets.UTF_8); + atomicFileWriter.write(expectedData).get(30, TimeUnit.SECONDS); + + byte[] actualData = Files.readAllBytes(filePath); + assertThat(actualData, is(expectedData)); + } +}