Consolidate produces batch of data

Instead of flattening the outputs of consolidate, provide the output as a
whole batch, encoded as an `Rc<Vec<C>>`. This allows downstream operators
to act on a whole batch at once, without relying on undocumented properties
of how Timely channels behave. Operators that need to flatten the output
require slightly different logic.
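
As a minimal sketch of the flattening pattern downstream consumers now need
(plain `Vec<T>` stands in for the chunk container `C`; the function and data
below are illustrative only, not part of this commit):

```rust
use std::rc::Rc;

// The consolidated output arrives as one `Rc<Vec<C>>` per batch. Consumers
// that want individual updates iterate the chunks and flatten them, rather
// than receiving a pre-flattened stream.
fn flatten_chain<T: Clone>(chain: &Rc<Vec<Vec<T>>>) -> impl Iterator<Item = T> + '_ {
    chain.iter().flat_map(|chunk| chunk.iter().cloned())
}

fn main() {
    let chain = Rc::new(vec![
        vec![("a", 0u64, 1i64)],
        vec![("b", 0, 1), ("c", 1, -1)],
    ]);
    for (data, time, diff) in flatten_chain(&chain) {
        println!("{data} @ {time}: {diff}");
    }
}
```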

An alternative approach would be to wrap the chain in a `Box`, but at this
time, Timely doesn't recognize `Box` as a valid container type.

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Jan 23, 2025
1 parent 60b23af commit 39597c4
Showing 6 changed files with 66 additions and 76 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -285,6 +285,8 @@ incremental = true
# merged), after which point it becomes impossible to build that historical
# version of Materialize.
[patch.crates-io]
differential-dataflow = { path = "../differential-dataflow" }

# Waiting on https://github.com/sfackler/rust-postgres/pull/752.
postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
4 changes: 2 additions & 2 deletions src/compute/src/logging.rs
@@ -236,7 +236,7 @@ where
CB: ContainerBuilder,
L: Into<LogVariant>,
F: for<'a> FnMut(
<B::Output as Container>::Item<'a>,
<B::Output as Container>::ItemRef<'a>,
&mut PermutedRowPacker,
&mut OutputSession<CB>,
) + 'static,
@@ -251,7 +251,7 @@ where
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session_with_builder(&time);
for item in data.drain() {
for item in data.drain().flat_map(|chunk| chunk.iter()) {
logic(item, &mut packer, &mut session);
}
}
40 changes: 15 additions & 25 deletions src/compute/src/sink/copy_to_s3_oneshot.rs
@@ -11,7 +11,7 @@ use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;

use differential_dataflow::{AsCollection, Collection, Hashable};
use differential_dataflow::{Collection, Hashable};
use mz_compute_client::protocol::response::CopyToResponse;
use mz_compute_types::dyncfgs::{
COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
@@ -23,7 +23,6 @@ use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::consolidate_pact;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::Operator;
use timely::dataflow::Scope;
use timely::progress::Antichain;
@@ -63,32 +62,18 @@
// files based on the user provided `MAX_FILE_SIZE`.
let batch_count = self.output_batch_count;

// This relies on an assumption the output order after the Exchange is deterministic, which
// is necessary to ensure the files written from each compute replica are identical.
// While this is not technically guaranteed, the current implementation uses a FIFO channel.
// In the storage copy_to operator we assert the ordering of rows to detect any regressions.
let input = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&sinked_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&sinked_collection.map(move |row| (row, ())).inner,
Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
"Consolidated COPY TO S3 input",
);
// TODO: We're converting a stream of region-allocated data to a stream of vectors.
let input = input.map(Clone::clone).as_collection();

// We need to consolidate the error collection to ensure we don't act on retracted errors.
let error = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&err_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&err_collection.map(move |err| (err, ())).inner,
Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
err.hashed() % batch_count
}),
"Consolidated COPY TO S3 errors",
);

@@ -104,9 +89,13 @@ where
while let Some((time, data)) = input.next() {
if !up_to.less_equal(time.time()) && !received_one {
received_one = true;
output
.session(&time)
.give_iterator(data.iter().next().cloned().into_iter());
output.session(&time).give_iterator(
data.iter()
.flat_map(|chunk| chunk.iter().cloned())
.next()
.map(|((err, ()), time, diff)| (err, time, diff))
.into_iter(),
);
}
}
}
@@ -132,6 +121,7 @@ where
self.connection_id,
params,
result_callback,
self.output_batch_count,
);

Some(token)
57 changes: 21 additions & 36 deletions src/storage-operators/src/s3_oneshot_sink.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::{Collection, Hashable};
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
@@ -31,9 +31,10 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::columnation::TimelyStack;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::progress::Antichain;
use timely::PartialOrder;
use tracing::debug;
@@ -55,8 +56,8 @@ mod pgcopy;
///
/// Returns a token that should be held to keep the sink alive.
pub fn copy_to<G, F>(
input_collection: Collection<G, ((Row, u64), ()), Diff>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
input_collection: StreamCore<G, Rc<Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
up_to: Antichain<G::Timestamp>,
connection_details: S3UploadInfo,
connection_context: ConnectionContext,
@@ -65,6 +66,7 @@
connection_id: CatalogItemId,
params: CopyToParameters,
worker_callback: F,
output_batch_count: u64,
) -> Rc<dyn Any>
where
G: Scope<Timestamp = Timestamp>,
@@ -89,6 +91,7 @@ where
up_to,
start_stream,
params,
output_batch_count,
),
S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
scope.clone(),
@@ -101,6 +104,7 @@
up_to,
start_stream,
params,
output_batch_count,
),
};

@@ -130,7 +134,7 @@ fn render_initialization_operator<G>(
scope: G,
sink_id: GlobalId,
up_to: Antichain<G::Timestamp>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
@@ -160,7 +164,7 @@ where
while let Some(event) = error_handle.next().await {
match event {
AsyncEvent::Data(cap, data) => {
for (((error, _), _), ts, _) in data {
for (error, ts, _) in data {
if !up_to.less_equal(&ts) {
start_handle.give(&cap, Err(error.to_string()));
return;
@@ -288,20 +292,20 @@ fn render_upload_operator<G, T>(
connection_id: CatalogItemId,
connection_details: S3UploadInfo,
sink_id: GlobalId,
input_collection: Collection<G, ((Row, u64), ()), Diff>,
input_collection: StreamCore<G, Rc<Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>>,
up_to: Antichain<G::Timestamp>,
start_stream: Stream<G, Result<(), String>>,
params: CopyToParameters,
output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
T: CopyToS3Uploader,
{
let worker_id = scope.index();
let num_workers = scope.peers();
let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

let mut input_handle = builder.new_disconnected_input(&input_collection.inner, Pipeline);
let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
let (completion_handle, completion_stream) = builder.new_output();
let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

@@ -351,31 +355,20 @@ where
}

let mut row_count = 0;
let mut last_row = None;
while let Some(event) = input_handle.next().await {
match event {
AsyncEvent::Data(_ts, data) => {
for (((row, batch), ()), ts, diff) in data {
// Check our assumption above that batches are
// always assigned to the worker with ID `batch %
// num_workers`.
if usize::cast_from(batch) % num_workers != worker_id {
anyhow::bail!(
"internal error: batch {} assigned to worker {} (expected worker {})",
batch,
worker_id,
usize::cast_from(batch) % num_workers
);
}
if !up_to.less_equal(&ts) {
if diff < 0 {
for ((row, ()), ts, diff) in data.iter().flat_map(|chunk| chunk.iter()) {
let batch = row.hashed() % output_batch_count;
if !up_to.less_equal(ts) {
if *diff < 0 {
anyhow::bail!(
"Invalid data in source errors, saw retractions ({}) for \
row that does not exist",
diff * -1,
*diff * -1,
)
}
row_count += u64::try_from(diff).unwrap();
row_count += u64::try_from(*diff).unwrap();
let uploader = match s3_uploaders.entry(batch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
Expand All @@ -389,18 +382,10 @@ where
)?)
}
};
for _ in 0..diff {
uploader.append_row(&row).await?;
for _ in 0..*diff {
uploader.append_row(row).await?;
}
}
// A very crude way to detect if there is ever a regression in the deterministic
// ordering of rows in our input, since we are depending on an implementation
// detail of timely communication (FIFO ordering over an exchange).
let cur = (row, batch);
if let Some(last) = last_row {
assert!(&last < &cur, "broken fifo ordering!");
}
last_row = Some(cur);
}
}
AsyncEvent::Progress(frontier) => {
37 changes: 26 additions & 11 deletions src/timely-util/src/operator.rs
@@ -12,7 +12,7 @@
use std::future::Future;
use std::hash::{BuildHasher, Hash, Hasher};
use std::marker::PhantomData;
use std::rc::Weak;
use std::rc::{Rc, Weak};

use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::difference::{Multiply, Semigroup};
@@ -25,7 +25,6 @@ use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::ContainerBytes;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
@@ -708,7 +707,16 @@
h.finish()
});
consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, "unpack consolidated", |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in data.iter().flat_map(|chunk| chunk.iter()) {
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
} else {
self
@@ -730,7 +738,16 @@
Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, "unpack consolidated", |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in data.iter().flat_map(|chunk| chunk.iter()) {
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
}
}
@@ -777,7 +794,7 @@ pub fn consolidate_pact<Ba, P, G>(
stream: &StreamCore<G, Ba::Input>,
pact: P,
name: &str,
) -> StreamCore<G, Ba::Output>
) -> StreamCore<G, Rc<Vec<Ba::Output>>>
where
G: Scope,
Ba: Batcher<Time = G::Timestamp> + 'static,
@@ -831,11 +848,9 @@ where
// send the batch to downstream consumers, empty or not.
let mut session = output.session(&capabilities.elements()[index]);
// Extract updates not in advance of `upper`.
let output =
let mut output =
batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
for mut batch in output {
session.give_container(&mut batch);
}
session.give_container(&mut output);
}
}

@@ -879,7 +894,7 @@ where
{
type Input = I;
type Time = T;
type Output = Vec<I>;
type Output = Rc<Vec<I>>;

fn new() -> Self {
Self {
@@ -900,6 +915,6 @@
}

fn seal(chain: &mut Vec<Self::Input>, _description: Description<Self::Time>) -> Self::Output {
std::mem::take(chain)
Rc::new(std::mem::take(chain))
}
}
