Skip to content

Commit

Permalink
Merge pull request #667 from ie3-institute/sp/#651-scheduler-phase-sw…
Browse files Browse the repository at this point in the history
…itch

Scheduler phase switch
  • Loading branch information
danielfeismann authored Dec 7, 2023
2 parents 2615c28 + 7665f4b commit 2dba213
Show file tree
Hide file tree
Showing 13 changed files with 1,514 additions and 253 deletions.
266 changes: 113 additions & 153 deletions src/main/scala/edu/ie3/simona/scheduler/Scheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@ package edu.ie3.simona.scheduler
import org.apache.pekko.actor.typed.scaladsl.Behaviors
import org.apache.pekko.actor.typed.{ActorRef, Behavior}
import edu.ie3.simona.actor.ActorUtil.stopOnError
import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage}
import edu.ie3.simona.ontology.messages.SchedulerMessage.{
Completion,
ScheduleActivation
}
import edu.ie3.simona.scheduler.SchedulerData.ActivationData
import edu.ie3.simona.ontology.messages.{Activation, SchedulerMessage}
import edu.ie3.simona.scheduler.core.Core.{
ActiveCore,
CoreFactory,
InactiveCore
}
import edu.ie3.simona.scheduler.core.RegularSchedulerCore

/** Scheduler that activates actors at specific ticks and keeps them
* synchronized by waiting for the completions of all activations. Can be
Expand All @@ -27,58 +32,71 @@ object Scheduler {
private final case class WrappedActivation(activation: Activation)
extends Incoming

/** Creates a new scheduler with given parent and core. The scheduler starts
* in the inactive state.
* @param parent
* The parent of this scheduler, which activates this scheduler and waits
* for its completion
* @param coreFactory
* The factory that delivers the core to be used within this scheduler
*/
def apply(
parent: ActorRef[SchedulerMessage]
parent: ActorRef[SchedulerMessage],
coreFactory: CoreFactory = RegularSchedulerCore
): Behavior[Incoming] = Behaviors.setup { ctx =>
val adapter =
ctx.messageAdapter[Activation](msg => WrappedActivation(msg))

inactive(
SchedulerData(parent, adapter)
SchedulerData(parent, adapter),
coreFactory.create()
)
}

