Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge batcher for flat container without key and value #547

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 21 additions & 26 deletions src/trace/implementations/merge_batcher_flat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::marker::PhantomData;
use timely::progress::frontier::{Antichain, AntichainRef};
use timely::{Data, PartialOrder};
use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems};
use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
use timely::container::flatcontainer::impls::tuple::TupleABCRegion;

use crate::difference::{IsZero, Semigroup};
use crate::trace::implementations::merge_batcher::Merger;
Expand Down Expand Up @@ -56,10 +56,8 @@ impl<MC: Region> FlatcontainerMerger<MC> {

/// Behavior to dissect items of chunks in the merge batcher
pub trait MergerChunk: Region {
/// The key of the update
type Key<'a>: Ord where Self: 'a;
/// The value of the update
type Val<'a>: Ord where Self: 'a;
/// The data portion of the update
type Data<'a>: Ord where Self: 'a;
/// The time of the update
type Time<'a>: Ord where Self: 'a;
/// The owned time type.
Expand All @@ -70,28 +68,25 @@ pub trait MergerChunk: Region {
type DiffOwned;

/// Split a read item into its constituents. Must be cheap.
fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>);
fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>);
}

impl<K,V,T,R> MergerChunk for TupleABCRegion<TupleABRegion<K, V>, T, R>
impl<D,T,R> MergerChunk for TupleABCRegion<D, T, R>
where
K: Region,
for<'a> K::ReadItem<'a>: Ord,
V: Region,
for<'a> V::ReadItem<'a>: Ord,
D: Region,
for<'a> D::ReadItem<'a>: Ord,
T: Region,
for<'a> T::ReadItem<'a>: Ord,
R: Region,
{
type Key<'a> = K::ReadItem<'a> where Self: 'a;
type Val<'a> = V::ReadItem<'a> where Self: 'a;
type Data<'a> = D::ReadItem<'a> where Self: 'a;
type Time<'a> = T::ReadItem<'a> where Self: 'a;
type TimeOwned = T::Owned;
type Diff<'a> = R::ReadItem<'a> where Self: 'a;
type DiffOwned = R::Owned;

fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) {
(key, val, time, diff)
fn into_parts<'a>((data, time, diff): Self::ReadItem<'a>) -> (Self::Data<'a>, Self::Time<'a>, Self::Diff<'a>) {
(data, time, diff)
}
}

Expand All @@ -100,8 +95,8 @@ where
for<'a> MC: MergerChunk + Clone + 'static
+ ReserveItems<<MC as Region>::ReadItem<'a>>
+ Push<<MC as Region>::ReadItem<'a>>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)>
+ Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>,
+ Push<(MC::Data<'a>, MC::Time<'a>, &'a MC::DiffOwned)>
+ Push<(MC::Data<'a>, MC::Time<'a>, MC::Diff<'a>)>,
for<'a> MC::Time<'a>: PartialOrder<MC::TimeOwned> + Copy + IntoOwned<'a, Owned=MC::TimeOwned>,
for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>,
for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder<MC::Time<'a>> + Data,
Expand All @@ -125,9 +120,9 @@ where
while !head1.is_empty() && !head2.is_empty() {
while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() {
let cmp = {
let (key1, val1, time1, _diff) = MC::into_parts(head1.peek());
let (key2, val2, time2, _diff) = MC::into_parts(head2.peek());
((key1, val1), time1).cmp(&((key2, val2), time2))
let (data1, time1, _diff) = MC::into_parts(head1.peek());
let (data2, time2, _diff) = MC::into_parts(head2.peek());
(data1, time1).cmp(&(data2, time2))
};
// TODO: The following less/greater branches could plausibly be a good moment for
// `copy_range`, on account of runs of records that might benefit more from a
Expand All @@ -140,12 +135,12 @@ where
result.copy(head2.pop());
}
Ordering::Equal => {
let (key, val, time1, diff1) = MC::into_parts(head1.pop());
let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop());
let (data, time1, diff1) = MC::into_parts(head1.pop());
let (_data, _time2, diff2) = MC::into_parts(head2.pop());
diff1.clone_onto(&mut diff);
diff.plus_equals(&diff2);
if !diff.is_zero() {
result.copy(((key, val), time1, &diff));
result.copy((data, time1, &diff));
}
}
}
Expand Down Expand Up @@ -212,20 +207,20 @@ where
let mut ready = self.empty(stash);

for buffer in merged {
for (key, val, time, diff) in buffer.iter().map(MC::into_parts) {
for (data, time, diff) in buffer.iter().map(MC::into_parts) {
if upper.less_equal(&time) {
frontier.insert_with(&time, |time| (*time).into_owned());
if keep.len() == keep.capacity() && !keep.is_empty() {
kept.push(keep);
keep = self.empty(stash);
}
keep.copy(((key, val), time, diff));
keep.copy((data, time, diff));
} else {
if ready.len() == ready.capacity() && !ready.is_empty() {
readied.push(ready);
ready = self.empty(stash);
}
ready.copy(((key, val), time, diff));
ready.copy((data, time, diff));
}
}
// Recycling buffer.
Expand Down
Loading