Skip to content

Commit

Permalink
Merge pull request #790 from limebell/feature/prune-state
Browse files Browse the repository at this point in the history
Prune state after sync state from trusted peers
  • Loading branch information
earlbread authored Mar 9, 2020
2 parents f16a572 + 02eb916 commit c4f36de
Show file tree
Hide file tree
Showing 8 changed files with 224 additions and 4 deletions.
2 changes: 2 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ To be released.
- Added `IStore.GetBlockDigest(HashDigest<SHA256>)` method. [[#785]]
- Added `Block<T>.ToBlockDigest()` method. [[#785]]
- Added `ByteArrayExtensions` class. [[#803]]
- Added `IStore.PruneBlockStates<T>(Guid, Block<T>)` method. [[#790]]

### Behavioral changes

Expand Down Expand Up @@ -73,6 +74,7 @@ To be released.
[#785]: https://github.com/planetarium/libplanet/pull/785
[#788]: https://github.com/planetarium/libplanet/pull/788
[#789]: https://github.com/planetarium/libplanet/pull/789
[#790]: https://github.com/planetarium/libplanet/pull/790
[#791]: https://github.com/planetarium/libplanet/pull/791
[#798]: https://github.com/planetarium/libplanet/pull/798
[#802]: https://github.com/planetarium/libplanet/pull/802
Expand Down
63 changes: 63 additions & 0 deletions Libplanet.RocksDBStore/RocksDBStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,37 @@ public override void SetBlockStates(
_statesCache.AddOrUpdate(blockHash, states);
}

/// <inheritdoc/>
public override void PruneBlockStates<T>(
Guid chainId,
Block<T> until)
{
string[] keys = ListStateKeys(chainId).ToArray();
long untilIndex = until.Index;
foreach (var key in keys)
{
Tuple<HashDigest<SHA256>, long>[] stateRefs =
IterateStateReferences(chainId, key, untilIndex, null, null)
.OrderByDescending(tuple => tuple.Item2)
.ToArray();
var dict = new Dictionary<HashDigest<SHA256>, List<string>>();
foreach ((HashDigest<SHA256> blockHash, long index) in stateRefs.Skip(1))
{
if (!dict.ContainsKey(blockHash))
{
dict.Add(blockHash, new List<string>());
}

dict[blockHash].Add(key);
}

foreach (var kv in dict)
{
DeleteBlockStates(kv.Key, kv.Value);
}
}
}

public override Tuple<HashDigest<SHA256>, long> LookupStateReference<T>(
Guid chainId,
string key,
Expand Down Expand Up @@ -806,6 +837,38 @@ public override void Dispose()
_stagedTxDb?.Dispose();
}

/// <summary>
/// Deletes the states with specified keys (i.e., <paramref name="stateKeys"/>)
/// updated by actions in the specified block (i.e., <paramref name="blockHash"/>).
/// </summary>
/// <param name="blockHash"><see cref="Block{T}.Hash"/> to delete states.
/// </param>
/// <param name="stateKeys">The state keys to delete which were updated by actions
/// in the specified block (i.e., <paramref name="blockHash"/>).
/// </param>
/// <seealso cref="GetBlockStates"/>
private void DeleteBlockStates(
HashDigest<SHA256> blockHash,
IEnumerable<string> stateKeys)
{
IImmutableDictionary<string, IValue> dict = GetBlockStates(blockHash);
if (dict is null)
{
return;
}

dict = dict.RemoveRange(stateKeys);
if (dict.Any())
{
SetBlockStates(blockHash, dict);
}
else
{
_stateDb.Remove(BlockStateKey(blockHash));
_statesCache.Remove(blockHash);
}
}

private IEnumerable<Tuple<HashDigest<SHA256>, long>> IterateStateReferences(
byte[] prefix,
Iterator it,
Expand Down
8 changes: 4 additions & 4 deletions Libplanet.Tests/Net/SwarmTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2237,8 +2237,8 @@ public async Task PreloadWithDifferenctFindNextStatesChunkSize()
await StartAsync(swarm2);

await swarm1.AddPeersAsync(new[] { swarm0.AsPeer }, null);
await swarm1.PreloadAsync(trustedStateValidators:
new[] { swarm0.Address }.ToImmutableHashSet());
await swarm1.PreloadAsync(
trustedStateValidators: new[] { swarm0.Address }.ToImmutableHashSet());

Assert.Equal(chain0.BlockHashes, chain1.BlockHashes);

Expand Down Expand Up @@ -2275,8 +2275,8 @@ await swarm1.PreloadAsync(trustedStateValidators:
}

await swarm2.AddPeersAsync(new[] { swarm1.AsPeer }, null);
await swarm2.PreloadAsync(trustedStateValidators:
new[] { swarm1.Address }.ToImmutableHashSet());
await swarm2.PreloadAsync(
trustedStateValidators: new[] { swarm1.Address }.ToImmutableHashSet());

Assert.Equal(chain1.BlockHashes, chain2.BlockHashes);

Expand Down
62 changes: 62 additions & 0 deletions Libplanet.Tests/Store/StoreTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1086,6 +1086,68 @@ public void Copy()
}
}

[SkippableFact]
public async Task PruneBlockStates()
{
using (StoreFixture fx = FxConstructor())
{
IStore store = fx.Store;
var blocks = new BlockChain<DumbAction>(
new NullPolicy<DumbAction>(),
store,
Fx.GenesisBlock
);

var privKey = new PrivateKey();
Transaction<DumbAction> tx1 = Transaction<DumbAction>.Create(
0,
privKey,
new[] { new DumbAction(fx.Address1, "item0") });
Transaction<DumbAction> tx2 = Transaction<DumbAction>.Create(
1,
privKey,
new[] { new DumbAction(fx.Address1, "item1") });
Transaction<DumbAction> tx3 = Transaction<DumbAction>.Create(
2,
privKey,
new[] { new DumbAction(fx.Address1, "item2") });
blocks.StageTransactions(ImmutableHashSet<Transaction<DumbAction>>.Empty.Add(tx1));
var block1 = await blocks.MineBlock(fx.Address2);
blocks.StageTransactions(ImmutableHashSet<Transaction<DumbAction>>.Empty.Add(tx2));
var block2 = await blocks.MineBlock(fx.Address2);
blocks.StageTransactions(ImmutableHashSet<Transaction<DumbAction>>.Empty.Add(tx3));
var block3 = await blocks.MineBlock(fx.Address2);
Assert.Equal(
(Text)"item0",
blocks.GetState(fx.Address1, block1.Hash));
Assert.Equal(
(Text)"item0,item1",
blocks.GetState(fx.Address1, block2.Hash));
Assert.Equal(
(Text)"item0,item1,item2",
blocks.GetState(fx.Address1, block3.Hash));

store.PruneBlockStates(blocks.Id, block2);
Assert.Throws<IncompleteBlockStatesException>(
() => blocks.GetState(fx.Address1, block1.Hash));
Assert.Equal(
(Text)"item0,item1",
blocks.GetState(fx.Address1, block2.Hash));
Assert.Equal(
(Text)"item0,item1,item2",
blocks.GetState(fx.Address1, block3.Hash));

store.PruneBlockStates(blocks.Id, block3);
Assert.Throws<IncompleteBlockStatesException>(
() => blocks.GetState(fx.Address1, block1.Hash));
Assert.Throws<IncompleteBlockStatesException>(
() => blocks.GetState(fx.Address1, block2.Hash));
Assert.Equal(
(Text)"item0,item1,item2",
blocks.GetState(fx.Address1, block3.Hash));
}
}

private class AtomicityTestAction : IAction
{
public ImmutableArray<byte> ArbitraryBytes { get; set; }
Expand Down
9 changes: 9 additions & 0 deletions Libplanet.Tests/Store/StoreTracker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,15 @@ IImmutableDictionary<string, IValue> states
_store.SetBlockStates(blockHash, states);
}

public void PruneBlockStates<T>(
Guid chainId,
Block<T> until)
where T : IAction, new()
{
Log(nameof(PruneBlockStates), chainId, until);
_store.PruneBlockStates(chainId, until);
}

public Tuple<HashDigest<SHA256>, long> LookupStateReference<T>(
Guid chainId,
string key,
Expand Down
7 changes: 7 additions & 0 deletions Libplanet/Store/BaseStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,18 @@ public abstract IImmutableDictionary<string, IValue> GetBlockStates(
HashDigest<SHA256> blockHash
);

/// <inheritdoc />
public abstract void SetBlockStates(
HashDigest<SHA256> blockHash,
IImmutableDictionary<string, IValue> states
);

/// <inheritdoc />
public abstract void PruneBlockStates<T>(
Guid chainId,
Block<T> until)
where T : IAction, new();

/// <inheritdoc />
public abstract Tuple<HashDigest<SHA256>, long> LookupStateReference<T>(
Guid chainId,
Expand Down
64 changes: 64 additions & 0 deletions Libplanet/Store/DefaultStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -622,6 +622,37 @@ public override void SetBlockStates(
_statesCache.AddOrUpdate(blockHash, states);
}

/// <inheritdoc/>
public override void PruneBlockStates<T>(
Guid chainId,
Block<T> until)
{
string[] keys = ListStateKeys(chainId).ToArray();
long untilIndex = until.Index;
foreach (var key in keys)
{
IEnumerable<Tuple<HashDigest<SHA256>, long>> stateRefs =
IterateStateReferences(chainId, key, untilIndex, null, null)
.OrderByDescending(tuple => tuple.Item2);

var dict = new Dictionary<HashDigest<SHA256>, List<string>>();
foreach ((HashDigest<SHA256> blockHash, long index) in stateRefs.Skip(1))
{
if (!dict.ContainsKey(blockHash))
{
dict.Add(blockHash, new List<string>());
}

dict[blockHash].Add(key);
}

foreach (var kv in dict)
{
DeleteBlockStates(kv.Key, kv.Value);
}
}
}

public override Tuple<HashDigest<SHA256>, long> LookupStateReference<T>(
Guid chainId,
string key,
Expand Down Expand Up @@ -859,6 +890,39 @@ private static void CreateDirectoryRecursively(IFileSystem fs, UPath path)
}
}

/// <summary>
/// Deletes the states with specified keys (i.e., <paramref name="stateKeys"/>)
/// updated by actions in the specified block (i.e., <paramref name="blockHash"/>).
/// </summary>
/// <param name="blockHash"><see cref="Block{T}.Hash"/> to delete states.
/// </param>
/// <param name="stateKeys">The state keys to delete which were updated by actions
/// in the specified block (i.e., <paramref name="blockHash"/>).
/// </param>
/// <seealso cref="GetBlockStates"/>
private void DeleteBlockStates(
HashDigest<SHA256> blockHash,
IEnumerable<string> stateKeys)
{
IImmutableDictionary<string, IValue> dict = GetBlockStates(blockHash);
if (dict is null)
{
return;
}

dict = dict.RemoveRange(stateKeys);
if (dict.Any())
{
SetBlockStates(blockHash, dict);
}
else
{
UPath path = StatePath(blockHash);
_states.DeleteFile(path);
_statesCache.Remove(blockHash);
}
}

private void WriteContentAddressableFile(IFileSystem fs, UPath path, byte[] contents)
{
UPath dirPath = path.GetDirectory();
Expand Down
13 changes: 13 additions & 0 deletions Libplanet/Store/IStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,19 @@ void SetBlockStates(
IImmutableDictionary<string, IValue> states
);

/// <summary>
/// Prunes states in blockchain <paramref name="chainId"/> with until specified block
/// <paramref name="until"/>.
/// </summary>
/// <param name="chainId">The chain ID to prune block states.</param>
/// <param name="until">The upper bound block to prune states.</param>
/// <typeparam name="T">An <see cref="IAction"/> class used with
/// <paramref name="until"/>.</typeparam>
void PruneBlockStates<T>(
Guid chainId,
Block<T> until)
where T : IAction, new();

/// <summary>
/// Looks up a state reference, which is a block's <see cref="Block{T}.Hash"/> that contains
/// an action mutating the <paramref name="key"/>'s tate.
Expand Down

0 comments on commit c4f36de

Please sign in to comment.