From 398642eefd76327f9b84117410ccd9f567e187b0 Mon Sep 17 00:00:00 2001 From: Swen Mun Date: Wed, 17 Apr 2019 14:21:11 +0900 Subject: [PATCH] Add Swarm.PreloadAsync() closes #204 --- CHANGES.md | 4 ++ Libplanet.Tests/Net/SwarmTest.cs | 49 +++++++++++++++++++++ Libplanet/Net/BlockDownloadState.cs | 29 ++++++++++++ Libplanet/Net/Swarm.cs | 68 ++++++++++++++++++++++------- 4 files changed, 134 insertions(+), 16 deletions(-) create mode 100644 Libplanet/Net/BlockDownloadState.cs diff --git a/CHANGES.md b/CHANGES.md index cd4f4251257..d177b4467c4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -13,6 +13,9 @@ To be released. other known peers and synchronizes the blocks if necessary before propagating/receiving pinpointed recent blocks to prevent inefficient round-trips. [[#187], [#190]] + - Added `Swarm.PreloadAsync()` that explicitly synchronizes the blocks before + starting and `BlockDownloadState` that represents downloading states. + [[#204]] - Improved overall read throughput of `BlockChain` while blocks are being mined by `BlockChain.MineBlock()`. - Fixed a bug that `TurnClientException` had been thrown by Swarm when a STUN @@ -40,6 +43,7 @@ To be released. [#187]: https://github.com/planetarium/libplanet/issues/187 [#190]: https://github.com/planetarium/libplanet/pull/190 [#193]: https://github.com/planetarium/libplanet/pull/193 +[#204]: https://github.com/planetarium/libplanet/issues/204 [#205]: https://github.com/planetarium/libplanet/pull/205 diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index c13e8895e95..c3efb5c3739 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -806,6 +806,55 @@ await Task.WhenAll( } } + [Fact] + public async Task Preload() + { + Swarm minerSwarm = _swarms[0]; + Swarm receiverSwarm = _swarms[1]; + + BlockChain minerChain = _blockchains[0]; + BlockChain receiverChain = _blockchains[1]; + + foreach (int i in Enumerable.Range(0, 10)) + { + minerChain.MineBlock(_fx1.Address1); + } + + var actualStates = new List(); + var progress = new Progress(state => + { + actualStates.Add(state); + }); + + try + { + await StartAsync(minerSwarm, minerChain); + receiverSwarm.Add(minerSwarm.AsPeer); + + await receiverSwarm.PreloadAsync(receiverChain, progress); + + Assert.Equal(minerChain.AsEnumerable(), receiverChain.AsEnumerable()); + + IEnumerable expectedStates = minerChain.Select((b, i) => + { + return new BlockDownloadState() + { + ReceivedBlockHash = b.Hash, + TotalBlockCount = 10, + ReceivedBlockCount = i + 1, + }; + }); + + Assert.Equal(expectedStates, actualStates); + } + finally + { + await Task.WhenAll( + minerSwarm.StopAsync(), + receiverSwarm.StopAsync()); + } + } + private async Task StartAsync( Swarm swarm, BlockChain blockChain, diff --git a/Libplanet/Net/BlockDownloadState.cs b/Libplanet/Net/BlockDownloadState.cs new file mode 100644 index 00000000000..ab35146f8dd --- /dev/null +++ b/Libplanet/Net/BlockDownloadState.cs @@ -0,0 +1,29 @@ +using System.Security.Cryptography; + +namespace Libplanet.Net +{ + /// + /// A container that indicates the progress of a block download. + /// + [Uno.GeneratedEquality] + public partial class BlockDownloadState + { + /// + /// Total number of blocks to receive in the current batch. + /// + [Uno.EqualityHash] + public int TotalBlockCount { get; internal set; } + + /// + /// The number of currently received blocks. + /// + [Uno.EqualityHash] + public int ReceivedBlockCount { get; internal set; } + + /// + /// The hash digest of the block just received. + /// + [Uno.EqualityHash] + public HashDigest ReceivedBlockHash { get; internal set; } + } +} diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index 04ce292b846..6c09b8ccded 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -478,13 +478,9 @@ public async Task StartAsync( using (await _runningMutex.LockAsync()) { Running = true; - IAsyncEnumerable<(Peer, long?)> peersWithLength = - DialToExistingPeers(cancellationToken).Select( - pp => (pp.Item1, pp.Item2.TipIndex)); - await SyncBehindsBlocksFromPeersAsync( + await PreloadAsync( blockChain, - peersWithLength, - cancellationToken); + cancellationToken: cancellationToken); } var tasks = new List @@ -545,6 +541,40 @@ await BroadcastMessage( cancellationToken); } + /// + /// Synchronizes the blocks with other registerd s. + /// + /// + /// A instance to synchronize. + /// + /// + /// An instance that receives progress updates for block downloads. + /// + /// + /// A cancellation token used to propagate notification that this + /// operation should be canceled. + /// + /// An type. It should match + /// to 's type parameter. + /// + /// No object or value is returned by this method when it completes. + /// + public async Task PreloadAsync( + BlockChain blockChain, + IProgress progress = null, + CancellationToken cancellationToken = default(CancellationToken)) + where T : IAction, new() + { + IAsyncEnumerable<(Peer, long?)> peersWithLength = + DialToExistingPeers(cancellationToken).Select( + pp => (pp.Item1, pp.Item2.TipIndex)); + await SyncBehindsBlocksFromPeersAsync( + blockChain, + peersWithLength, + progress, + cancellationToken); + } + internal async Task>> GetBlockHashesAsync( Peer peer, @@ -553,8 +583,6 @@ internal async Task>> CancellationToken token = default(CancellationToken) ) { - CheckStarted(); - if (!_peers.ContainsKey(peer)) { throw new PeerNotFoundException( @@ -589,8 +617,6 @@ internal IAsyncEnumerable> GetBlocksAsync( CancellationToken token = default(CancellationToken)) where T : IAction, new() { - CheckStarted(); - if (!_peers.ContainsKey(peer)) { throw new PeerNotFoundException( @@ -639,8 +665,6 @@ internal IAsyncEnumerable> GetTxsAsync( CancellationToken cancellationToken = default(CancellationToken)) where T : IAction, new() { - CheckStarted(); - if (!_peers.ContainsKey(peer)) { throw new PeerNotFoundException( @@ -771,6 +795,7 @@ await yield.ReturnAsync( private async Task SyncBehindsBlocksFromPeersAsync( BlockChain blockChain, IAsyncEnumerable<(Peer, long?)> peersWithLength, + IProgress progress, CancellationToken cancellationToken) where T : IAction, new() { @@ -789,6 +814,7 @@ await peersWithLength.AggregateAsync( blockChain, longestPeerWithLength?.Item1, null, + progress, cancellationToken); if (!synced.Id.Equals(blockChain.Id)) { @@ -999,6 +1025,7 @@ private async Task> SyncPreviousBlocksAsync( BlockChain blockChain, Peer peer, HashDigest? stop, + IProgress progress, CancellationToken cancellationToken) where T : IAction, new() { @@ -1051,7 +1078,7 @@ await GetBlockHashesAsync( try { await FillBlocksAsync( - peer, synced, stop, cancellationToken); + peer, synced, stop, progress, cancellationToken); break; } catch (Exception e) @@ -1094,6 +1121,7 @@ CancellationToken cancellationToken blockChain, peer, oldest.PreviousHash, + null, cancellationToken); _logger.Debug("Filled up. trying to concatenation..."); @@ -1124,6 +1152,7 @@ private async Task FillBlocksAsync( Peer peer, BlockChain blockChain, HashDigest? stop, + IProgress progress, CancellationToken cancellationToken) where T : IAction, new() { @@ -1144,8 +1173,10 @@ await GetBlockHashesAsync( break; } + int hashCount = hashes.Count(); + int received = 0; _logger.Debug( - $"Required hashes (count: {hashes.Count()}). " + + $"Required hashes (count: {hashCount}). " + $"(tip: {blockChain.Tip?.Hash})" ); @@ -1157,6 +1188,13 @@ await GetBlocksAsync( { _logger.Debug($"Trying to append block[{block.Hash}]..."); blockChain.Append(block); + received++; + progress?.Report(new BlockDownloadState() + { + TotalBlockCount = hashCount, + ReceivedBlockCount = received, + ReceivedBlockHash = block.Hash, + }); _logger.Debug($"Block[{block.Hash}] is appended."); }); } @@ -1445,8 +1483,6 @@ private async Task DialAsync( CancellationToken cancellationToken ) { - CheckStarted(); - dealer.Connect(address); _logger.Debug($"Trying to Ping to [{address}]...");