private def inactive(
data: SchedulerData,
lastActiveTick: Long = Long.MinValue
core: InactiveCore
): Behavior[Incoming] =
Behaviors.receive {
case (ctx, WrappedActivation(Activation(tick))) =>
checkActivation(data, tick).map(stopOnError(ctx, _)).getOrElse {
sendCurrentTriggers(data, ActivationData(tick)) match {
case (newSchedulerData, newActivationData) =>
active(newSchedulerData, newActivationData)
}
if (core.checkActivation(tick)) {
val (toActivate, activeCore) = core.activate().takeNewActivations()

toActivate.foreach { _ ! Activation(tick) }

active(data, activeCore)
} else {
stopOnError(ctx, s"Cannot activate with new tick $tick")
}

case (
ctx,
ScheduleActivation(actor, newTick, unlockKey)
) =>
checkTriggerSchedule(lastActiveTick + 1L, newTick)
.map(stopOnError(ctx, _))
.getOrElse {
val oldEarliestTick = data.triggerQueue.headKeyOption

val updatedData = scheduleTrigger(data, actor, newTick)
val newEarliestTick = updatedData.triggerQueue.headKeyOption

// also potentially schedule with parent if the new earliest tick is
// different from the old earliest tick (including if nothing had
// been scheduled before)
if (newEarliestTick != oldEarliestTick)
if (core.checkSchedule(newTick)) {
val (maybeSchedule, newCore) = core.handleSchedule(actor, newTick)

maybeSchedule match {
case Some(scheduleTick) =>
// also potentially schedule with parent if the new earliest tick is
// different from the old earliest tick (including if nothing had
// been scheduled before)
data.parent ! ScheduleActivation(
data.activationAdapter,
newTick,
scheduleTick,
unlockKey
)
else {
case None =>
// we don't need to escalate to the parent, this means that we can release the lock (if applicable)
unlockKey.foreach { _.unlock() }
}
inactive(updatedData, lastActiveTick)
unlockKey.foreach {
_.unlock()
}
}

inactive(data, newCore)
} else {
stopOnError(ctx, s"Cannot schedule an event at tick $newTick")
}
case (ctx, unexpected) =>
stopOnError(
ctx,
Expand All @@ -88,152 +106,94 @@ object Scheduler {

private def active(
data: SchedulerData,
activationData: ActivationData
core: ActiveCore
): Behavior[Incoming] = Behaviors.receive {

case (
ctx,
ScheduleActivation(actor, newTick, unlockKey)
) =>
checkTriggerSchedule(activationData.tick, newTick)
.map(stopOnError(ctx, _))
.getOrElse {
// if there's a lock:
// since we're active and any scheduled activation can still influence our next activation,
// we can directly unlock the lock with the key
unlockKey.foreach { _.unlock() }

sendCurrentTriggers(
scheduleTrigger(data, actor, newTick),
activationData
) match {
case (newSchedulerData, newActivationData) =>
active(newSchedulerData, newActivationData)
}
if (core.checkSchedule(actor, newTick)) {
val (toActivate, newCore) =
core.handleSchedule(actor, newTick).takeNewActivations()

// if there's a lock:
// since we're active and any scheduled activation can still influence our next activation,
// we can directly unlock the lock with the key
unlockKey.foreach {
_.unlock()
}

case (ctx, Completion(actor, maybeNewTick)) =>
val currentTick = activationData.tick
toActivate.foreach {
_ ! Activation(newCore.activeTick)
}

checkCompletion(activationData, actor)
.toLeft(handleCompletion(activationData, actor))
.flatMap { updatedActivationData =>
// schedule new triggers, if present
active(data, newCore)
} else {
stopOnError(ctx, s"Cannot schedule an event at tick $newTick")
}

case (ctx, Completion(actor, maybeNewTick)) =>
Either
.cond(
core.checkCompletion(actor),
core.handleCompletion(actor),
s"Actor $actor is not part of the expected completing actors"
)
.flatMap { newCore =>
// if successful
maybeNewTick
.map { newTick =>
checkTriggerSchedule(currentTick, newTick)
.toLeft(
scheduleTrigger(data, actor, newTick)
Either
.cond(
newCore.checkSchedule(actor, newTick),
newCore.handleSchedule(actor, newTick),
s"Cannot schedule an event at tick $newTick for completing actor $actor"
)
}
.getOrElse(Right(data))
.map((_, updatedActivationData))
.getOrElse(Right(newCore))
}
.map { case (updatedData, updatedActivationData) =>
if (isTickCompleted(updatedData, updatedActivationData)) {
// send completion to parent, if all completed
completeWithParent(updatedData)
inactive(updatedData, currentTick)
} else {
// there might be new triggers for current currentTick, send them out
sendCurrentTriggers(updatedData, updatedActivationData) match {
case (newSchedulerData, newActivationData) =>
active(newSchedulerData, newActivationData)
}
.map { newCore =>
val (toActivate, updatedCore) = newCore.takeNewActivations()
toActivate.foreach {
_ ! Activation(updatedCore.activeTick)
}

updatedCore
}
.fold(stopOnError(ctx, _), identity)
.map { newCore =>
newCore
.maybeComplete()
.map { case (maybeScheduleTick, inactiveCore) =>
data.parent ! Completion(
data.activationAdapter,
maybeScheduleTick
)
inactive(data, inactiveCore)
}
.getOrElse {
active(data, newCore)
}
}
.fold(
stopOnError(ctx, _),
identity
)

case (ctx, unexpected) =>
stopOnError(ctx, s"Received unexpected message $unexpected when active")
}

private def checkActivation(
data: SchedulerData,
newTick: Long
): Option[String] =
data.triggerQueue.headKeyOption.flatMap { minScheduledKey =>
Option.when(newTick != minScheduledKey) {
s"The next tick to activate is $minScheduledKey, not $newTick"
}
}

private def checkTriggerSchedule(
minTick: Long,
newTick: Long
): Option[String] = {
Option.when(newTick < minTick) {
s"Cannot schedule an event at tick $newTick when the last or currently activated tick is $minTick"
}
}

private def checkCompletion(
activationData: ActivationData,
actor: ActorRef[Activation]
): Option[String] =
Option.unless(
activationData.activeActors.contains(actor)
) {
s"Actor $actor is not part of expected completing actors ${activationData.activeActors}"
}

private def sendCurrentTriggers(
data: SchedulerData,
activationData: ActivationData
): (SchedulerData, ActivationData) = {
val newActivationData =
data.triggerQueue
.getAndRemoveSet(activationData.tick)
.foldLeft(activationData) {
case (
updatedActivationData,
actor
) =>
// track the trigger id with the scheduled trigger
updatedActivationData.activeActors += actor

actor ! Activation(activationData.tick)

updatedActivationData

}

(data, newActivationData)
}

private def scheduleTrigger(
data: SchedulerData,
actorToBeScheduled: ActorRef[Activation],
tick: Long
): SchedulerData = {
/* update trigger queue */
data.triggerQueue.set(
tick,
actorToBeScheduled
)

data
}

private def handleCompletion(
data: ActivationData,
actor: ActorRef[Activation]
): ActivationData = {
data.activeActors.remove(actor)
data
}

/** Returns true if current tick can be completed with parent.
/** Data that is constant over the life time of a scheduler.
* @param parent
* The parent of the scheduler
* @param activationAdapter
* The activation adapter that is used to activate the scheduler
*/
private def isTickCompleted(
data: SchedulerData,
activationData: ActivationData
): Boolean =
activationData.activeActors.isEmpty &&
!data.triggerQueue.headKeyOption.contains(activationData.tick)

private def completeWithParent(data: SchedulerData): Unit = {
val newTick = data.triggerQueue.headKeyOption
data.parent ! Completion(data.activationAdapter, newTick)
}
private final case class SchedulerData(
parent: ActorRef[
SchedulerMessage
],
activationAdapter: ActorRef[Activation]
)
}
38 changes: 0 additions & 38 deletions src/main/scala/edu/ie3/simona/scheduler/SchedulerData.scala

This file was deleted.

Loading

0 comments on commit 2dba213

Please sign in to comment.