Skip to content
This repository was archived by the owner on Jan 26, 2021. It is now read-only.

Commit 967cb84

Browse files
author
Dennis van der Bij
authored
Merge pull request #80 from SwissBorg/feature/leader-only
only let the leader down nodes
2 parents 50745bd + 35b48d0 commit 967cb84

File tree

3 files changed

+63
-21
lines changed

3 files changed

+63
-21
lines changed

src/main/scala/akka/cluster/swissborg/LithiumReachability.scala

-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package akka.cluster.swissborg
22

3-
import akka.cluster.Reachability.Record
43
import akka.cluster._
54

65
sealed abstract class LithiumReachability {

src/main/scala/com/swissborg/lithium/resolver/SplitBrainResolver.scala

+62-19
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import akka.actor._
66
import akka.cluster._
77
import cats.effect.SyncIO
88
import cats.implicits._
9+
import com.swissborg.lithium.implicits._
910
import com.swissborg.lithium.reporter._
1011
import com.swissborg.lithium.strategy._
1112

@@ -38,6 +39,7 @@ private[lithium] class SplitBrainResolver(private val _strategy: Strategy[SyncIO
3839

3940
private val cluster: Cluster = Cluster(context.system)
4041
private val selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress
42+
private val selfMember: Member = cluster.selfMember
4143

4244
private val strategy: Union[SyncIO, Strategy, IndirectlyConnected] =
4345
new Union(_strategy, new IndirectlyConnected)
@@ -47,20 +49,42 @@ private[lithium] class SplitBrainResolver(private val _strategy: Strategy[SyncIO
4749
@SuppressWarnings(Array("org.wartremover.warts.Any"))
4850
override def receive: Receive = {
4951
case SplitBrainResolver.ResolveSplitBrain(worldView) =>
50-
resolveSplitBrain(worldView).unsafeRunSync()
52+
cluster.state.leader match {
53+
case Some(leader) if leader === selfUniqueAddress.address =>
54+
resolveSplitBrain(worldView, false).unsafeRunSync()
55+
56+
case None =>
57+
// There is no leader: down only this node, and only if it is among the nodes to down.
58+
resolveSplitBrain(worldView, true).unsafeRunSync()
59+
60+
case _ =>
61+
// member is not the leader, do nothing.
62+
log.debug("[{}}] is not the leader. The leader will handle the split-brain.", selfMember)
63+
}
5164

5265
case SplitBrainResolver.DownAll(worldView) =>
53-
downAll(worldView).unsafeRunSync()
66+
cluster.state.leader match {
67+
case Some(leader) if leader === selfUniqueAddress.address =>
68+
downAll(worldView, false).unsafeRunSync()
69+
70+
case None =>
71+
// There is no leader: down only this node, and only if it is among the nodes to down.
72+
downAll(worldView, true).unsafeRunSync()
73+
74+
case _ =>
75+
// member is not the leader, do nothing.
76+
log.debug("[{}}] is not the leader. The leader will down all the nodes.", selfMember)
77+
}
5478
}
5579

5680
/**
5781
* Handle the partition using the [[Union]] of the configured
5882
* strategy and the [[IndirectlyConnected]].
5983
*/
60-
private def resolveSplitBrain(worldView: WorldView): SyncIO[Unit] =
84+
private def resolveSplitBrain(worldView: WorldView, downSelfOnly: Boolean): SyncIO[Unit] =
6185
for {
6286
_ <- SyncIO(
63-
log.info(
87+
log.warning(
6488
"""[{}] Received request to handle a split-brain...
6589
|-- Worldview --
6690
|Reachable nodes:
@@ -70,22 +94,22 @@ private[lithium] class SplitBrainResolver(private val _strategy: Strategy[SyncIO
7094
|Indirectly-connected nodes:
7195
| {}
7296
|""".stripMargin,
73-
selfUniqueAddress,
97+
selfMember,
7498
worldView.reachableNodes.mkString_("\n "),
7599
worldView.unreachableNodes.mkString_("\n "),
76100
worldView.indirectlyConnectedNodes.mkString_("\n ")
77101
)
78102
)
79-
_ <- runStrategy(strategy, worldView)
103+
_ <- runStrategy(strategy, worldView, downSelfOnly)
80104
} yield ()
81105

82106
/**
83107
* Handle the partition by downing all the members.
84108
*/
85-
private def downAll(worldView: WorldView): SyncIO[Unit] =
109+
private def downAll(worldView: WorldView, downSelfOnly: Boolean): SyncIO[Unit] =
86110
for {
87111
_ <- SyncIO(
88-
log.info(
112+
log.warning(
89113
"""[{}] Received request to down all the nodes...
90114
|-- Worldview --
91115
|Reachable nodes:
@@ -95,13 +119,13 @@ private[lithium] class SplitBrainResolver(private val _strategy: Strategy[SyncIO
95119
|Indirectly-connected nodes:
96120
| {}
97121
|""".stripMargin,
98-
selfUniqueAddress,
122+
selfMember,
99123
worldView.reachableNodes.mkString_("\n "),
100124
worldView.unreachableNodes.mkString_("\n "),
101125
worldView.indirectlyConnectedNodes.mkString_("\n ")
102126
)
103127
)
104-
_ <- runStrategy(downAll, worldView)
128+
_ <- runStrategy(downAll, worldView, downSelfOnly)
105129
} yield ()
106130

107131
/**
@@ -110,16 +134,35 @@ private[lithium] class SplitBrainResolver(private val _strategy: Strategy[SyncIO
110134
* Enable `nonJoiningOnly` so that joining and weakly-up
111135
* members do not run the strategy.
112136
*/
113-
private def runStrategy(strategy: Strategy[SyncIO], worldView: WorldView): SyncIO[Unit] = {
114-
def execute(decision: Decision): SyncIO[Unit] =
115-
for {
116-
_ <- SyncIO(
117-
log.info("""[{}] Downing the nodes:
118-
| {}""".stripMargin, selfUniqueAddress, Decision.allNodesToDown(decision).mkString_("\n "))
137+
private def runStrategy(strategy: Strategy[SyncIO], worldView: WorldView, downSelfOnly: Boolean): SyncIO[Unit] = {
138+
def execute(decision: Decision): SyncIO[Unit] = {
139+
140+
val nodesToDown =
141+
if (downSelfOnly) decision.nodesToDown.filter(_.uniqueAddress === selfUniqueAddress)
142+
else decision.nodesToDown
143+
144+
if (nodesToDown.nonEmpty) {
145+
for {
146+
_ <- SyncIO(
147+
log.warning(
148+
"""[{}] Downing the nodes:
149+
| {}{}
150+
|""".stripMargin,
151+
selfMember,
152+
nodesToDown.mkString_("\n "),
153+
if (downSelfOnly) "\nNote: no leader, only the self node will be downed." else ""
154+
)
155+
)
156+
_ <- nodesToDown.toList.traverse_(node => SyncIO(cluster.down(node.address)))
157+
} yield ()
158+
} else {
159+
SyncIO(
160+
log.warning("[{}] No nodes to down. {}",
161+
selfMember,
162+
if (downSelfOnly) "\nNote: no leader, only the self node can be downed." else "")
119163
)
120-
_ <- decision.nodesToDown.toList.traverse_(node => SyncIO(cluster.down(node.address)))
121-
122-
} yield ()
164+
}
165+
}
123166

124167
strategy
125168
.takeDecision(worldView)

src/multi-jvm/scala/com/swissborg/lithium/strategy/staticquorum/StaticQuorumSpec9.scala

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ class StaticQuorumSpec9MultiJvmNode9 extends StaticQuorumSpec9
2020
class StaticQuorumSpec9MultiJvmNode10 extends StaticQuorumSpec9
2121

2222
/**
23-
* Node2 and node3 are indirectly connected in a ten node cluster
23+
* Node8 and node9 are indirectly connected in a ten node cluster
2424
* Node9 and node10 are indirectly connected in a ten node cluster
2525
*/
2626
sealed abstract class StaticQuorumSpec9 extends TenNodeSpec("StaticQuorum", StaticQuorumSpec3Config) {

0 commit comments

Comments
 (0)