Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make LiteDBStore & FileStore to atomically store transactions #413

Merged
merged 5 commits into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ To be released.
- Added `IBlockPolicy<T>.BlockAction` property. [[#319], [#367]]
- Removed the type parameter of `ActionEvaluation`. [[#319], [#367]]
- `ActionEvaluation.Action` became to `IAction` type. [[#319], [#367]]
- `LiteDBStore()` constructor became to have a new option named `flush` and turned on by default.
[[#387], [LiteDB #1268]]
- `LiteDBStore()` constructor became to have a new option named `flush` and
turned on by default. [[#387], [LiteDB #1268]]
- `BaseIndex.ContainsKey()` method became `abstract`. [[#390]]
- `BlockDownloadState.TotalBlockCount` and `BlockDownloadState.ReceivedBlockCount`
became to `Int64` type. [[#396], [#399]]
Expand Down Expand Up @@ -60,12 +60,17 @@ To be released.
own retrieve implementations. [[#390]]
- The way `LiteDBStore` stores state references became efficient,
but the file-level backward compatibility was also broken. [[#395], [#398]]
- `PreloadAsync` became to report total block download status instead of
chunked download status. [[#396], [#399]]
- `Swarm<T>.PreloadAsync()` method became to report a block downloading
progress with the total number of blocks to download in the entire batch,
instead of the window size of a chunk (i.e., 500). [[#396], [#399]]
- `Swarm<T>.PreloadAsync()` became to get the first parameter, `progress`,
which accepts `IProgress<PreloadState>`. [[#397], [#400]]
- `Swarm<T>.PreloadAsync()` became safe from data corruption even
if a preloading process suddenly gets shutdown. [[#417]]
- `FileStore` and `LiteDBStore` became to guarantee atomicity of storing
transactions. [[#413]]
- `IStore.PutTransaction<T>()` became to do nothing when it takes
the `Transaction<T>` more than once. [[#413]]

### Bug fixes

Expand Down Expand Up @@ -110,6 +115,7 @@ To be released.
[#398]: https://github.com/planetarium/libplanet/pull/398
[#399]: https://github.com/planetarium/libplanet/pull/399
[#400]: https://github.com/planetarium/libplanet/pull/400
[#413]: https://github.com/planetarium/libplanet/pull/413
[#414]: https://github.com/planetarium/libplanet/pull/414
[#416]: https://github.com/planetarium/libplanet/pull/416
[#417]: https://github.com/planetarium/libplanet/pull/417
Expand Down
27 changes: 10 additions & 17 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,22 +236,20 @@ CancellationToken cancellationToken
}

[Fact(Timeout = Timeout)]
public async Task CanExchangePeer()
public async Task ExchangePeer()
{
BlockChain<DumbAction> chain = _blockchains[0];

var a = new Swarm<DumbAction>(
chain,
_blockchains[0],
new PrivateKey(),
1,
host: IPAddress.Loopback.ToString());
var b = new Swarm<DumbAction>(
chain,
_blockchains[1],
new PrivateKey(),
1,
host: IPAddress.Loopback.ToString());
var c = new Swarm<DumbAction>(
chain,
_blockchains[2],
new PrivateKey(),
1,
host: IPAddress.Loopback.ToString());
Expand Down Expand Up @@ -320,26 +318,24 @@ public async Task CanExchangePeer()
[Fact(Timeout = Timeout)]
public async Task DetectAppProtocolVersion()
{
BlockChain<DumbAction> chain = _blockchains[0];

var a = new Swarm<DumbAction>(
chain,
_blockchains[0],
new PrivateKey(),
host: IPAddress.Loopback.ToString(),
appProtocolVersion: 2);
var b = new Swarm<DumbAction>(
chain,
_blockchains[1],
new PrivateKey(),
host: IPAddress.Loopback.ToString(),
appProtocolVersion: 3);

var c = new Swarm<DumbAction>(
chain,
_blockchains[2],
new PrivateKey(),
host: IPAddress.Loopback.ToString(),
appProtocolVersion: 2);
var d = new Swarm<DumbAction>(
chain,
new BlockChain<DumbAction>(_blockchains[0].Policy, new FileStoreFixture().Store),
new PrivateKey(),
host: IPAddress.Loopback.ToString(),
appProtocolVersion: 3);
Expand Down Expand Up @@ -381,16 +377,14 @@ void GameHandler(object sender, DifferentProtocolVersionEventArgs e)
isCalled = true;
}

BlockChain<DumbAction> chain = _blockchains[0];

var a = new Swarm<DumbAction>(
chain,
_blockchains[0],
new PrivateKey(),
host: IPAddress.Loopback.ToString(),
appProtocolVersion: 2,
differentVersionPeerEncountered: GameHandler);
var b = new Swarm<DumbAction>(
chain,
_blockchains[1],
new PrivateKey(),
host: IPAddress.Loopback.ToString(),
appProtocolVersion: 3);
Expand Down Expand Up @@ -558,7 +552,6 @@ public async Task GetTx()
Swarm<DumbAction> swarmA = _swarms[0];
Swarm<DumbAction> swarmB = _swarms[1];

BlockChain<DumbAction> chainA = _blockchains[0];
BlockChain<DumbAction> chainB = _blockchains[1];

Transaction<DumbAction> tx = Transaction<DumbAction>.Create(
Expand Down
110 changes: 110 additions & 0 deletions Libplanet.Tests/Store/StoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Immutable;
using System.Linq;
using System.Security.Cryptography;
using System.Threading.Tasks;
using Libplanet.Action;
using Libplanet.Blocks;
using Libplanet.Crypto;
Expand Down Expand Up @@ -542,5 +543,114 @@ public void IndexBlockHashReturnNull()
Assert.Equal(1, Fx.Store.CountIndex(Fx.StoreNamespace));
Assert.Null(Fx.Store.IndexBlockHash(Fx.StoreNamespace, 2));
}

[Fact]
public void TxAtomicity()
{
Transaction<AtomicityTestAction> MakeTx(
System.Random random,
MD5 md5,
PrivateKey key,
int txNonce
)
{
byte[] arbitraryBytes = new byte[20];
random.NextBytes(arbitraryBytes);
byte[] digest = md5.ComputeHash(arbitraryBytes);
var action = new AtomicityTestAction
{
ArbitraryBytes = arbitraryBytes.ToImmutableArray(),
Md5Digest = digest.ToImmutableArray(),
};
return Transaction<AtomicityTestAction>.Create(
txNonce,
key,
new[] { action },
ImmutableHashSet<Address>.Empty,
DateTimeOffset.UtcNow
);
}

const int taskCount = 5;
const int txCount = 30;
var md5Hasher = MD5.Create();
Transaction<AtomicityTestAction> commonTx = MakeTx(
new System.Random(),
md5Hasher,
new PrivateKey(),
0
);
Task[] tasks = new Task[taskCount];
for (int i = 0; i < taskCount; i++)
{
var task = new Task(() =>
{
PrivateKey key = new PrivateKey();
var random = new System.Random();
var md5 = MD5.Create();
Transaction<AtomicityTestAction> tx;
for (int j = 0; j < 50; j++)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be txCount too?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not necessarily; it's arbitrary.

{
Fx.Store.PutTransaction(commonTx);
}

for (int j = 0; j < txCount; j++)
{
tx = MakeTx(random, md5, key, j + 1);
Fx.Store.PutTransaction(tx);
}
});
task.Start();
tasks[i] = task;
}

Task.WaitAll(tasks);

Assert.Equal(1 + (taskCount * txCount), Fx.Store.CountTransactions());
foreach (TxId txid in Fx.Store.IterateTransactionIds())
{
var tx = Fx.Store.GetTransaction<AtomicityTestAction>(txid);
tx.Validate();
Assert.Single(tx.Actions);
AtomicityTestAction action = tx.Actions[0];
Assert.Equal(
md5Hasher.ComputeHash(action.ArbitraryBytes.ToBuilder().ToArray()),
action.Md5Digest.ToBuilder().ToArray()
);
}
}

private class AtomicityTestAction : IAction
{
public ImmutableArray<byte> ArbitraryBytes { get; set; }

public ImmutableArray<byte> Md5Digest { get; set; }

public IImmutableDictionary<string, object> PlainValue =>
new Dictionary<string, object>
{
{ "bytes", ArbitraryBytes.ToBuilder().ToArray() },
{ "md5", Md5Digest.ToBuilder().ToArray() },
}.ToImmutableDictionary();

public void LoadPlainValue(IImmutableDictionary<string, object> plainValue)
{
ArbitraryBytes = (plainValue["bytes"] as byte[]).ToImmutableArray();
Md5Digest = (plainValue["md5"] as byte[]).ToImmutableArray();
}

public IAccountStateDelta Execute(IActionContext context)
{
return context.PreviousStates;
}

public void Render(IActionContext context, IAccountStateDelta nextStates)
{
}

public void Unrender(IActionContext context, IAccountStateDelta nextStates)
{
}
}
}
}
18 changes: 13 additions & 5 deletions Libplanet/Net/Swarm.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1984,11 +1984,11 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e)
NetMQMessage raw = e.Socket.ReceiveMultipartMessage();

_logger.Verbose(
"The raw message[frame count: {0}] has received.",
"A raw message [frame count: {0}] has received.",
raw.FrameCount
);
Message message = Message.Parse(raw, reply: false);
_logger.Debug($"The message[{message}] has parsed.");
_logger.Debug("A message has parsed: {0}", message);

// it's still async because some method it relies are async yet.
Task.Run(
Expand All @@ -2000,19 +2000,27 @@ private void ReceiveMessage(object sender, NetMQSocketEventArgs e)
}
catch (Exception exc)
{
_logger.Error("Something went wrong during message parsing: {0}", exc);
_logger.Error(
exc,
"Something went wrong during message parsing: {0}",
exc
);
throw;
}
},
_cancellationToken);
}
catch (InvalidMessageException ex)
{
_logger.Error(ex, "Could not parse NetMQMessage properly; ignore.");
_logger.Error(ex, "Could not parse NetMQMessage properly; ignore: {0}", ex);
}
catch (Exception ex)
{
_logger.Error(ex, "An unexpected exception occured during ReceiveMessage().");
_logger.Error(
ex,
"An unexpected exception occured during ReceiveMessage(): {0}",
ex
);
}
}

Expand Down
Loading