From f483badd2e144e9fb26845bec4f6e504178a2c20 Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Wed, 7 Feb 2024 14:53:16 +0100 Subject: [PATCH 1/3] Add getFilePath() to AsyncFileWriter Interface --- .../java/bisq/persistence/AsyncFileChannelWriter.java | 8 +++++++- .../src/main/java/bisq/persistence/AsyncFileWriter.java | 4 ++++ .../bisq/persistence/AsyncFileChannelWriterTests.java | 2 +- 3 files changed, 12 insertions(+), 2 deletions(-) diff --git a/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java b/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java index 7e1f8ed086a..1f9681e6dbf 100644 --- a/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java +++ b/persistence/src/main/java/bisq/persistence/AsyncFileChannelWriter.java @@ -20,13 +20,19 @@ import java.nio.ByteBuffer; import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; +import java.nio.file.Path; import java.util.concurrent.CompletableFuture; +import lombok.Getter; + public class AsyncFileChannelWriter implements AsyncFileWriter { + @Getter + private final Path filePath; private final AsynchronousFileChannel fileChannel; - public AsyncFileChannelWriter(AsynchronousFileChannel fileChannel) { + public AsyncFileChannelWriter(Path filePath, AsynchronousFileChannel fileChannel) { + this.filePath = filePath; this.fileChannel = fileChannel; } diff --git a/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java b/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java index 07493f7fa6a..9cdfc60c4cb 100644 --- a/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/AsyncFileWriter.java @@ -17,8 +17,12 @@ package bisq.persistence; +import java.nio.file.Path; + import java.util.concurrent.CompletableFuture; public interface AsyncFileWriter { CompletableFuture write(byte[] data, int offset); + + Path getFilePath(); } diff --git a/persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java b/persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java index 16b356cc2de..0152ad7e9f3 100644 --- a/persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java +++ b/persistence/src/test/java/bisq/persistence/AsyncFileChannelWriterTests.java @@ -47,7 +47,7 @@ public class AsyncFileChannelWriterTests { void setup(@TempDir Path tempDir) throws IOException { filePath = tempDir.resolve("file"); fileChannel = AsynchronousFileChannel.open(filePath, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE); - asyncFileChannelWriter = new AsyncFileChannelWriter(fileChannel); + asyncFileChannelWriter = new AsyncFileChannelWriter(filePath, fileChannel); } @Test From db5fb3b2fd23a39311ae42653852f5c267d1c996 Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Wed, 7 Feb 2024 14:53:16 +0100 Subject: [PATCH 2/3] Add getFilePath() to PersistenceFileWriter --- .../main/java/bisq/persistence/PersistenceFileWriter.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java index c9416694e00..10c0e45b2ab 100644 --- a/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java +++ b/persistence/src/main/java/bisq/persistence/PersistenceFileWriter.java @@ -17,6 +17,8 @@ package bisq.persistence; +import java.nio.file.Path; + import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.function.Consumer; @@ -36,6 +38,10 @@ public CountDownLatch write(byte[] data) { return writeFinished; } + public Path getFilePath() { + return asyncWriter.getFilePath(); + } + private void scheduleAsyncWrite(byte[] data, int offset, int size, CountDownLatch writeFinished) { asyncWriter.write(data, offset) .thenAcceptAsync(writeUntilEndAsync(data, offset, size, writeFinished), writeRequestScheduler); From 7f06107f4d4559fb3b45921521b2aab147e87ca4 Mon Sep 17 00:00:00 2001 From: Alva Swanson Date: Wed, 7 Feb 2024 14:53:16 +0100 Subject: [PATCH 3/3] Implement AtomicFileWriter First, the AtomicFileWriter writes data to a rolling file and before swapping the rolling file with the active file. This makes all file writes atomic and let's Bisq recover from crashes. --- .../AtomicFileWriteFailedException.java | 24 ++++++ .../bisq/persistence/AtomicFileWriter.java | 64 ++++++++++++++ .../persistence/AtomicFileWriterTests.java | 84 +++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 persistence/src/main/java/bisq/persistence/AtomicFileWriteFailedException.java create mode 100644 persistence/src/main/java/bisq/persistence/AtomicFileWriter.java create mode 100644 persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java diff --git a/persistence/src/main/java/bisq/persistence/AtomicFileWriteFailedException.java b/persistence/src/main/java/bisq/persistence/AtomicFileWriteFailedException.java new file mode 100644 index 00000000000..e0457c91f1c --- /dev/null +++ b/persistence/src/main/java/bisq/persistence/AtomicFileWriteFailedException.java @@ -0,0 +1,24 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.persistence; + +public class AtomicFileWriteFailedException extends RuntimeException { + public AtomicFileWriteFailedException(String message) { + super(message); + } +} diff --git a/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java b/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java new file mode 100644 index 00000000000..3c043420adb --- /dev/null +++ b/persistence/src/main/java/bisq/persistence/AtomicFileWriter.java @@ -0,0 +1,64 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.persistence; + +import java.nio.file.Path; + +import java.io.File; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AtomicFileWriter { + private final PersistenceFileWriter rollingFileWriter; + private File activeFile; + private File rollingFile; + + + public AtomicFileWriter(Path destinationPath, + PersistenceFileWriter rollingFileWriter) { + this.rollingFileWriter = rollingFileWriter; + activeFile = destinationPath.toFile(); + rollingFile = rollingFileWriter.getFilePath().toFile(); + } + + public synchronized void write(byte[] data) { + try { + CountDownLatch countDownLatch = rollingFileWriter.write(data); + boolean isSuccess = countDownLatch.await(45, TimeUnit.SECONDS); + if (!isSuccess) { + throw new AtomicFileWriteFailedException("Async atomic file write timeout triggered after 45 seconds."); + } + + isSuccess = rollingFile.renameTo(activeFile); + if (!isSuccess) { + throw new AtomicFileWriteFailedException("Couldn't rename rolling file to active file."); + } + + File tmpFile = activeFile; + activeFile = rollingFile; + rollingFile = tmpFile; + + } catch (InterruptedException e) { + log.error("AtomicFileWriter got interrupted during write.", e); + } + } +} diff --git a/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java b/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java new file mode 100644 index 00000000000..d6610b8b3a5 --- /dev/null +++ b/persistence/src/test/java/bisq/persistence/AtomicFileWriterTests.java @@ -0,0 +1,84 @@ +/* + * This file is part of Bisq. + * + * Bisq is free software: you can redistribute it and/or modify it + * under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or (at + * your option) any later version. + * + * Bisq is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public + * License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with Bisq. If not, see . + */ + +package bisq.persistence; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; + +import java.io.File; + +import java.util.concurrent.CountDownLatch; + +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +@ExtendWith(MockitoExtension.class) +public class AtomicFileWriterTests { + private static final byte[] DATA = "Hello World!".getBytes(StandardCharsets.UTF_8); + @Mock + private PersistenceFileWriter persistenceFileWriter; + @Mock + private File rollingFile = mock(File.class); + @Mock + private CountDownLatch countDownLatch; + private AtomicFileWriter atomicFileWriter; + + @BeforeEach + void setup(@TempDir Path tempDir, @Mock Path rollingFilePath) { + doReturn(countDownLatch).when(persistenceFileWriter).write(any()); + doReturn(rollingFile).when(rollingFilePath).toFile(); + doReturn(rollingFilePath).when(persistenceFileWriter).getFilePath(); + + var file = tempDir.resolve("my_file"); + atomicFileWriter = new AtomicFileWriter(file, persistenceFileWriter); + } + + @Test + void triggerFileWriteTimeout() throws InterruptedException { + doReturn(false).when(countDownLatch).await(anyLong(), any()); + assertThrows(AtomicFileWriteFailedException.class, + () -> atomicFileWriter.write(DATA)); + } + + @Test + void renameFailure() throws InterruptedException { + doReturn(true).when(countDownLatch).await(anyLong(), any()); + doReturn(false).when(rollingFile).renameTo(any()); + + assertThrows(AtomicFileWriteFailedException.class, + () -> atomicFileWriter.write(DATA)); + } + + @Test + void write() throws InterruptedException { + doReturn(true).when(countDownLatch).await(anyLong(), any()); + doReturn(true).when(rollingFile).renameTo(any()); + atomicFileWriter.write(DATA); + } +}