Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler phase switch #667

Merged
merged 25 commits into from
Dec 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
8005b07
Refactoring scheduler
sebastian-peter Nov 24, 2023
e1eaa9b
Small change in PriorityMultiBiSet
sebastian-peter Nov 27, 2023
f9f317d
Introducing PrioritySwitchBiSet with test
sebastian-peter Nov 27, 2023
8b31cfe
Always send out activations after completion received (important for …
sebastian-peter Nov 27, 2023
09dfccd
Some refactorings, introducing PhaseSwitchCore
sebastian-peter Nov 27, 2023
b665d4c
Merge branch 'sp/#650-scheduling-protocol' into sp/#651-scheduler-pha…
sebastian-peter Nov 28, 2023
b31ce47
Adapting new files to pekko
sebastian-peter Nov 28, 2023
e542d44
Enhancing SchedulerSpec
sebastian-peter Nov 28, 2023
d746236
Configurable Core in Scheduler
sebastian-peter Nov 28, 2023
6fed6ea
Improving comments in SchedulerSpec
sebastian-peter Nov 28, 2023
080f30b
Fixes
sebastian-peter Nov 28, 2023
8d7fd80
Fixing variable shadowing
sebastian-peter Nov 28, 2023
d04fcd1
Enhancing PrioritySwitchBiSet to fix PhaseSwitchCore
sebastian-peter Nov 28, 2023
2ce91b4
Fixing activation retrieval from PrioritySwitchBiSet
sebastian-peter Nov 28, 2023
0e52567
One more test case in SchedulerSpec
sebastian-peter Nov 28, 2023
e9996e8
Fixing scheduling checking when active
sebastian-peter Nov 28, 2023
982b8e3
Refactoring "trigger"
sebastian-peter Nov 28, 2023
74348be
PhaseSwitchSchedulerSpec
sebastian-peter Nov 28, 2023
54dab62
Merge branch 'dev' into sp/#651-scheduler-phase-switch
sebastian-peter Nov 29, 2023
a13238c
Merge branch 'dev' into sp/#651-scheduler-phase-switch
sebastian-peter Nov 30, 2023
a2faeb6
Adding/fixing ScalaDoc
sebastian-peter Nov 30, 2023
39610e5
Fixing code smells
sebastian-peter Nov 30, 2023
a0252de
Merge branch 'dev' into sp/#651-scheduler-phase-switch
sebastian-peter Dec 2, 2023
02037e3
Shorten code with Actor type def
sebastian-peter Dec 5, 2023
7665f4b
Merge branch 'dev' into sp/#651-scheduler-phase-switch
sebastian-peter Dec 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 113 additions & 153 deletions src/main/scala/edu/ie3/simona/scheduler/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ package edu.ie3.simona.scheduler
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import edu.ie3.simona.actor.ActorUtil.stopOnError
import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage}
import edu.ie3.simona.ontology.messages.SchedulerMessage.{
Completion,
ScheduleActivation
}
import edu.ie3.simona.scheduler.SchedulerData.ActivationData
import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage}
import edu.ie3.simona.scheduler.core.Core.{
ActiveCore,
CoreFactory,
InactiveCore
}
import edu.ie3.simona.scheduler.core.RegularSchedulerCore

/** Scheduler that activates actors at specific ticks and keeps them
* synchronized by waiting for the completions of all activations. Can be
Expand All @@ -27,58 +32,71 @@ object Scheduler {
private final case class WrappedActivation(activation: Activation)
extends Incoming

/** Creates a new scheduler with given parent and core. The scheduler starts
* in the inactive state.
* @param parent
* The parent of this scheduler, which activates this scheduler and waits
* for its completion
* @param coreFactory
* The factory that delivers the core to be used within this scheduler
*/
def apply(
parent: ActorRef[SchedulerMessage]
parent: ActorRef[SchedulerMessage],
coreFactory: CoreFactory = RegularSchedulerCore
): Behavior[Incoming] = Behaviors.setup { ctx =>
val adapter =
ctx.messageAdapter[Activation](msg => WrappedActivation(msg))

inactive(
SchedulerData(parent, adapter)
SchedulerData(parent, adapter),
coreFactory.create()
)
}

