Skip to content

Commit

Permalink
Exclude exiting members in Read/Write MajorityPlus (migrated from akk…
Browse files Browse the repository at this point in the history
…a/akka#30328) (#5227)

* this saves at least 2 seconds where the coordinator is not able to respond
  when the oldest node is shutdown
  • Loading branch information
zbynek001 authored Aug 26, 2021
1 parent bd695c1 commit a23ebf0
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/contrib/cluster/Akka.DistributedData/ReadAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ public override int GetHashCode()

/// <summary>
/// <see cref="ReadMajority"/> but with the given number of <see cref="Additional"/> nodes added to the majority count. At most
/// all nodes.
/// all nodes. Exiting nodes are excluded using `ReadMajorityPlus` because those are typically
/// about to be removed and will not be able to respond.
/// </summary>
public sealed class ReadMajorityPlus : IReadConsistency, IEquatable<ReadMajorityPlus>
{
Expand Down
45 changes: 38 additions & 7 deletions src/contrib/cluster/Akka.DistributedData/Replicator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,11 @@ public static Props Props(ReplicatorSettings settings) =>
/// </summary>
private ImmutableSortedSet<Address> _joiningNodes = ImmutableSortedSet<Address>.Empty;

/// <summary>
/// cluster exiting nodes, doesn't contain selfAddress
/// </summary>
private ImmutableSortedSet<Address> _exitingNodes = ImmutableSortedSet<Address>.Empty;

private ImmutableDictionary<UniqueAddress, long> _removedNodes = ImmutableDictionary<UniqueAddress, long>.Empty;
private ImmutableDictionary<UniqueAddress, long> _pruningPerformed = ImmutableDictionary<UniqueAddress, long>.Empty;
private ImmutableHashSet<UniqueAddress> _tombstonedNodes = ImmutableHashSet<UniqueAddress>.Empty;
Expand Down Expand Up @@ -423,12 +428,22 @@ public Replicator(ReplicatorSettings settings)
else Become(NormalReceive);
}

private IImmutableList<Address> NodesForReadWrite()
private IImmutableList<Address> NodesForReadWrite(bool excludeExiting)
{
if (_settings.PreferOldest)
return _membersByAge.Select(i => i.Address).ToImmutableList();
if (excludeExiting && !_exitingNodes.IsEmpty)
{
if (_settings.PreferOldest)
return _membersByAge.Where(i => !_exitingNodes.Contains(i.Address)).Select(i => i.Address).ToImmutableList();
else
return _nodes.Except(_exitingNodes).ToImmutableList();
}
else
return _nodes.ToImmutableList();
{
if (_settings.PreferOldest)
return _membersByAge.Select(i => i.Address).ToImmutableList();
else
return _nodes.ToImmutableList();
}
}

protected override void PreStart()
Expand Down Expand Up @@ -585,6 +600,7 @@ private bool NormalReceive(object message)
case ClusterEvent.MemberJoined m: ReceiveMemberJoining(m.Member); return true;
case ClusterEvent.MemberWeaklyUp m: ReceiveMemberWeaklyUp(m.Member); return true;
case ClusterEvent.MemberUp m: ReceiveMemberUp(m.Member); return true;
case ClusterEvent.MemberExited m: ReceiveMemberExiting(m.Member); return true;
case ClusterEvent.MemberRemoved m: ReceiveMemberRemoved(m.Member); return true;

case ClusterEvent.IMemberEvent m: ReceiveOtherMemberEvent(m.Member); return true;
Expand Down Expand Up @@ -613,8 +629,12 @@ private void ReceiveGet(IKey key, IReadConsistency consistency, object req)
else Sender.Tell(new GetSuccess(key, req, localValue.Data));
}
else
Context.ActorOf(ReadAggregator.Props(key, consistency, req, NodesForReadWrite(), _unreachable, !_settings.PreferOldest, localValue, Sender)
{
var excludeExiting = consistency is ReadMajorityPlus || consistency is ReadAll;

Context.ActorOf(ReadAggregator.Props(key, consistency, req, NodesForReadWrite(excludeExiting), _unreachable, !_settings.PreferOldest, localValue, Sender)
.WithDispatcher(Context.Props.Dispatcher));
}
}

private bool IsLocalGet(IReadConsistency consistency)
Expand Down Expand Up @@ -727,8 +747,10 @@ private void ReceiveUpdate(IKey key, Func<IReplicatedData, IReplicatedData> modi
// The order is also kept when prefer-oldest is enabled.
var shuffle = !(_settings.PreferOldest || (writeDelta?.RequiresCausalDeliveryOfDeltas) == true);

var excludeExiting = consistency is WriteMajorityPlus || consistency is WriteAll;

var writeAggregator = Context.ActorOf(WriteAggregator
.Props(key, writeEnvelope, writeDelta, consistency, request, NodesForReadWrite(), _unreachable, shuffle, Sender, durable)
.Props(key, writeEnvelope, writeDelta, consistency, request, NodesForReadWrite(excludeExiting), _unreachable, shuffle, Sender, durable)
.WithDispatcher(Context.Props.Dispatcher));

if (durable)
Expand Down Expand Up @@ -851,8 +873,10 @@ private void ReceiveDelete(IKey key, IWriteConsistency consistency, object reque
}
else
{
var excludeExiting = consistency is WriteMajorityPlus || consistency is WriteAll;

var writeAggregator = Context.ActorOf(WriteAggregator
.Props(key, DeletedEnvelope, null, consistency, request, NodesForReadWrite(), _unreachable, !_settings.PreferOldest, Sender, durable)
.Props(key, DeletedEnvelope, null, consistency, request, NodesForReadWrite(excludeExiting), _unreachable, !_settings.PreferOldest, Sender, durable)
.WithDispatcher(Context.Props.Dispatcher));

if (durable)
Expand Down Expand Up @@ -1302,6 +1326,12 @@ private void ReceiveMemberUp(Member m)
}
}

private void ReceiveMemberExiting(Member m)
{
if (MatchingRole(m) && m.Address != _selfAddress)
_exitingNodes = _exitingNodes.Add(m.Address);
}

private void ReceiveMemberRemoved(Member m)
{
if (m.Address == _selfAddress) Context.Stop(Self);
Expand All @@ -1315,6 +1345,7 @@ private void ReceiveMemberRemoved(Member m)
_nodes = _nodes.Remove(m.Address);
_weaklyUpNodes = _weaklyUpNodes.Remove(m.Address);
_joiningNodes = _joiningNodes.Remove(m.Address);
_exitingNodes = _exitingNodes.Remove(m.Address);

_removedNodes = _removedNodes.SetItem(m.UniqueAddress, _allReachableClockTime);
_unreachable = _unreachable.Remove(m.Address);
Expand Down
3 changes: 2 additions & 1 deletion src/contrib/cluster/Akka.DistributedData/WriteAggregator.cs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,8 @@ public override int GetHashCode()

/// <summary>
/// <see cref="WriteMajority"/> but with the given number of <see cref="Additional"/> nodes added to the majority count. At most
/// all nodes.
/// all nodes. Exiting nodes are excluded using `WriteMajorityPlus` because those are typically
/// about to be removed and will not be able to respond.
/// </summary>
public sealed class WriteMajorityPlus : IWriteConsistency, IEquatable<WriteMajorityPlus>
{
Expand Down

0 comments on commit a23ebf0

Please sign in to comment.