Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Domains first #88

Merged
merged 13 commits into from
Nov 7, 2019
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/target
/experiments/target
/data

.DS_Store
Expand Down
4 changes: 0 additions & 4 deletions cli/examples/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
"input_semantics": "Distinct",
"query_support": "Basic",
"index_direction": "Forward",
"timeless": false
}
}
},
Expand All @@ -29,7 +28,6 @@
"input_semantics": "Distinct",
"query_support": "Basic",
"index_direction": "Forward",
"timeless": false
}
}
},
Expand All @@ -46,7 +44,6 @@
"input_semantics": "Distinct",
"query_support": "Basic",
"index_direction": "Forward",
"timeless": false
}
}
},
Expand All @@ -63,7 +60,6 @@
"input_semantics": "Distinct",
"query_support": "Basic",
"index_direction": "Forward",
"timeless": false
}
}
}
Expand Down
4 changes: 1 addition & 3 deletions experiments/src/bin/hector_wco.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ fn main() {

worker.dataflow::<u64, _, _>(|scope| {
server
.context
.internal
.create_transactable_attribute("edge", AttributeConfig::tx_time(InputSemantics::Raw), scope)
.create_attribute(scope, "edge", AttributeConfig::tx_time(InputSemantics::Raw))
.unwrap();

server
Expand Down
61 changes: 58 additions & 3 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,61 @@ fn main() {

let result = match req {
Request::Transact(req) => server.transact(req, owner, worker.index()),
Request::Subscribe(aid) => {
let interests = server.interests
.entry(aid.clone())
.or_insert_with(HashSet::new);

// All workers keep track of every client's interests, s.t. they
// know when to clean up unused dataflows.
interests.insert(Token(client));

if interests.len() > 1 {
// We only want to setup the dataflow on
// the first interest.
Ok(())
} else {
let send_results = io.send.clone();

let result = worker.dataflow::<T, _, _>(|scope| {
let (propose, shutdown) = server
.internal
.forward_propose(&aid)
.unwrap()
.import_frontier(scope, &aid);

// @TODO stash this somewhere
std::mem::forget(shutdown);

let pact = Exchange::new(move |_| owner as u64);

propose
.as_collection(|e, v| vec![e.clone(), v.clone()])
.inner
.unary(pact, "Subscription", move |_cap, _info| {
move |input, _output: &mut OutputHandle<_, ResultDiff<T>, _>| {
// Due to the exchange pact, this closure is only
// executed by the owning worker.

input.for_each(|_time, data| {
let data = data.iter()
.map(|(tuple, t, diff)| (tuple.clone(), t.clone().into(), *diff))
.collect::<Vec<ResultDiff<Time>>>();

send_results
.send(Output::QueryDiff(aid.clone(), data))
.expect("internal channel send failed");
});
}
})
.probe_with(&mut server.probe);

Ok(())
});

result
}
}
Request::Interest(req) => {
let interests = server.interests
.entry(req.name.clone())
Expand Down Expand Up @@ -508,7 +563,7 @@ fn main() {
})
}
Request::AdvanceDomain(name, next) => server.advance_domain(name, next.into()),
Request::CloseInput(name) => server.context.internal.close_input(name),
Request::CloseInput(name) => server.internal.close_input(name),
Request::Disconnect => server.disconnect_client(Token(command.client)),
Request::Setup => unimplemented!(),
Request::Tick => {
Expand Down Expand Up @@ -557,7 +612,7 @@ fn main() {
#[cfg(feature = "bitemporal")]
let next = Pair::new(Instant::now().duration_since(worker.timer()), next_tx as u64);

server.context.internal.advance_epoch(next).expect("failed to advance epoch");
server.internal.advance_epoch(next).expect("failed to advance epoch");
}
}

Expand All @@ -574,7 +629,7 @@ fn main() {
// might take a decent amount of time, in case traces get
// compacted. If that happens, we can park less before
// scheduling the next activator.
server.context.internal.advance().expect("failed to advance domain");
server.internal.advance().expect("failed to advance domain");

// Finally, we give the CPU a chance to chill, if no work
// remains.
Expand Down
Loading