Skip to content

Commit

Permalink
Merge pull request #846 from longfin/bugfix/slow-preload
Browse files Browse the repository at this point in the history
Parallelizing Swarm<T>.PreloadAsync()
  • Loading branch information
longfin authored Apr 10, 2020
2 parents fb38a16 + 7d7bf04 commit 73ce4f5
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 40 deletions.
3 changes: 3 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ To be released.
simultaneously multiple peers. [[#707], [#798]]
- Improved performance of `Swarm<T>` by preventing unnecessary task
creation. [[#817], [#837]]
- Improved performance of `Swarm<T>.PreloadAsync()` by parallelizing
connections. [[#846]]

### Bug fixes

Expand Down Expand Up @@ -150,6 +152,7 @@ To be released.
[#843]: https://github.com/planetarium/libplanet/issues/843
[#844]: https://github.com/planetarium/libplanet/pull/844
[#845]: https://github.com/planetarium/libplanet/pull/845
[#846]: https://github.com/planetarium/libplanet/pull/846


Version 0.8.0
Expand Down
97 changes: 57 additions & 40 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Net;
using System.Runtime.CompilerServices;
Expand Down Expand Up @@ -461,11 +460,10 @@ public async Task PreloadAsync(
BlockChain.Tip.Hash
);

var peersWithHeightAsync = DialToExistingPeers(dialTimeout, cancellationToken)
var peersWithHeight = (await DialToExistingPeers(dialTimeout, cancellationToken))
.Where(pp => pp.Item2.TipIndex > (initialTip?.Index ?? -1))
.Select(pp => (pp.Item1, pp.Item2.TipIndex));
IList<(BoundPeer, long?)> peersWithHeight =
await peersWithHeightAsync.ToListAsync(cancellationToken);
.Select(pp => (pp.Item1, pp.Item2.TipIndex))
.ToList();

if (!peersWithHeight.Any())
{
Expand Down Expand Up @@ -1062,46 +1060,65 @@ private void BroadcastMessage(Address? except, Message message)
Transport.BroadcastMessage(except, message);
}

private async IAsyncEnumerable<(BoundPeer, Pong)> DialToExistingPeers(
private Task<(BoundPeer, Pong)[]> DialToExistingPeers(
TimeSpan? dialTimeout,
[EnumeratorCancellation] CancellationToken cancellationToken
CancellationToken cancellationToken
)
{
foreach (BoundPeer peer in Peers)
{
Message reply = null;
try
{
reply = await Transport.SendMessageWithReplyAsync(
peer,
new Ping(),
dialTimeout,
cancellationToken
);
}
catch (TimeoutException)
{
_logger.Debug($"TimeoutException occurred during dial to ({peer}).");
}
catch (IOException e)
{
_logger.Error(
e,
$"IOException occurred ({peer})."
);
}
catch (DifferentAppProtocolVersionException e)
{
_logger.Error(
e,
$"Protocol Version is different ({peer}).");
}
IEnumerable<Task<(BoundPeer, Pong)>> tasks = Peers.Select(
peer => Transport.SendMessageWithReplyAsync(
peer, new Ping(), dialTimeout, cancellationToken
).ContinueWith<(BoundPeer, Pong)>(
t =>
{
if (t.IsFaulted || t.IsCanceled || !(t.Result is Pong pong))
{
switch (t.Exception?.InnerException)
{
case TimeoutException te:
_logger.Debug(
$"TimeoutException occurred during dial to ({peer})."
);
break;
case DifferentAppProtocolVersionException dapve:
_logger.Error(
dapve,
$"Protocol Version is different ({peer}).");
break;
case Exception e:
_logger.Error(
e,
$"An unexpected exception occurred during dial to ({peer})."
);
break;
default:
break;
}

// Mark to skip
return (null, null);
}
else
{
return (peer, pong);
}
},
cancellationToken
)
);

if (reply is Pong pong)
return Task.WhenAll(tasks).ContinueWith(
t =>
{
yield return (peer, pong);
}
}
if (t.IsFaulted)
{
throw t.Exception;
}

return t.Result.Where(pair => !(pair.Item1 is null)).ToArray();
},
cancellationToken
);
}

private async IAsyncEnumerable<(long, HashDigest<SHA256>)> GetDemandBlockHashes(
Expand Down

0 comments on commit 73ce4f5

Please sign in to comment.