private def inactive(
data: SchedulerData,
lastActiveTick: Long = Long.MinValue
core: InactiveCore
): Behavior[Incoming] =
Behaviors.receive {
case (ctx, WrappedActivation(Activation(tick))) =>
checkActivation(data, tick).map(stopOnError(ctx, _)).getOrElse {
sendCurrentTriggers(data, ActivationData(tick)) match {
case (newSchedulerData, newActivationData) =>
active(newSchedulerData, newActivationData)
}
if (core.checkActivation(tick)) {
val (toActivate, activeCore) = core.activate().takeNewActivations()

toActivate.foreach { _ ! Activation(tick) }

active(data, activeCore)
} else {
stopOnError(ctx, s"Cannot activate with new tick $tick")
}

case (
ctx,
ScheduleActivation(actor, newTick, unlockKey)
) =>
checkTriggerSchedule(lastActiveTick + 1L, newTick)
.map(stopOnError(ctx, _))
.getOrElse {
val oldEarliestTick = data.triggerQueue.headKeyOption

val updatedData = scheduleTrigger(data, actor, newTick)
val newEarliestTick = updatedData.triggerQueue.headKeyOption

// also potentially schedule with parent if the new earliest tick is
// different from the old earliest tick (including if nothing had
// been scheduled before)
if (newEarliestTick != oldEarliestTick)
if (core.checkSchedule(newTick)) {
val (maybeSchedule, newCore) = core.handleSchedule(actor, newTick)

maybeSchedule match {
case Some(scheduleTick) =>
// also potentially schedule with parent if the new earliest tick is
// different from the old earliest tick (including if nothing had
// been scheduled before)
data.parent ! ScheduleActivation(
data.activationAdapter,
newTick,
scheduleTick,
unlockKey
)
else {
case None =>
// we don't need to escalate to the parent, this means that we can release the lock (if applicable)
unlockKey.foreach { _.unlock() }
}
inactive(updatedData, lastActiveTick)
unlockKey.foreach {
_.unlock()
}
}

inactive(data, newCore)
} else {
stopOnError(ctx, s"Cannot schedule an event at tick $newTick")
}
case (ctx, unexpected) =>
stopOnError(
ctx,
Expand All @@ -88,152 +106,94 @@ object Scheduler {

private def active(
data: SchedulerData,
activationData: ActivationData
core: ActiveCore
): Behavior[Incoming] = Behaviors.receive {

case (
ctx,
ScheduleActivation(actor, newTick, unlockKey)
) =>
checkTriggerSchedule(activationData.tick, newTick)
.map(stopOnError(ctx, _))
.getOrElse {
// if there's a lock:
// since we're active and any scheduled activation can still influence our next activation,
// we can directly unlock the lock with the key
unlockKey.foreach { _.unlock() }

sendCurrentTriggers(
scheduleTrigger(data, actor, newTick),
activationData
) match {
case (newSchedulerData, newActivationData) =>
active(newSchedulerData, newActivationData)
}
if (core.checkSchedule(actor, newTick)) {
val (toActivate, newCore) =
core.handleSchedule(actor, newTick).takeNewActivations()

// if there's a lock:
// since we're active and any scheduled activation can still influence our next activation,
// we can directly unlock the lock with the key
unlockKey.foreach {
_.unlock()
}

case (ctx, Completion(actor, maybeNewTick)) =>
val currentTick = activationData.tick
toActivate.foreach {
_ ! Activation(newCore.activeTick)
}

checkCompletion(activationData, actor)
.toLeft(handleCompletion(activationData, actor))
.flatMap { updatedActivationData =>
// schedule new triggers, if present
active(data, newCore)
} else {
stopOnError(ctx, s"Cannot schedule an event at tick $newTick")
}

case (ctx, Completion(actor, maybeNewTick)) =>
Either
.cond(
core.checkCompletion(actor),
core.handleCompletion(actor),
s"Actor $actor is not part of the expected completing actors"
)
.flatMap { newCore =>
// if successful
maybeNewTick
.map { newTick =>
checkTriggerSchedule(currentTick, newTick)
.toLeft(
scheduleTrigger(data, actor, newTick)
Either
.cond(
newCore.checkSchedule(actor, newTick),
newCore.handleSchedule(actor, newTick),
s"Cannot schedule an event at tick $newTick for completing actor $actor"
)
}
.getOrElse(Right(data))
.map((_, updatedActivationData))
.getOrElse(Right(newCore))
}
.map { case (updatedData, updatedActivationData) =>
if (isTickCompleted(updatedData, updatedActivationData)) {
// send completion to parent, if all completed
completeWithParent(updatedData)
inactive(updatedData, currentTick)
} else {
// there might be new triggers for current currentTick, send them out
sendCurrentTriggers(updatedData, updatedActivationData) match {
case (newSchedulerData, newActivationData) =>
active(newSchedulerData, newActivationData)
}
.map { newCore =>
val (toActivate, updatedCore) = newCore.takeNewActivations()
toActivate.foreach {
_ ! Activation(updatedCore.activeTick)
}

updatedCore
}
.fold(stopOnError(ctx, _), identity)
.map { newCore =>
newCore
.maybeComplete()
.map { case (maybeScheduleTick, inactiveCore) =>
data.parent ! Completion(
data.activationAdapter,
maybeScheduleTick
)
inactive(data, inactiveCore)
}
.getOrElse {
active(data, newCore)
}
}
.fold(
stopOnError(ctx, _),
identity
)

case (ctx, unexpected) =>
stopOnError(ctx, s"Received unexpected message $unexpected when active")
}

private def checkActivation(
data: SchedulerData,
newTick: Long
): Option[String] =
data.triggerQueue.headKeyOption.flatMap { minScheduledKey =>
Option.when(newTick != minScheduledKey) {
s"The next tick to activate is $minScheduledKey, not $newTick"
}
}

private def checkTriggerSchedule(
minTick: Long,
newTick: Long
): Option[String] = {
Option.when(newTick < minTick) {
s"Cannot schedule an event at tick $newTick when the last or currently activated tick is $minTick"
}
}

private def checkCompletion(
activationData: ActivationData,
actor: ActorRef[Activation]
): Option[String] =
Option.unless(
activationData.activeActors.contains(actor)
) {
s"Actor $actor is not part of expected completing actors ${activationData.activeActors}"
}

private def sendCurrentTriggers(
data: SchedulerData,
activationData: ActivationData
): (SchedulerData, ActivationData) = {
val newActivationData =
data.triggerQueue
.getAndRemoveSet(activationData.tick)
.foldLeft(activationData) {
case (
updatedActivationData,
actor
) =>
// track the trigger id with the scheduled trigger
updatedActivationData.activeActors += actor

actor ! Activation(activationData.tick)

updatedActivationData

}

(data, newActivationData)
}

private def scheduleTrigger(
data: SchedulerData,
actorToBeScheduled: ActorRef[Activation],
tick: Long
): SchedulerData = {
/* update trigger queue */
data.triggerQueue.set(
tick,
actorToBeScheduled
)

data
}

private def handleCompletion(
data: ActivationData,
actor: ActorRef[Activation]
): ActivationData = {
data.activeActors.remove(actor)
data
}

/** Returns true if current tick can be completed with parent.
/** Data that is constant over the life time of a scheduler.
* @param parent
* The parent of the scheduler
* @param activationAdapter
* The activation adapter that is used to activate the scheduler
*/
private def isTickCompleted(
data: SchedulerData,
activationData: ActivationData
): Boolean =
activationData.activeActors.isEmpty &&
!data.triggerQueue.headKeyOption.contains(activationData.tick)

private def completeWithParent(data: SchedulerData): Unit = {
val newTick = data.triggerQueue.headKeyOption
data.parent ! Completion(data.activationAdapter, newTick)
}
private final case class SchedulerData(
parent: ActorRef[
SchedulerMessage
],
activationAdapter: ActorRef[Activation]
)
}
38 changes: 0 additions & 38 deletions src/main/scala/edu/ie3/simona/scheduler/SchedulerData.scala

This file was deleted.

Loading