diff --git a/server/src/main.rs b/server/src/main.rs index e9802f4..1443d71 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -30,7 +30,7 @@ use slab::Slab; use ws::connection::{ConnEvent, Connection}; -use declarative_dataflow::server::{Config, CreateAttribute, Request, Server, TxId}; +use declarative_dataflow::server::{scheduler, Config, CreateAttribute, Request, Server, TxId}; use declarative_dataflow::sinks::{Sinkable, SinkingContext}; use declarative_dataflow::timestamp::Coarsen; use declarative_dataflow::{Eid, Error, Output, ResultDiff}; @@ -71,6 +71,12 @@ fn main() { let mut opts = Options::new(); opts.optopt("", "port", "server port", "PORT"); + opts.optopt( + "", + "tick", + "advance domain at a regular interval", + "SECONDS", + ); opts.optflag( "", "manual-advance", @@ -94,11 +100,16 @@ fn main() { Ok(matches) => { let starting_port = matches .opt_str("port") - .map(|x| x.parse().unwrap_or(default_config.port)) + .map(|x| x.parse().expect("failed to parse port")) .unwrap_or(default_config.port); + let tick: Option = matches + .opt_str("tick") + .map(|x| Duration::from_secs(x.parse().expect("failed to parse tick duration"))); + Config { port: starting_port + (worker.index() as u16), + tick, manual_advance: matches.opt_present("manual-advance"), enable_logging: matches.opt_present("enable-logging"), enable_optimizer: matches.opt_present("enable-optimizer"), @@ -130,6 +141,16 @@ fn main() { let mut sequencer: Sequencer = Sequencer::preloaded(worker, Instant::now(), VecDeque::from(vec![preload_command])); + // Kickoff ticking, if configured. We only want to issue ticks + // from a single worker, to avoid redundant ticking. + if worker.index() == 0 && config.tick.is_some() { + sequencer.push(Command { + owner: 0, + client: SYSTEM.0, + requests: vec![Request::Tick], + }); + } + // configure websocket server let ws_settings = ws::Settings { max_connections: 1024, @@ -196,7 +217,17 @@ fn main() { if server.scheduler.borrow().has_pending() { let mut scheduler = server.scheduler.borrow_mut(); while let Some(activator) = scheduler.next() { - activator.activate(); + if let Some(event) = activator.schedule() { + match event { + scheduler::Event::Tick => { + sequencer.push(Command { + owner: worker.index(), + client: SYSTEM.0, + requests: vec![Request::Tick], + }); + } + } + } } } else { // @TODO in blocking mode, we could check whether @@ -676,15 +707,18 @@ fn main() { Request::Disconnect => server.disconnect_client(Token(command.client)), Request::Setup => unimplemented!(), Request::Tick => { - #[cfg(all(not(feature = "real-time"), not(feature = "bitemporal")))] - let next = next_tx as u64; - #[cfg(feature = "real-time")] - let next = Instant::now().duration_since(worker.timer()); - #[cfg(feature = "bitemporal")] - let next = Pair::new(Instant::now().duration_since(worker.timer()), next_tx as u64); - - if let Err(error) = server.context.internal.advance_epoch(next) { - send_results.send(Output::Error(client, error, last_tx)).unwrap(); + // We don't actually have to do any actual worker here, because we are + // ticking the domain on each command anyways. We do have to schedule + // the next tick, however. + + // We only want to issue ticks from a single worker, to avoid + // redundant ticking. + if worker.index() == 0 { + if let Some(tick) = config.tick { + let interval_end = Instant::now().duration_since(worker.timer()).coarsen(&tick); + let at = worker.timer() + interval_end; + server.scheduler.borrow_mut().event_at(at, scheduler::Event::Tick); + } } } Request::Status => { diff --git a/src/server/mod.rs b/src/server/mod.rs index 125a09b..4fb43c7 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -35,6 +35,8 @@ use self::scheduler::Scheduler; pub struct Config { /// Port at which this server will listen at. pub port: u16, + /// Automatic domain tick interval. + pub tick: Option, /// Do clients have to call AdvanceDomain explicitely? pub manual_advance: bool, /// Should logging streams be created? @@ -49,6 +51,7 @@ impl Default for Config { fn default() -> Config { Config { port: 6262, + tick: None, manual_advance: false, enable_logging: false, enable_optimizer: false, diff --git a/src/server/scheduler.rs b/src/server/scheduler.rs index d652a7d..f08036b 100644 --- a/src/server/scheduler.rs +++ b/src/server/scheduler.rs @@ -2,10 +2,10 @@ use std::cmp::Ordering; use std::collections::BinaryHeap; -use std::rc::{Rc, Weak}; +use std::rc::Weak; use std::time::{Duration, Instant}; -use timely::scheduling::activate::Activator; +use timely::scheduling::Activator; /// A scheduler allows polling sources to defer triggering their /// activators, in case they do not have work available. This reduces @@ -30,22 +30,18 @@ impl Scheduler { /// scheduled. pub fn has_pending(&self) -> bool { if let Some(ref timed_activator) = self.activator_queue.peek() { - Instant::now() >= timed_activator.at + timed_activator.is_ready() } else { false } } - /// Returns the duration until the next activation in `activator_queue` - /// from `now()`. If no activations are present returns None. + + /// Returns the duration until the next activation in + /// `activator_queue` from `now()`. If no activations are present + /// returns None. pub fn until_next(&self) -> Option { if let Some(ref timed_activator) = self.activator_queue.peek() { - // timed_activator.at.check_duration_since(Instant::now()).unwrap_or(Duration::from_millies(0)) - let now = Instant::now(); - if timed_activator.at > now { - Some(timed_activator.at.duration_since(now)) - } else { - Some(Duration::from_millis(0)) - } + Some(timed_activator.until_ready()) } else { None } @@ -54,7 +50,11 @@ impl Scheduler { /// Schedule activation at the specified instant. No hard /// guarantees on when the activator will actually be triggered. pub fn schedule_at(&mut self, at: Instant, activator: Weak) { - self.activator_queue.push(TimedActivator { at, activator }); + self.activator_queue.push(TimedActivator { + at, + activator, + event: None, + }); } /// Schedule activation now. No hard guarantees on when the @@ -68,25 +68,72 @@ impl Scheduler { pub fn schedule_after(&mut self, after: Duration, activator: Weak) { self.schedule_at(Instant::now() + after, activator); } + + /// Schedule an event at the specified instant. No hard guarantees + /// on when the activator will actually be triggered. + pub fn event_at(&mut self, at: Instant, event: Event) { + self.activator_queue.push(TimedActivator { + at, + activator: Weak::new(), + event: Some(event), + }); + } + + /// Schedule an event after the specified duration. No hard + /// guarantees on when the activator will actually be triggered. + pub fn event_after(&mut self, after: Duration, event: Event) { + self.event_at(Instant::now() + after, event); + } } impl Iterator for Scheduler { - type Item = Rc; - fn next(&mut self) -> Option> { + type Item = TimedActivator; + fn next(&mut self) -> Option { if self.has_pending() { - match self.activator_queue.pop().unwrap().activator.upgrade() { - None => self.next(), - Some(activator) => Some(activator), - } + Some(self.activator_queue.pop().unwrap()) } else { None } } } -struct TimedActivator { - pub at: Instant, - pub activator: Weak, +/// A set of things that we might want to schedule. +#[derive(PartialEq, Eq, Debug)] +pub enum Event { + /// A domain tick. + Tick, +} + +/// A thing that can be scheduled at an instant. Scheduling this +/// activator might result in an `Event`. +pub struct TimedActivator { + at: Instant, + activator: Weak, + event: Option, +} + +impl TimedActivator { + fn is_ready(&self) -> bool { + Instant::now() >= self.at + } + + fn until_ready(&self) -> Duration { + let now = Instant::now(); + if self.at > now { + self.at.duration_since(now) + } else { + Duration::from_millis(0) + } + } + + /// Trigger the activation, potentially resulting in an event. + pub fn schedule(self) -> Option { + if let Some(activator) = self.activator.upgrade() { + activator.activate(); + } + + self.event + } } // We want the activator_queue to act like a min-heap. diff --git a/tests/scheduling_test.rs b/tests/scheduling_test.rs new file mode 100644 index 0000000..9701b4f --- /dev/null +++ b/tests/scheduling_test.rs @@ -0,0 +1,32 @@ +use declarative_dataflow::server::scheduler::{Event, Scheduler}; +use std::time::Duration; + +#[test] +fn test_schedule_now() { + let mut scheduler = Scheduler::new(); + + assert!(!scheduler.has_pending()); + assert!(scheduler.until_next().is_none()); + + scheduler.event_after(Duration::from_secs(0), Event::Tick); + + assert!(scheduler.has_pending()); + assert_eq!(scheduler.next().unwrap().schedule(), Some(Event::Tick)); +} + +#[test] +fn test_schedule_after() { + let mut scheduler = Scheduler::new(); + + scheduler.event_after(Duration::from_secs(2), Event::Tick); + + assert!(!scheduler.has_pending()); + assert!(scheduler.next().is_none()); + assert!(scheduler.until_next().is_some()); + + std::thread::sleep(scheduler.until_next().unwrap()); + + assert!(scheduler.has_pending()); + assert_eq!(scheduler.next().unwrap().schedule(), Some(Event::Tick)); + assert!(scheduler.until_next().is_none()); +}