Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TMP: Introducing priority triggers to SimScheduler #380

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/main/scala/edu/ie3/simona/api/ExtMessageUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ object ExtMessageUtils {
val newTriggers =
Option.when(!extCompl.newTriggers.isEmpty) {
extCompl.newTriggers.asScala.map { tick =>
ScheduleTriggerMessage(ActivityStartTrigger(tick), triggerActor)
ScheduleTriggerMessage(
ActivityStartTrigger(tick),
triggerActor,
priority = true
)
}.toSeq
}

Expand All @@ -44,7 +48,8 @@ object ExtMessageUtils {
def toSimona(tick: Long): ScheduleTriggerMessage =
ScheduleTriggerMessage(
ActivityStartTrigger(tick),
sched.getDataService
sched.getDataService,
priority = true
)
}
}
3 changes: 2 additions & 1 deletion src/main/scala/edu/ie3/simona/api/ExtSimAdapter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ final case class ExtSimAdapter(scheduler: ActorRef)
Seq(
ScheduleTriggerMessage(
ActivityStartTrigger(INIT_SIM_TICK),
self
self,
priority = true
)
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ object SchedulerMessage {
*/
final case class ScheduleTriggerMessage(
trigger: Trigger,
actorToBeScheduled: ActorRef
actorToBeScheduled: ActorRef,
priority: Boolean = false
) extends SchedulerMessage

/** Confirm the end of an action e.g. fsm state transitions for one tick to
Expand Down
102 changes: 77 additions & 25 deletions src/main/scala/edu/ie3/simona/scheduler/SchedulerHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ trait SchedulerHelper extends SimonaActorLogging {
stateData: SchedulerStateData
): SchedulerStateData =
stateData.copy(
trigger =
sendEligibleTrigger(stateData.trigger, stateData.time.nowInTicks)
trigger = sendEligibleTrigger(
stateData.trigger,
stateData.time.nowInTicks,
stateData.runtime.priorityPhase
)
)

/** Send out all trigger that are eligible to be send for the current state of
Expand All @@ -75,10 +78,17 @@ trait SchedulerHelper extends SimonaActorLogging {
*/
protected def sendEligibleTrigger(
triggerData: TriggerData,
nowInTicks: Long
nowInTicks: Long,
priorityPhase: Boolean
): TriggerData = {

triggerData.triggerQueue.pollTo(nowInTicks).foreach {
val queue =
if (priorityPhase)
triggerData.priorityTriggerQueue
else
triggerData.triggerQueue

queue.pollTo(nowInTicks).foreach {
case scheduledTrigger @ ScheduledTrigger(triggerWithIdMessage, actor) =>
// track that we wait for a response for this tick
triggerData.awaitingResponseMap.add(triggerWithIdMessage.trigger.tick)
Expand Down Expand Up @@ -132,7 +142,7 @@ trait SchedulerHelper extends SimonaActorLogging {

/* if we do not exceed nowInSeconds + parallelWindow OR do not wait on any responses we can send out new triggers */
if (
canWeSendTrigger(
canWeAdvanceInTime(
stateData.trigger.awaitingResponseMap,
nowInTicks,
parallelWindow
Expand All @@ -151,18 +161,29 @@ trait SchedulerHelper extends SimonaActorLogging {

/* if we do not exceed (nowInTicks+1) + parallelWindow OR do not wait on any responses, we can move on in time by one tick */
if (
nowInTicks <= endTick && canWeSendTrigger(
nowInTicks <= endTick && canWeAdvanceInTime(
updatedStateData.trigger.awaitingResponseMap,
nowInTicks + 1,
parallelWindow
)
) {
doSimStep(
updatedStateData.copy(
time = updatedStateData.time
.copy(nowInTicks = nowInTicks + 1)
)
)
val nextStateData =
if (stateData.runtime.priorityPhase) {
updatedStateData.copy(
runtime = updatedStateData.runtime.copy(
priorityPhase = false
)
)
} else
updatedStateData.copy(
time = updatedStateData.time
.copy(nowInTicks = nowInTicks + 1),
runtime = updatedStateData.runtime.copy(
priorityPhase = true
)
)

doSimStep(nextStateData)
} else {
/* we cannot move on in time for (nowInTicks+1), return updated data */
updatedStateData
Expand Down Expand Up @@ -295,7 +316,7 @@ trait SchedulerHelper extends SimonaActorLogging {
* @return
* true if trigger can be send, false otherwise
*/
private def canWeSendTrigger(
private def canWeAdvanceInTime(
awaitingResponseMap: CountingMap[Long],
nowInTicks: Long,
parallelWindow: Long
Expand Down Expand Up @@ -335,6 +356,9 @@ trait SchedulerHelper extends SimonaActorLogging {
&& noScheduledTriggersForCurrentTick(
stateData.trigger.triggerQueue,
nowInTicks
) && noScheduledTriggersForCurrentTick(
stateData.trigger.priorityTriggerQueue,
nowInTicks
)
&& stateData.event.lastCheckWindowPassedTick < nowInTicks
) {
Expand Down Expand Up @@ -379,6 +403,9 @@ trait SchedulerHelper extends SimonaActorLogging {
&& noScheduledTriggersForCurrentTick(
stateData.trigger.triggerQueue,
nowInTicks
) && noScheduledTriggersForCurrentTick(
stateData.trigger.priorityTriggerQueue,
nowInTicks
)
) {
/* ready! - notify listener */
Expand Down Expand Up @@ -419,6 +446,9 @@ trait SchedulerHelper extends SimonaActorLogging {
stateData.trigger.awaitingResponseMap.isEmpty && noScheduledTriggersForCurrentTick(
stateData.trigger.triggerQueue,
stateData.time.nowInTicks
) && noScheduledTriggersForCurrentTick(
stateData.trigger.priorityTriggerQueue,
stateData.time.nowInTicks
)
) {
finishSimulation(stateData)
Expand Down Expand Up @@ -451,6 +481,9 @@ trait SchedulerHelper extends SimonaActorLogging {
stateData.trigger.triggerQueue.allValues.foreach(trig =>
context.unwatch(trig.agent)
)
stateData.trigger.priorityTriggerQueue.allValues.foreach(trig =>
context.unwatch(trig.agent)
)

/* notify listeners */
notifyListener(
Expand Down Expand Up @@ -532,7 +565,7 @@ trait SchedulerHelper extends SimonaActorLogging {
if (stateData.runtime.initComplete && stateData.runtime.scheduleStarted)
doSimStep(stateData)
else if (!stateData.runtime.initComplete && stateData.runtime.initStarted)
sendEligibleTrigger(stateData)
doSimStep(stateData)
else
stateData
}
Expand Down Expand Up @@ -631,10 +664,14 @@ trait SchedulerHelper extends SimonaActorLogging {
/* we are @ the init tick (SimonaSim#initTick), if no init triggers are in the queue and in the await map
* anymore we're done with the init process, hence we check for this case */
val initDone =
isInitDone(updatedAwaitingResponseMap, triggerData.triggerQueue)
isInitDone(
updatedAwaitingResponseMap,
triggerData.triggerQueue,
triggerData.priorityTriggerQueue
)

/* steps to be carried out when init is done */
val updateTime = if (initDone) {
val updatedStateData = if (initDone) {
val initDuration = calcDuration(
stateData.time.initStartTime.toDouble
)
Expand All @@ -647,20 +684,23 @@ trait SchedulerHelper extends SimonaActorLogging {
self ! StartScheduleMessage()

/* Advance time, if init is done */
stateData.time.copy(nowInTicks = stateData.time.nowInTicks + 1)
stateData.copy(
time =
stateData.time.copy(nowInTicks = stateData.time.nowInTicks + 1),
runtime = stateData.runtime.copy(priorityPhase = true)
)
} else {
stateData.time
stateData
}

// set initComplete to initDone value
stateData.copy(
runtime = stateData.runtime.copy(initComplete = initDone),
updatedStateData.copy(
runtime = updatedStateData.runtime.copy(initComplete = initDone),
trigger = triggerData.copy(
awaitingResponseMap = updatedAwaitingResponseMap,
triggerIdToScheduledTriggerMap =
updatedTriggerIdToScheduledTriggerMap
),
time = updateTime
)
)
}
}
Expand Down Expand Up @@ -729,10 +769,14 @@ trait SchedulerHelper extends SimonaActorLogging {
*/
private def isInitDone(
awaitingResponseMap: CountingMap[Long],
triggerQueue: PriorityMultiQueue[Long, ScheduledTrigger]
triggerQueue: PriorityMultiQueue[Long, ScheduledTrigger],
priorityTriggerQueue: PriorityMultiQueue[Long, ScheduledTrigger]
): Boolean =
!awaitingResponseMap.contains(SimonaConstants.INIT_SIM_TICK) &&
!triggerQueue.headKeyOption.contains(SimonaConstants.INIT_SIM_TICK)
!triggerQueue.headKeyOption.contains(SimonaConstants.INIT_SIM_TICK) &&
!priorityTriggerQueue.headKeyOption.contains(
SimonaConstants.INIT_SIM_TICK
)

/** Calculate the duration between a given start time and the current system
* time in milliseconds
Expand Down Expand Up @@ -764,6 +808,7 @@ trait SchedulerHelper extends SimonaActorLogging {
scheduleTrigger(
triggerMessage.trigger,
triggerMessage.actorToBeScheduled,
triggerMessage.priority,
stateData
)

Expand All @@ -782,6 +827,7 @@ trait SchedulerHelper extends SimonaActorLogging {
protected final def scheduleTrigger(
trigger: Trigger,
actorToBeScheduled: ActorRef,
priority: Boolean,
stateData: SchedulerStateData
): SchedulerStateData = {

Expand All @@ -808,8 +854,14 @@ trait SchedulerHelper extends SimonaActorLogging {
actorToBeScheduled
)

val queue =
if (priority)
stateData.trigger.priorityTriggerQueue
else
stateData.trigger.triggerQueue

/* update trigger queue */
stateData.trigger.triggerQueue.add(
queue.add(
trigger.tick,
ScheduledTrigger(
triggerWithIdMessage,
Expand Down
19 changes: 15 additions & 4 deletions src/main/scala/edu/ie3/simona/scheduler/SimScheduler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,26 @@ class SimScheduler(
// set init sender
val startSender = sender()

// if there's no prio triggers upfront, go to regular triggers
val updatedStateData =
if (stateData.trigger.priorityTriggerQueue.isEmpty)
stateData.copy(
stateData.runtime.copy(
priorityPhase = false
)
)
else
stateData

// initializing process
val initStartTime = System.nanoTime
sendEligibleTrigger(stateData)
sendEligibleTrigger(updatedStateData)

context become schedulerReceive(
stateData.copy(
runtime = stateData.runtime
updatedStateData.copy(
runtime = updatedStateData.runtime
.copy(initStarted = true, initSender = startSender),
time = stateData.time.copy(
time = updatedStateData.time.copy(
initStartTime = initStartTime
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ object SimSchedulerStateData {
initStarted: Boolean = false,
scheduleStarted: Boolean = false,
initSender: ActorRef = Actor.noSender,
noOfFailedPF: Int = 0
noOfFailedPF: Int = 0,
priorityPhase: Boolean = true
)

/** Holds information about [[edu.ie3.simona.ontology.trigger.Trigger]] that
Expand All @@ -81,6 +82,8 @@ object SimSchedulerStateData {
triggerIdCounter: Int = 0,
triggerQueue: PriorityMultiQueue[Long, ScheduledTrigger] =
PriorityMultiQueue.empty[Long, ScheduledTrigger],
priorityTriggerQueue: PriorityMultiQueue[Long, ScheduledTrigger] =
PriorityMultiQueue.empty,
triggerIdToScheduledTriggerMap: mutable.Map[Long, ScheduledTrigger] =
mutable.Map.empty[Long, ScheduledTrigger],
awaitingResponseMap: CountingMap[Long] = CountingMap.empty[Long]
Expand Down
3 changes: 2 additions & 1 deletion src/main/scala/edu/ie3/simona/sim/SimonaSim.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ class SimonaSim(simonaSetup: SimonaSetup)
case (actor, initTrigger) =>
scheduler ! ScheduleTriggerMessage(
initTrigger,
actor
actor,
priority = true
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ class DBFSAlgorithmSupGridSpec
Seq(
ScheduleTriggerMessage(
StartGridSimulationTrigger(3600),
_,
_
)
)
Expand All @@ -223,7 +224,7 @@ class DBFSAlgorithmSupGridSpec
3,
Some(
Seq(
ScheduleTriggerMessage(ActivityStartTrigger(7200), _)
ScheduleTriggerMessage(ActivityStartTrigger(7200), _, _)
)
)
) =>
Expand Down Expand Up @@ -369,6 +370,7 @@ class DBFSAlgorithmSupGridSpec
Seq(
ScheduleTriggerMessage(
StartGridSimulationTrigger(3600),
_,
_
)
)
Expand All @@ -380,7 +382,7 @@ class DBFSAlgorithmSupGridSpec
_,
Some(
Seq(
ScheduleTriggerMessage(ActivityStartTrigger(7200), _)
ScheduleTriggerMessage(ActivityStartTrigger(7200), _, _)
)
)
) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ class ParticipantAgentExternalSourceSpec
triggers.exists {
case ScheduleTriggerMessage(
ActivityStartTrigger(tick),
actorToBeScheduled
actorToBeScheduled,
_
) =>
tick == 4711L && actorToBeScheduled == mockAgent
} shouldBe true
Expand Down
Loading