Skip to content

Commit

Permalink
Clean-up event queue merging.
Browse files Browse the repository at this point in the history
  • Loading branch information
thorstenhater committed Jan 7, 2025
1 parent a029a33 commit 734055a
Showing 1 changed file with 50 additions and 53 deletions.
103 changes: 50 additions & 53 deletions arbor/communication/communicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,38 +251,11 @@ void communicator::set_remote_spike_filter(const spike_predicate& p) { remote_sp
void communicator::remote_ctrl_send_continue(const epoch& e) { ctx_->distributed->remote_ctrl_send_continue(e); }
void communicator::remote_ctrl_send_done() { ctx_->distributed->remote_ctrl_send_done(); }

// Given
// * a set of connections and an index into the set
// * a range of spikes
// * an output queue,
// append events for that sub-range of spikes to the
// queue that has the same source as the connection
// at index.
template<typename It>
void enqueue_from_source(const communicator::connection_list& cons,
const size_t idx,
It& spk,
const It end,
std::vector<pse_vector>& out) {
// const refs to connection.
auto src = cons.srcs[idx];
auto dst = cons.dests[idx];
auto del = cons.delays[idx];
auto wgt = cons.weights[idx];
auto dom = cons.idx_on_domain[idx];
auto& que = out[dom];
for (; spk != end && spk->source == src; ++spk) {
que.emplace_back(dst, spk->time + del, wgt);
}
}

// Internal helper to append to the event queues
template<typename S>
void append_events_from_domain(const communicator::connection_list& cons, size_t cn, const size_t ce,
const S& spks,
std::vector<pse_vector>& queues) {
auto sp = spks.begin(), se = spks.end();
if (se == sp) return;
// We have a choice of whether to walk spikes or connections:
// i.e., we can iterate over the spikes, and for each spike search
// the for connections that have the same source; or alternatively
Expand All @@ -292,37 +265,61 @@ void append_events_from_domain(const communicator::connection_list& cons, size_t
// We iterate over whichever set is the smallest, which has
// complexity of order max(S log(C), C log(S)), where S is the
// number of spikes, and C is the number of connections.
if (cons.size() < spks.size()) {
for (; sp != se && cn < ce; ++cn) {
// sp is now the beginning of a range of spikes from the same
// source.
sp = std::lower_bound(sp, se,
cons.srcs[cn],
[](const auto& spk, const auto& src) { return spk.source < src; });
// now, sp is at the end of the equal source range.
enqueue_from_source(cons, cn, sp, se, queues);
// Thus the whole algorithm has O(min(S, C) log max(S, C))
while (sp < se && cn < ce) {
if ((ce - cn) < size_t(se - sp)) {
auto src = cons.srcs[cn];
// identify range of spikes to enqueue.
auto fst = sp;
if (fst->source != src) {
fst = std::lower_bound(sp, se,
src,
[](const auto& spk, const auto& src) { return spk.source < src; });
}
for (; cn < ce && cons.srcs[cn] == src; ++cn) {
auto dst = cons.dests[cn];
auto del = cons.delays[cn];
auto wgt = cons.weights[cn];
auto dom = cons.idx_on_domain[cn];
auto& que = queues[dom];
// Handle all connections with the same source
// scan the range of spikes, once per connection
for (sp = fst; sp < se && sp->source == src; ++sp) {
que.emplace_back(dst, sp->time + del, wgt);
}
}
// once we leave here, sp will be at the end of the eglible range
// and all connections with the same source will have been treated.
// so, we can just leave sp at this end.
}
}
else {
while (sp != se) {
auto beg = sp;
auto src = beg->source;
else { // less spikes than connections, so iterate spikes linearly and bsearch connections
auto spk = sp;
auto src = spk->source;
// Here, `cn` is the index of the first connection whose source
// is larger or equal to the spike's source. It may be `ce` if
// all elements compare < to spk.source.
cn = std::lower_bound(cons.srcs.begin() + cn,
cons.srcs.begin() + ce,
src)
- cons.srcs.begin();
for (; cn < ce && cons.srcs[cn] == src; ++cn) {
// Reset the spike iterator as we walk the same sub-range
// for each connection with the same source.
sp = beg;
// If we ever get multiple spikes from the same source, treat
// them all. This is mostly rare.
enqueue_from_source(cons, cn, sp, se, queues);
auto fst = cn;
if (cons.srcs[fst] != src) {
fst = std::lower_bound(cons.srcs.begin() + cn,
cons.srcs.begin() + ce,
src,
[](auto a, auto b) { return a < b; })
- cons.srcs.begin();
}
for (sp = spk; sp < se && sp->source == src; ++sp) {
for (cn = fst; cn < ce && cons.srcs[cn] == src; ++cn) {
auto dst = cons.dests[cn];
auto del = cons.delays[cn];
auto wgt = cons.weights[cn];
auto dom = cons.idx_on_domain[cn];
auto& que = queues[dom];
// If we ever get multiple spikes from the same source, treat
// them all. This is mostly rare.
// NB: Reset the spike iterator as we walk the same sub-range
// for each connection with the same source.
que.emplace_back(dst, sp->time + del, wgt);
}
}
while (sp != se && sp->source == src) ++sp;
}
}
}
Expand Down

0 comments on commit 734055a

Please sign in to comment.