Skip to content

Commit

Permalink
Generalize scheduling, add tick option
Browse files Browse the repository at this point in the history
  • Loading branch information
comnik committed May 4, 2019
1 parent 560a933 commit b17c3e6
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 34 deletions.
58 changes: 46 additions & 12 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use slab::Slab;

use ws::connection::{ConnEvent, Connection};

use declarative_dataflow::server::{Config, CreateAttribute, Request, Server, TxId};
use declarative_dataflow::server::{scheduler, Config, CreateAttribute, Request, Server, TxId};
use declarative_dataflow::sinks::{Sinkable, SinkingContext};
use declarative_dataflow::timestamp::Coarsen;
use declarative_dataflow::{Eid, Error, Output, ResultDiff};
Expand Down Expand Up @@ -71,6 +71,12 @@ fn main() {

let mut opts = Options::new();
opts.optopt("", "port", "server port", "PORT");
opts.optopt(
"",
"tick",
"advance domain at a regular interval",
"SECONDS",
);
opts.optflag(
"",
"manual-advance",
Expand All @@ -94,11 +100,16 @@ fn main() {
Ok(matches) => {
let starting_port = matches
.opt_str("port")
.map(|x| x.parse().unwrap_or(default_config.port))
.map(|x| x.parse().expect("failed to parse port"))
.unwrap_or(default_config.port);

let tick: Option<Duration> = matches
.opt_str("tick")
.map(|x| Duration::from_secs(x.parse().expect("failed to parse tick duration")));

Config {
port: starting_port + (worker.index() as u16),
tick,
manual_advance: matches.opt_present("manual-advance"),
enable_logging: matches.opt_present("enable-logging"),
enable_optimizer: matches.opt_present("enable-optimizer"),
Expand Down Expand Up @@ -130,6 +141,16 @@ fn main() {
let mut sequencer: Sequencer<Command> =
Sequencer::preloaded(worker, Instant::now(), VecDeque::from(vec![preload_command]));

// Kickoff ticking, if configured. We only want to issue ticks
// from a single worker, to avoid redundant ticking.
if worker.index() == 0 && config.tick.is_some() {
sequencer.push(Command {
owner: 0,
client: SYSTEM.0,
requests: vec![Request::Tick],
});
}

// configure websocket server
let ws_settings = ws::Settings {
max_connections: 1024,
Expand Down Expand Up @@ -196,7 +217,17 @@ fn main() {
if server.scheduler.borrow().has_pending() {
let mut scheduler = server.scheduler.borrow_mut();
while let Some(activator) = scheduler.next() {
activator.activate();
if let Some(event) = activator.schedule() {
match event {
scheduler::Event::Tick => {
sequencer.push(Command {
owner: worker.index(),
client: SYSTEM.0,
requests: vec![Request::Tick],
});
}
}
}
}
} else {
// @TODO in blocking mode, we could check whether
Expand Down Expand Up @@ -676,15 +707,18 @@ fn main() {
Request::Disconnect => server.disconnect_client(Token(command.client)),
Request::Setup => unimplemented!(),
Request::Tick => {
#[cfg(all(not(feature = "real-time"), not(feature = "bitemporal")))]
let next = next_tx as u64;
#[cfg(feature = "real-time")]
let next = Instant::now().duration_since(worker.timer());
#[cfg(feature = "bitemporal")]
let next = Pair::new(Instant::now().duration_since(worker.timer()), next_tx as u64);

if let Err(error) = server.context.internal.advance_epoch(next) {
send_results.send(Output::Error(client, error, last_tx)).unwrap();
// We don't actually have to do any actual worker here, because we are
// ticking the domain on each command anyways. We do have to schedule
// the next tick, however.

// We only want to issue ticks from a single worker, to avoid
// redundant ticking.
if worker.index() == 0 {
if let Some(tick) = config.tick {
let interval_end = Instant::now().duration_since(worker.timer()).coarsen(&tick);
let at = worker.timer() + interval_end;
server.scheduler.borrow_mut().event_at(at, scheduler::Event::Tick);
}
}
}
Request::Status => {
Expand Down
3 changes: 3 additions & 0 deletions src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ use self::scheduler::Scheduler;
pub struct Config {
/// Port at which this server will listen at.
pub port: u16,
/// Automatic domain tick interval.
pub tick: Option<Duration>,
/// Do clients have to call AdvanceDomain explicitely?
pub manual_advance: bool,
/// Should logging streams be created?
Expand All @@ -49,6 +51,7 @@ impl Default for Config {
fn default() -> Config {
Config {
port: 6262,
tick: None,
manual_advance: false,
enable_logging: false,
enable_optimizer: false,
Expand Down
91 changes: 69 additions & 22 deletions src/server/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::rc::{Rc, Weak};
use std::rc::Weak;
use std::time::{Duration, Instant};

use timely::scheduling::activate::Activator;
use timely::scheduling::Activator;

/// A scheduler allows polling sources to defer triggering their
/// activators, in case they do not have work available. This reduces
Expand All @@ -30,22 +30,18 @@ impl Scheduler {
/// scheduled.
pub fn has_pending(&self) -> bool {
if let Some(ref timed_activator) = self.activator_queue.peek() {
Instant::now() >= timed_activator.at
timed_activator.is_ready()
} else {
false
}
}
/// Returns the duration until the next activation in `activator_queue`
/// from `now()`. If no activations are present returns None.

/// Returns the duration until the next activation in
/// `activator_queue` from `now()`. If no activations are present
/// returns None.
pub fn until_next(&self) -> Option<Duration> {
if let Some(ref timed_activator) = self.activator_queue.peek() {
// timed_activator.at.check_duration_since(Instant::now()).unwrap_or(Duration::from_millies(0))
let now = Instant::now();
if timed_activator.at > now {
Some(timed_activator.at.duration_since(now))
} else {
Some(Duration::from_millis(0))
}
Some(timed_activator.until_ready())
} else {
None
}
Expand All @@ -54,7 +50,11 @@ impl Scheduler {
/// Schedule activation at the specified instant. No hard
/// guarantees on when the activator will actually be triggered.
pub fn schedule_at(&mut self, at: Instant, activator: Weak<Activator>) {
self.activator_queue.push(TimedActivator { at, activator });
self.activator_queue.push(TimedActivator {
at,
activator,
event: None,
});
}

/// Schedule activation now. No hard guarantees on when the
Expand All @@ -68,25 +68,72 @@ impl Scheduler {
pub fn schedule_after(&mut self, after: Duration, activator: Weak<Activator>) {
self.schedule_at(Instant::now() + after, activator);
}

/// Schedule an event at the specified instant. No hard guarantees
/// on when the activator will actually be triggered.
pub fn event_at(&mut self, at: Instant, event: Event) {
self.activator_queue.push(TimedActivator {
at,
activator: Weak::new(),
event: Some(event),
});
}

/// Schedule an event after the specified duration. No hard
/// guarantees on when the activator will actually be triggered.
pub fn event_after(&mut self, after: Duration, event: Event) {
self.event_at(Instant::now() + after, event);
}
}

impl Iterator for Scheduler {
type Item = Rc<Activator>;
fn next(&mut self) -> Option<Rc<Activator>> {
type Item = TimedActivator;
fn next(&mut self) -> Option<TimedActivator> {
if self.has_pending() {
match self.activator_queue.pop().unwrap().activator.upgrade() {
None => self.next(),
Some(activator) => Some(activator),
}
Some(self.activator_queue.pop().unwrap())
} else {
None
}
}
}

struct TimedActivator {
pub at: Instant,
pub activator: Weak<Activator>,
/// A set of things that we might want to schedule.
#[derive(PartialEq, Eq, Debug)]
pub enum Event {
/// A domain tick.
Tick,
}

/// A thing that can be scheduled at an instant. Scheduling this
/// activator might result in an `Event`.
pub struct TimedActivator {
at: Instant,
activator: Weak<Activator>,
event: Option<Event>,
}

impl TimedActivator {
fn is_ready(&self) -> bool {
Instant::now() >= self.at
}

fn until_ready(&self) -> Duration {
let now = Instant::now();
if self.at > now {
self.at.duration_since(now)
} else {
Duration::from_millis(0)
}
}

/// Trigger the activation, potentially resulting in an event.
pub fn schedule(self) -> Option<Event> {
if let Some(activator) = self.activator.upgrade() {
activator.activate();
}

self.event
}
}

// We want the activator_queue to act like a min-heap.
Expand Down
32 changes: 32 additions & 0 deletions tests/scheduling_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
use declarative_dataflow::server::scheduler::{Event, Scheduler};
use std::time::Duration;

#[test]
fn test_schedule_now() {
let mut scheduler = Scheduler::new();

assert!(!scheduler.has_pending());
assert!(scheduler.until_next().is_none());

scheduler.event_after(Duration::from_secs(0), Event::Tick);

assert!(scheduler.has_pending());
assert_eq!(scheduler.next().unwrap().schedule(), Some(Event::Tick));
}

#[test]
fn test_schedule_after() {
let mut scheduler = Scheduler::new();

scheduler.event_after(Duration::from_secs(2), Event::Tick);

assert!(!scheduler.has_pending());
assert!(scheduler.next().is_none());
assert!(scheduler.until_next().is_some());

std::thread::sleep(scheduler.until_next().unwrap());

assert!(scheduler.has_pending());
assert_eq!(scheduler.next().unwrap().schedule(), Some(Event::Tick));
assert!(scheduler.until_next().is_none());
}

0 comments on commit b17c3e6

Please sign in to comment.