diff --git a/CHANGES.md b/CHANGES.md index a7b4552da8b..bf62e0edf9a 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -15,8 +15,8 @@ To be released. - Added `IBlockPolicy.BlockAction` property. [[#319], [#367]] - Removed the type parameter of `ActionEvaluation`. [[#319], [#367]] - `ActionEvaluation.Action` became to `IAction` type. [[#319], [#367]] - - `LiteDBStore()` constructor became to have a new option named `flush` and turned on by default. - [[#387], [LiteDB #1268]] + - `LiteDBStore()` constructor became to have a new option named `flush` and + turned on by default. [[#387], [LiteDB #1268]] - `BaseIndex.ContainsKey()` method became `abstract`. [[#390]] - `BlockDownloadState.TotalBlockCount` and `BlockDownloadState.ReceivedBlockCount` became to `Int64` type. [[#396], [#399]] @@ -60,12 +60,17 @@ To be released. own retrieve implementations. [[#390]] - The way `LiteDBStore` stores state references became efficient, but the file-level backward compatibility was also broken. [[#395], [#398]] - - `PreloadAsync` became to report total block download status instead of - chunked download status. [[#396], [#399]] + - `Swarm.PreloadAsync()` method became to report a block downloading + progress with the total number of blocks to download in the entire batch, + instead of the window size of a chunk (i.e., 500). [[#396], [#399]] - `Swarm.PreloadAsync()` became to get the first parameter, `progress`, which accepts `IProgress`. [[#397], [#400]] - `Swarm.PreloadAsync()` became safe from data corruption even if a preloading process suddenly gets shutdown. [[#417]] + - `FileStore` and `LiteDBStore` became to guarantee atomicity of storing + transactions. [[#413]] + - `IStore.PutTransaction()` became to do nothing when it takes + the `Transaction` more than once. [[#413]] ### Bug fixes @@ -110,6 +115,7 @@ To be released. [#398]: https://github.com/planetarium/libplanet/pull/398 [#399]: https://github.com/planetarium/libplanet/pull/399 [#400]: https://github.com/planetarium/libplanet/pull/400 +[#413]: https://github.com/planetarium/libplanet/pull/413 [#414]: https://github.com/planetarium/libplanet/pull/414 [#416]: https://github.com/planetarium/libplanet/pull/416 [#417]: https://github.com/planetarium/libplanet/pull/417 diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index adbfc702247..8541732e11c 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -236,22 +236,20 @@ CancellationToken cancellationToken } [Fact(Timeout = Timeout)] - public async Task CanExchangePeer() + public async Task ExchangePeer() { - BlockChain chain = _blockchains[0]; - var a = new Swarm( - chain, + _blockchains[0], new PrivateKey(), 1, host: IPAddress.Loopback.ToString()); var b = new Swarm( - chain, + _blockchains[1], new PrivateKey(), 1, host: IPAddress.Loopback.ToString()); var c = new Swarm( - chain, + _blockchains[2], new PrivateKey(), 1, host: IPAddress.Loopback.ToString()); @@ -320,26 +318,24 @@ public async Task CanExchangePeer() [Fact(Timeout = Timeout)] public async Task DetectAppProtocolVersion() { - BlockChain chain = _blockchains[0]; - var a = new Swarm( - chain, + _blockchains[0], new PrivateKey(), host: IPAddress.Loopback.ToString(), appProtocolVersion: 2); var b = new Swarm( - chain, + _blockchains[1], new PrivateKey(), host: IPAddress.Loopback.ToString(), appProtocolVersion: 3); var c = new Swarm( - chain, + _blockchains[2], new PrivateKey(), host: IPAddress.Loopback.ToString(), appProtocolVersion: 2); var d = new Swarm( - chain, + new BlockChain(_blockchains[0].Policy, new FileStoreFixture().Store), new PrivateKey(), host: IPAddress.Loopback.ToString(), appProtocolVersion: 3); @@ -381,16 +377,14 @@ void GameHandler(object sender, DifferentProtocolVersionEventArgs e) isCalled = true; } - BlockChain chain = _blockchains[0]; - var a = new Swarm( - chain, + _blockchains[0], new PrivateKey(), host: IPAddress.Loopback.ToString(), appProtocolVersion: 2, differentVersionPeerEncountered: GameHandler); var b = new Swarm( - chain, + _blockchains[1], new PrivateKey(), host: IPAddress.Loopback.ToString(), appProtocolVersion: 3); @@ -558,7 +552,6 @@ public async Task GetTx() Swarm swarmA = _swarms[0]; Swarm swarmB = _swarms[1]; - BlockChain chainA = _blockchains[0]; BlockChain chainB = _blockchains[1]; Transaction tx = Transaction.Create( diff --git a/Libplanet.Tests/Store/StoreTest.cs b/Libplanet.Tests/Store/StoreTest.cs index 1ad769078c3..2adea21ff24 100644 --- a/Libplanet.Tests/Store/StoreTest.cs +++ b/Libplanet.Tests/Store/StoreTest.cs @@ -3,6 +3,7 @@ using System.Collections.Immutable; using System.Linq; using System.Security.Cryptography; +using System.Threading.Tasks; using Libplanet.Action; using Libplanet.Blocks; using Libplanet.Crypto; @@ -542,5 +543,114 @@ public void IndexBlockHashReturnNull() Assert.Equal(1, Fx.Store.CountIndex(Fx.StoreNamespace)); Assert.Null(Fx.Store.IndexBlockHash(Fx.StoreNamespace, 2)); } + + [Fact] + public void TxAtomicity() + { + Transaction MakeTx( + System.Random random, + MD5 md5, + PrivateKey key, + int txNonce + ) + { + byte[] arbitraryBytes = new byte[20]; + random.NextBytes(arbitraryBytes); + byte[] digest = md5.ComputeHash(arbitraryBytes); + var action = new AtomicityTestAction + { + ArbitraryBytes = arbitraryBytes.ToImmutableArray(), + Md5Digest = digest.ToImmutableArray(), + }; + return Transaction.Create( + txNonce, + key, + new[] { action }, + ImmutableHashSet
.Empty, + DateTimeOffset.UtcNow + ); + } + + const int taskCount = 5; + const int txCount = 30; + var md5Hasher = MD5.Create(); + Transaction commonTx = MakeTx( + new System.Random(), + md5Hasher, + new PrivateKey(), + 0 + ); + Task[] tasks = new Task[taskCount]; + for (int i = 0; i < taskCount; i++) + { + var task = new Task(() => + { + PrivateKey key = new PrivateKey(); + var random = new System.Random(); + var md5 = MD5.Create(); + Transaction tx; + for (int j = 0; j < 50; j++) + { + Fx.Store.PutTransaction(commonTx); + } + + for (int j = 0; j < txCount; j++) + { + tx = MakeTx(random, md5, key, j + 1); + Fx.Store.PutTransaction(tx); + } + }); + task.Start(); + tasks[i] = task; + } + + Task.WaitAll(tasks); + + Assert.Equal(1 + (taskCount * txCount), Fx.Store.CountTransactions()); + foreach (TxId txid in Fx.Store.IterateTransactionIds()) + { + var tx = Fx.Store.GetTransaction(txid); + tx.Validate(); + Assert.Single(tx.Actions); + AtomicityTestAction action = tx.Actions[0]; + Assert.Equal( + md5Hasher.ComputeHash(action.ArbitraryBytes.ToBuilder().ToArray()), + action.Md5Digest.ToBuilder().ToArray() + ); + } + } + + private class AtomicityTestAction : IAction + { + public ImmutableArray ArbitraryBytes { get; set; } + + public ImmutableArray Md5Digest { get; set; } + + public IImmutableDictionary PlainValue => + new Dictionary + { + { "bytes", ArbitraryBytes.ToBuilder().ToArray() }, + { "md5", Md5Digest.ToBuilder().ToArray() }, + }.ToImmutableDictionary(); + + public void LoadPlainValue(IImmutableDictionary plainValue) + { + ArbitraryBytes = (plainValue["bytes"] as byte[]).ToImmutableArray(); + Md5Digest = (plainValue["md5"] as byte[]).ToImmutableArray(); + } + + public IAccountStateDelta Execute(IActionContext context) + { + return context.PreviousStates; + } + + public void Render(IActionContext context, IAccountStateDelta nextStates) + { + } + + public void Unrender(IActionContext context, IAccountStateDelta nextStates) + { + } + } } } diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 6d28332fe51..504aba11ea5 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -1984,11 +1984,11 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e) NetMQMessage raw = e.Socket.ReceiveMultipartMessage(); _logger.Verbose( - "The raw message[frame count: {0}] has received.", + "A raw message [frame count: {0}] has received.", raw.FrameCount ); Message message = Message.Parse(raw, reply: false); - _logger.Debug($"The message[{message}] has parsed."); + _logger.Debug("A message has parsed: {0}", message); // it's still async because some method it relies are async yet. Task.Run( @@ -2000,7 +2000,11 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e) } catch (Exception exc) { - _logger.Error("Something went wrong during message parsing: {0}", exc); + _logger.Error( + exc, + "Something went wrong during message parsing: {0}", + exc + ); throw; } }, @@ -2008,11 +2012,15 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e) } catch (InvalidMessageException ex) { - _logger.Error(ex, "Could not parse NetMQMessage properly; ignore."); + _logger.Error(ex, "Could not parse NetMQMessage properly; ignore: {0}", ex); } catch (Exception ex) { - _logger.Error(ex, "An unexpected exception occured during ReceiveMessage()."); + _logger.Error( + ex, + "An unexpected exception occured during ReceiveMessage(): {0}", + ex + ); } } diff --git a/Libplanet/Store/FileStore.cs b/Libplanet/Store/FileStore.cs index 1fe06512fc3..2063fd0e902 100644 --- a/Libplanet/Store/FileStore.cs +++ b/Libplanet/Store/FileStore.cs @@ -8,6 +8,7 @@ using System.Runtime.Serialization.Formatters.Binary; using System.Security.Cryptography; using System.Text.RegularExpressions; +using System.Threading; using Libplanet.Action; using Libplanet.Blocks; using Libplanet.Serialization; @@ -38,6 +39,8 @@ public class FileStore : BaseStore private readonly string _path; + private readonly ReaderWriterLockSlim _txLock; + public FileStore(string path) { if (File.Exists(path) && !Directory.Exists(path)) @@ -54,6 +57,8 @@ public FileStore(string path) { Directory.CreateDirectory(Path.Combine(_path, dir)); } + + _txLock = new ReaderWriterLockSlim(); } public string GetTransactionPath(TxId txid) @@ -309,12 +314,29 @@ public override bool DeleteBlock(HashDigest blockHash) public override bool DeleteTransaction(TxId txid) { var txFile = new FileInfo(GetTransactionPath(txid)); - if (!txFile.Exists) + _txLock.EnterUpgradeableReadLock(); + try { - return false; + if (!txFile.Exists) + { + return false; + } + + _txLock.EnterWriteLock(); + try + { + txFile.Delete(); + } + finally + { + _txLock.ExitWriteLock(); + } + } + finally + { + _txLock.ExitUpgradeableReadLock(); } - txFile.Delete(); return true; } @@ -484,25 +506,31 @@ public override IEnumerable IterateTransactionIds() RegexOptions.IgnoreCase ); var rootDir = new DirectoryInfo(GetTransactionPath()); - foreach (DirectoryInfo prefix in rootDir.EnumerateDirectories()) + _txLock.EnterReadLock(); + try { - if (!prefixRegex.IsMatch(prefix.Name)) - { - continue; - } - - foreach (FileInfo rest in prefix.EnumerateFiles()) + foreach (DirectoryInfo prefix in rootDir.EnumerateDirectories()) { - if (!restRegex.IsMatch(rest.Name)) + if (!prefixRegex.IsMatch(prefix.Name)) { continue; } - yield return new TxId( - ByteUtil.ParseHex(prefix.Name + rest.Name) - ); + foreach (FileInfo rest in prefix.EnumerateFiles()) + { + if (!restRegex.IsMatch(rest.Name)) + { + continue; + } + + yield return new TxId(ByteUtil.ParseHex(prefix.Name + rest.Name)); + } } } + finally + { + _txLock.ExitReadLock(); + } } /// @@ -528,13 +556,28 @@ public override void PutBlock(Block block) public override void PutTransaction(Transaction tx) { + byte[] txBytes = tx.ToBencodex(true); var txFile = new FileInfo(GetTransactionPath(tx.Id)); - txFile.Directory.Create(); - using (Stream stream = txFile.Open( - FileMode.OpenOrCreate, FileAccess.Write)) + _txLock.EnterUpgradeableReadLock(); + try + { + txFile.Directory.Create(); + _txLock.EnterWriteLock(); + try + { + using (Stream stream = txFile.Open(FileMode.OpenOrCreate, FileAccess.Write)) + { + stream.Write(txBytes, 0, txBytes.Length); + } + } + finally + { + _txLock.ExitWriteLock(); + } + } + finally { - byte[] txBytes = tx.ToBencodex(true); - stream.Write(txBytes, 0, txBytes.Length); + _txLock.ExitUpgradeableReadLock(); } } diff --git a/Libplanet/Store/IStore.cs b/Libplanet/Store/IStore.cs index 5451523a85f..026a3347c6a 100644 --- a/Libplanet/Store/IStore.cs +++ b/Libplanet/Store/IStore.cs @@ -77,6 +77,13 @@ public interface IStore Transaction GetTransaction(TxId txid) where T : IAction, new(); + /// + /// Puts a given to the store. If the same transaction + /// already exists in the store it does nothing. + /// + /// A transaction to put into the store. + /// An type. It should match + /// to 's type parameter. void PutTransaction(Transaction tx) where T : IAction, new(); diff --git a/Libplanet/Store/LiteDBStore.cs b/Libplanet/Store/LiteDBStore.cs index b1caf28ebc7..07575f2ad2b 100644 --- a/Libplanet/Store/LiteDBStore.cs +++ b/Libplanet/Store/LiteDBStore.cs @@ -8,6 +8,7 @@ using System.Runtime.Serialization.Formatters.Binary; using System.Security.Cryptography; using System.Text; +using System.Threading; using Libplanet.Action; using Libplanet.Blocks; using Libplanet.Serialization; @@ -37,6 +38,8 @@ public class LiteDBStore : BaseStore, IDisposable private readonly LiteDatabase _db; + private readonly ReaderWriterLockSlim _txLock; + /// /// Creates a new . /// @@ -86,6 +89,8 @@ public LiteDBStore( _db.Mapper.RegisterType( address => address.ToByteArray(), b => new Address(b.AsBinary)); + + _txLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion); } private LiteCollection StagedTxIds => @@ -194,50 +199,116 @@ public override IEnumerable IterateStagedTransactionIds(bool toBroadcast) /// public override IEnumerable IterateTransactionIds() { - return _db.FileStorage - .Find(TxIdPrefix) - .Select(file => new TxId(ByteUtil.ParseHex(file.Filename))); + IEnumerable filenames; + _txLock.EnterReadLock(); + try + { + filenames = _db.FileStorage + .Find(TxIdPrefix) + .Select(file => file.Filename) + .ToList(); + } + finally + { + _txLock.ExitReadLock(); + } + + return filenames.Select(filename => new TxId(ByteUtil.ParseHex(filename))); } /// public override Transaction GetTransaction(TxId txid) { - LiteFileInfo file = _db.FileStorage.FindById(TxFileId(txid)); - if (file is null) - { - return null; - } + string fileId = TxFileId(txid); + byte[] bytes; - using (var stream = new MemoryStream()) + _txLock.EnterUpgradeableReadLock(); + try { - DownloadFile(file, stream); - - var bytes = stream.ToArray(); - if (bytes.Length != file.Length || bytes.Length < 1) + LiteFileInfo file = _db.FileStorage.FindById(fileId); + if (file is null) { - _logger.Warning( - "The data file for the transaction {TxId} seems corrupted; " + - "it will be treated nonexistent and removed at all.", - txid - ); - DeleteTransaction(txid); return null; } - return Transaction.FromBencodex(bytes); + using (var stream = new MemoryStream()) + { + DownloadFile(file, stream); + + bytes = stream.ToArray(); + if (bytes.Length != file.Length || bytes.Length < 1) + { + _logger.Warning( + "The data file for the transaction {TxId} seems corrupted; " + + "it will be treated nonexistent and removed at all.", + txid + ); + _txLock.EnterWriteLock(); + try + { + _db.FileStorage.Delete(fileId); + } + finally + { + _txLock.ExitWriteLock(); + } + + return null; + } + } } + finally + { + _txLock.ExitUpgradeableReadLock(); + } + + return Transaction.FromBencodex(bytes); } /// public override void PutTransaction(Transaction tx) { - UploadFile(TxFileId(tx.Id), tx.Id.ToHex(), tx.ToBencodex(true)); + string fileId = TxFileId(tx.Id); + string filename = tx.Id.ToHex(); + byte[] txBytes = tx.ToBencodex(true); + _txLock.EnterUpgradeableReadLock(); + try + { + LiteFileInfo file = _db.FileStorage.FindById(fileId); + if (file is LiteFileInfo) + { + // No-op if already exists. + return; + } + + _txLock.EnterWriteLock(); + try + { + UploadFile(fileId, filename, txBytes); + } + finally + { + _txLock.ExitWriteLock(); + } + } + finally + { + _txLock.ExitUpgradeableReadLock(); + } } /// public override bool DeleteTransaction(TxId txid) { - return _db.FileStorage.Delete(TxFileId(txid)); + _txLock.EnterWriteLock(); + try + { + return _db.FileStorage.Delete(TxFileId(txid)); + } + finally + { + _txLock.ExitWriteLock(); + } } /// @@ -448,7 +519,7 @@ public void Dispose() private string TxFileId(TxId txid) { - return $"tx/{txid.ToHex()}"; + return $"{TxIdPrefix}{txid.ToHex()}"; } private string BlockFileId(HashDigest blockHash)