Skip to content

Commit

Permalink
Add Swarm.PreloadAsync()
Browse files Browse the repository at this point in the history
  • Loading branch information
longfin committed Apr 17, 2019
1 parent b84b302 commit 398642e
Show file tree
Hide file tree
Showing 4 changed files with 134 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ To be released.
other known peers and synchronizes the blocks if necessary
before propagating/receiving pinpointed recent blocks to prevent inefficient
round-trips. [[#187], [#190]]
- Added `Swarm.PreloadAsync()` that explicitly synchronizes the blocks before
starting and `BlockDownloadState` that represents downloading states.
[[#204]]
- Improved overall read throughput of `BlockChain<T>` while blocks are being
mined by `BlockChain<T>.MineBlock()`.
- Fixed a bug that `TurnClientException` had been thrown by Swarm when a STUN
Expand Down Expand Up @@ -40,6 +43,7 @@ To be released.
[#187]: https://github.com/planetarium/libplanet/issues/187
[#190]: https://github.com/planetarium/libplanet/pull/190
[#193]: https://github.com/planetarium/libplanet/pull/193
[#204]: https://github.com/planetarium/libplanet/issues/204
[#205]: https://github.com/planetarium/libplanet/pull/205


Expand Down
49 changes: 49 additions & 0 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -806,6 +806,55 @@ await Task.WhenAll(
}
}

[Fact]
public async Task Preload()
{
Swarm minerSwarm = _swarms[0];
Swarm receiverSwarm = _swarms[1];

BlockChain<DumbAction> minerChain = _blockchains[0];
BlockChain<DumbAction> receiverChain = _blockchains[1];

foreach (int i in Enumerable.Range(0, 10))
{
minerChain.MineBlock(_fx1.Address1);
}

var actualStates = new List<BlockDownloadState>();
var progress = new Progress<BlockDownloadState>(state =>
{
actualStates.Add(state);
});

try
{
await StartAsync(minerSwarm, minerChain);
receiverSwarm.Add(minerSwarm.AsPeer);

await receiverSwarm.PreloadAsync(receiverChain, progress);

Assert.Equal(minerChain.AsEnumerable(), receiverChain.AsEnumerable());

IEnumerable<BlockDownloadState> expectedStates = minerChain.Select((b, i) =>
{
return new BlockDownloadState()
{
ReceivedBlockHash = b.Hash,
TotalBlockCount = 10,
ReceivedBlockCount = i + 1,
};
});

Assert.Equal(expectedStates, actualStates);
}
finally
{
await Task.WhenAll(
minerSwarm.StopAsync(),
receiverSwarm.StopAsync());
}
}

private async Task<Task> StartAsync<T>(
Swarm swarm,
BlockChain<T> blockChain,
Expand Down
29 changes: 29 additions & 0 deletions Libplanet/Net/BlockDownloadState.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.Security.Cryptography;

namespace Libplanet.Net
{
/// <summary>
/// A container that indicates the progress of a block download.
/// </summary>
[Uno.GeneratedEquality]
public partial class BlockDownloadState
{
/// <summary>
/// Total number of blocks to receive in the current batch.
/// </summary>
[Uno.EqualityHash]
public int TotalBlockCount { get; internal set; }

/// <summary>
/// The number of currently received blocks.
/// </summary>
[Uno.EqualityHash]
public int ReceivedBlockCount { get; internal set; }

/// <summary>
/// The hash digest of the block just received.
/// </summary>
[Uno.EqualityHash]
public HashDigest<SHA256> ReceivedBlockHash { get; internal set; }
}
}
68 changes: 52 additions & 16 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -478,13 +478,9 @@ public async Task StartAsync<T>(
using (await _runningMutex.LockAsync())
{
Running = true;
IAsyncEnumerable<(Peer, long?)> peersWithLength =
DialToExistingPeers(cancellationToken).Select(
pp => (pp.Item1, pp.Item2.TipIndex));
await SyncBehindsBlocksFromPeersAsync(
await PreloadAsync(
blockChain,
peersWithLength,
cancellationToken);
cancellationToken: cancellationToken);
}

var tasks = new List<Task>
Expand Down Expand Up @@ -545,6 +541,40 @@ await BroadcastMessage(
cancellationToken);
}

/// <summary>
/// Synchronizes the blocks with other registerd <see cref="Peer"/>s.
/// </summary>
/// <param name="blockChain">
/// A <see cref="BlockChain{T}"/> instance to synchronize.
/// </param>
/// <param name="progress">
/// An instance that receives progress updates for block downloads.
/// </param>
/// <param name="cancellationToken">
/// A cancellation token used to propagate notification that this
/// operation should be canceled.
/// </param>
/// <typeparam name="T">An <see cref="IAction"/> type. It should match
/// to <see cref="BlockChain{T}"/>'s type parameter.</typeparam>
/// <returns>
/// No object or value is returned by this method when it completes.
/// </returns>
public async Task PreloadAsync<T>(
BlockChain<T> blockChain,
IProgress<BlockDownloadState> progress = null,
CancellationToken cancellationToken = default(CancellationToken))
where T : IAction, new()
{
IAsyncEnumerable<(Peer, long?)> peersWithLength =
DialToExistingPeers(cancellationToken).Select(
pp => (pp.Item1, pp.Item2.TipIndex));
await SyncBehindsBlocksFromPeersAsync(
blockChain,
peersWithLength,
progress,
cancellationToken);
}

internal async Task<IEnumerable<HashDigest<SHA256>>>
GetBlockHashesAsync(
Peer peer,
Expand All @@ -553,8 +583,6 @@ internal async Task<IEnumerable<HashDigest<SHA256>>>
CancellationToken token = default(CancellationToken)
)
{
CheckStarted();

if (!_peers.ContainsKey(peer))
{
throw new PeerNotFoundException(
Expand Down Expand Up @@ -589,8 +617,6 @@ internal IAsyncEnumerable<Block<T>> GetBlocksAsync<T>(
CancellationToken token = default(CancellationToken))
where T : IAction, new()
{
CheckStarted();

if (!_peers.ContainsKey(peer))
{
throw new PeerNotFoundException(
Expand Down Expand Up @@ -639,8 +665,6 @@ internal IAsyncEnumerable<Transaction<T>> GetTxsAsync<T>(
CancellationToken cancellationToken = default(CancellationToken))
where T : IAction, new()
{
CheckStarted();

if (!_peers.ContainsKey(peer))
{
throw new PeerNotFoundException(
Expand Down Expand Up @@ -771,6 +795,7 @@ await yield.ReturnAsync(
private async Task SyncBehindsBlocksFromPeersAsync<T>(
BlockChain<T> blockChain,
IAsyncEnumerable<(Peer, long?)> peersWithLength,
IProgress<BlockDownloadState> progress,
CancellationToken cancellationToken)
where T : IAction, new()
{
Expand All @@ -789,6 +814,7 @@ await peersWithLength.AggregateAsync(
blockChain,
longestPeerWithLength?.Item1,
null,
progress,
cancellationToken);
if (!synced.Id.Equals(blockChain.Id))
{
Expand Down Expand Up @@ -999,6 +1025,7 @@ private async Task<BlockChain<T>> SyncPreviousBlocksAsync<T>(
BlockChain<T> blockChain,
Peer peer,
HashDigest<SHA256>? stop,
IProgress<BlockDownloadState> progress,
CancellationToken cancellationToken)
where T : IAction, new()
{
Expand Down Expand Up @@ -1051,7 +1078,7 @@ await GetBlockHashesAsync(
try
{
await FillBlocksAsync(
peer, synced, stop, cancellationToken);
peer, synced, stop, progress, cancellationToken);
break;
}
catch (Exception e)
Expand Down Expand Up @@ -1094,6 +1121,7 @@ CancellationToken cancellationToken
blockChain,
peer,
oldest.PreviousHash,
null,
cancellationToken);
_logger.Debug("Filled up. trying to concatenation...");

Expand Down Expand Up @@ -1124,6 +1152,7 @@ private async Task FillBlocksAsync<T>(
Peer peer,
BlockChain<T> blockChain,
HashDigest<SHA256>? stop,
IProgress<BlockDownloadState> progress,
CancellationToken cancellationToken)
where T : IAction, new()
{
Expand All @@ -1144,8 +1173,10 @@ await GetBlockHashesAsync(
break;
}

int hashCount = hashes.Count();
int received = 0;
_logger.Debug(
$"Required hashes (count: {hashes.Count()}). " +
$"Required hashes (count: {hashCount}). " +
$"(tip: {blockChain.Tip?.Hash})"
);

Expand All @@ -1157,6 +1188,13 @@ await GetBlocksAsync<T>(
{
_logger.Debug($"Trying to append block[{block.Hash}]...");
blockChain.Append(block);
received++;
progress?.Report(new BlockDownloadState()
{
TotalBlockCount = hashCount,
ReceivedBlockCount = received,
ReceivedBlockHash = block.Hash,
});
_logger.Debug($"Block[{block.Hash}] is appended.");
});
}
Expand Down Expand Up @@ -1445,8 +1483,6 @@ private async Task<Pong> DialAsync(
CancellationToken cancellationToken
)
{
CheckStarted();

dealer.Connect(address);

_logger.Debug($"Trying to Ping to [{address}]...");
Expand Down

0 comments on commit 398642e

Please sign in to comment.