Consolidate produces batches of data
Instead of flattening the outputs of consolidate, provide the output as a
whole batch, encoded as a `Vec<C>`. This allows downstream operators
to act on a whole batch at once, without relying on undocumented properties
of how Timely channels behave. Operators that need to flatten the
output need slightly different logic.

The copy-to-s3 operator sees some changes, which are roughly:
* Accept the chain formed by `consolidate_pact`, and rely on it
  partitioning the data according to the exchange function.
* Skip encoding the batch ID alongside the data, and recompute it when
  writing the data.

Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Jan 23, 2025
1 parent b6c1b0a commit c55fbdd
Showing 4 changed files with 79 additions and 72 deletions.
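
A minimal, self-contained sketch of the consumption pattern this change introduces, using plain nested `Vec`s as a stand-in for the real batch shape (a `Vec` of `TimelyStack` chunks): downstream operators now receive whole batches and flatten them only when they need individual updates, mirroring the `data.iter().flatten().flat_map(|chunk| chunk.iter())` calls in the diffs below. The helper name `flatten` is illustrative and not part of the crates touched here.

// Sketch only: `Vec<Vec<T>>` stands in for a batch of columnar chunks, and
// `data` plays the role of a Timely input buffer holding whole batches.
fn flatten<'a, T>(data: &'a [Vec<Vec<T>>]) -> impl Iterator<Item = &'a T> + 'a {
    data.iter().flatten().flat_map(|chunk| chunk.iter())
}

fn main() {
    // Two batches: the first holds two chunks of (data, time, diff) updates.
    let data = vec![
        vec![vec![("a", 0u64, 1i64), ("b", 0, 1)], vec![("c", 1, 2)]],
        vec![vec![("d", 1, 1)]],
    ];
    assert_eq!(flatten(&data).count(), 4);
}
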
4 changes: 2 additions & 2 deletions src/compute/src/logging.rs
@@ -236,7 +236,7 @@ where
CB: ContainerBuilder,
L: Into<LogVariant>,
F: for<'a> FnMut(
<B::Output as Container>::Item<'a>,
<B::Output as Container>::ItemRef<'a>,
&mut PermutedRowPacker,
&mut OutputSession<CB>,
) + 'static,
@@ -251,7 +251,7 @@ where
move |input, output| {
while let Some((time, data)) = input.next() {
let mut session = output.session_with_builder(&time);
for item in data.drain() {
for item in data.iter().flatten().flat_map(|chunk| chunk.iter()) {
logic(item, &mut packer, &mut session);
}
}
43 changes: 18 additions & 25 deletions src/compute/src/sink/copy_to_s3_oneshot.rs
@@ -11,7 +11,7 @@ use std::any::Any;
use std::cell::RefCell;
use std::rc::Rc;

use differential_dataflow::{AsCollection, Collection, Hashable};
use differential_dataflow::{Collection, Hashable};
use mz_compute_client::protocol::response::CopyToResponse;
use mz_compute_types::dyncfgs::{
COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO, COPY_TO_S3_MULTIPART_PART_SIZE_BYTES,
@@ -23,7 +23,6 @@ use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::consolidate_pact;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::Operator;
use timely::dataflow::Scope;
use timely::progress::Antichain;
@@ -63,32 +62,20 @@
// files based on the user provided `MAX_FILE_SIZE`.
let batch_count = self.output_batch_count;

// This relies on an assumption the output order after the Exchange is deterministic, which
// is necessary to ensure the files written from each compute replica are identical.
// While this is not technically guaranteed, the current implementation uses a FIFO channel.
// In the storage copy_to operator we assert the ordering of rows to detect any regressions.
// We exchange the data according to batch, but we don't want to send the batch ID to the
// sink. The sink can re-compute the batch ID from the data.
let input = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&sinked_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&sinked_collection.map(move |row| (row, ())).inner,
Exchange::new(move |((row, ()), _, _): &((Row, _), _, _)| row.hashed() % batch_count),
"Consolidated COPY TO S3 input",
);
// TODO: We're converting a stream of region-allocated data to a stream of vectors.
let input = input.map(Clone::clone).as_collection();

// We need to consolidate the error collection to ensure we don't act on retracted errors.
let error = consolidate_pact::<KeyBatcher<_, _, _>, _, _>(
&err_collection
.map(move |row| {
let batch = row.hashed() % batch_count;
((row, batch), ())
})
.inner,
Exchange::new(move |(((_, batch), _), _, _)| *batch),
&err_collection.map(move |err| (err, ())).inner,
Exchange::new(move |((err, _), _, _): &((DataflowError, _), _, _)| {
err.hashed() % batch_count
}),
"Consolidated COPY TO S3 errors",
);

@@ -104,9 +91,14 @@ where
while let Some((time, data)) = input.next() {
if !up_to.less_equal(time.time()) && !received_one {
received_one = true;
output
.session(&time)
.give_iterator(data.iter().next().cloned().into_iter());
output.session(&time).give_iterator(
data.iter()
.flatten()
.flat_map(|chunk| chunk.iter().cloned())
.next()
.map(|((err, ()), time, diff)| (err, time, diff))
.into_iter(),
);
}
}
}
@@ -132,6 +124,7 @@ where
self.connection_id,
params,
result_callback,
self.output_batch_count,
);

Some(token)
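
A hedged sketch of the invariant the rewritten sink relies on: the batch a row belongs to is a pure function of the row's hash, so the exchange function above and the storage-side uploader can each derive it independently instead of shipping it with the data. The standard library's `Hash`/`DefaultHasher` stand in for differential-dataflow's `Hashable` (`row.hashed() % batch_count`), and `batch_for` is an illustrative name, not an API of either crate.

use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Illustrative only: both sides of the dataflow call the same pure function,
// so the batch ID never needs to travel with the row.
fn batch_for<R: Hash>(row: &R, batch_count: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    row.hash(&mut hasher);
    hasher.finish() % batch_count
}

fn main() {
    let batch_count = 8;
    // The compute-side exchange and the storage-side uploader agree on the
    // batch because they hash the same row with the same modulus.
    assert_eq!(batch_for(&"row-1", batch_count), batch_for(&"row-1", batch_count));
}
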
65 changes: 30 additions & 35 deletions src/storage-operators/src/s3_oneshot_sink.rs
@@ -16,7 +16,7 @@ use std::rc::Rc;

use anyhow::anyhow;
use aws_types::sdk_config::SdkConfig;
use differential_dataflow::{Collection, Hashable};
use differential_dataflow::Hashable;
use futures::StreamExt;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
@@ -31,6 +31,7 @@ use mz_storage_types::sinks::{S3SinkFormat, S3UploadInfo};
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::columnation::TimelyStack;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Broadcast;
use timely::dataflow::{Scope, Stream};
@@ -54,9 +55,12 @@ mod pgcopy;
/// - completion: removes the sentinel file and calls the `worker_callback`
///
/// Returns a token that should be held to keep the sink alive.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
pub fn copy_to<G, F>(
input_collection: Collection<G, ((Row, u64), ()), Diff>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
up_to: Antichain<G::Timestamp>,
connection_details: S3UploadInfo,
connection_context: ConnectionContext,
@@ -65,6 +69,7 @@ pub fn copy_to<G, F>(
connection_id: CatalogItemId,
params: CopyToParameters,
worker_callback: F,
output_batch_count: u64,
) -> Rc<dyn Any>
where
G: Scope<Timestamp = Timestamp>,
@@ -89,6 +94,7 @@ where
up_to,
start_stream,
params,
output_batch_count,
),
S3SinkFormat::Parquet => render_upload_operator::<G, parquet::ParquetUploader>(
scope.clone(),
@@ -101,6 +107,7 @@
up_to,
start_stream,
params,
output_batch_count,
),
};

@@ -130,7 +137,7 @@ fn render_initialization_operator<G>(
scope: G,
sink_id: GlobalId,
up_to: Antichain<G::Timestamp>,
err_stream: Stream<G, (((DataflowError, u64), ()), G::Timestamp, Diff)>,
err_stream: Stream<G, (DataflowError, G::Timestamp, Diff)>,
) -> (Stream<G, Result<(), String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
@@ -160,7 +167,7 @@ where
while let Some(event) = error_handle.next().await {
match event {
AsyncEvent::Data(cap, data) => {
for (((error, _), _), ts, _) in data {
for (error, ts, _) in data {
if !up_to.less_equal(&ts) {
start_handle.give(&cap, Err(error.to_string()));
return;
@@ -281,27 +288,30 @@ where
/// Returns a `completion_stream` which contains 1 event per worker of
/// the result of the upload operation, either an error or the number of rows
/// uploaded by the worker.
///
/// The `input_collection` must be a stream of chains, partitioned and exchanged by the row's hash
/// modulo the number of batches.
fn render_upload_operator<G, T>(
scope: G,
connection_context: ConnectionContext,
aws_connection: AwsConnection,
connection_id: CatalogItemId,
connection_details: S3UploadInfo,
sink_id: GlobalId,
input_collection: Collection<G, ((Row, u64), ()), Diff>,
input_collection: Stream<G, Vec<TimelyStack<((Row, ()), G::Timestamp, Diff)>>>,
up_to: Antichain<G::Timestamp>,
start_stream: Stream<G, Result<(), String>>,
params: CopyToParameters,
output_batch_count: u64,
) -> (Stream<G, Result<u64, String>>, PressOnDropButton)
where
G: Scope<Timestamp = Timestamp>,
T: CopyToS3Uploader,
{
let worker_id = scope.index();
let num_workers = scope.peers();
let mut builder = AsyncOperatorBuilder::new("CopyToS3-uploader".to_string(), scope.clone());

let mut input_handle = builder.new_disconnected_input(&input_collection.inner, Pipeline);
let mut input_handle = builder.new_disconnected_input(&input_collection, Pipeline);
let (completion_handle, completion_stream) = builder.new_output();
let mut start_handle = builder.new_input_for(&start_stream, Pipeline, &completion_handle);

@@ -351,31 +361,24 @@ where
}

let mut row_count = 0;
let mut last_row = None;
while let Some(event) = input_handle.next().await {
match event {
AsyncEvent::Data(_ts, data) => {
for (((row, batch), ()), ts, diff) in data {
// Check our assumption above that batches are
// always assigned to the worker with ID `batch %
// num_workers`.
if usize::cast_from(batch) % num_workers != worker_id {
anyhow::bail!(
"internal error: batch {} assigned to worker {} (expected worker {})",
batch,
worker_id,
usize::cast_from(batch) % num_workers
);
}
if !up_to.less_equal(&ts) {
if diff < 0 {
for ((row, ()), ts, diff) in
data.iter().flatten().flat_map(|chunk| chunk.iter())
{
// We're consuming a batch of data, and the upstream operator has to ensure
// that the data is exchanged according to the batch.
let batch = row.hashed() % output_batch_count;
if !up_to.less_equal(ts) {
if *diff < 0 {
anyhow::bail!(
"Invalid data in source errors, saw retractions ({}) for \
row that does not exist",
diff * -1,
*diff * -1,
)
}
row_count += u64::try_from(diff).unwrap();
row_count += u64::try_from(*diff).unwrap();
let uploader = match s3_uploaders.entry(batch) {
Entry::Occupied(entry) => entry.into_mut(),
Entry::Vacant(entry) => {
Expand All @@ -389,18 +392,10 @@ where
)?)
}
};
for _ in 0..diff {
uploader.append_row(&row).await?;
for _ in 0..*diff {
uploader.append_row(row).await?;
}
}
// A very crude way to detect if there is ever a regression in the deterministic
// ordering of rows in our input, since we are depending on an implementation
// detail of timely communication (FIFO ordering over an exchange).
let cur = (row, batch);
if let Some(last) = last_row {
assert!(&last < &cur, "broken fifo ordering!");
}
last_row = Some(cur);
}
}
AsyncEvent::Progress(frontier) => {
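
A simplified, synchronous stand-in for the upload loop above (the real uploader is async and writes through per-batch S3 uploaders): after consolidation every surviving diff must be positive, and each row is appended `diff` times. The types and the function name here are hypothetical.

// Hypothetical stand-in: consolidated updates carry a multiplicity `diff`;
// retractions must not survive consolidation, and each retained row is
// emitted `diff` times.
fn expand_updates<R: Clone>(updates: &[(R, i64)]) -> Result<Vec<R>, String> {
    let mut rows = Vec::new();
    for (row, diff) in updates {
        if *diff < 0 {
            return Err(format!(
                "invalid input: saw retraction ({}) for a row that does not exist",
                -diff
            ));
        }
        for _ in 0..*diff {
            rows.push(row.clone());
        }
    }
    Ok(rows)
}

fn main() {
    let updates = vec![("a".to_string(), 2), ("b".to_string(), 1)];
    assert_eq!(expand_updates(&updates).unwrap().len(), 3);
}
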
39 changes: 29 additions & 10 deletions src/timely-util/src/operator.rs
@@ -25,12 +25,11 @@ use timely::container::{ContainerBuilder, PushInto};
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::channels::ContainerBytes;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder as OperatorBuilderRc;
use timely::dataflow::operators::generic::operator::{self, Operator};
use timely::dataflow::operators::generic::{InputHandleCore, OperatorInfo, OutputHandleCore};
use timely::dataflow::operators::Capability;
use timely::dataflow::{Scope, StreamCore};
use timely::dataflow::{Scope, Stream, StreamCore};
use timely::progress::{Antichain, Timestamp};
use timely::{Container, Data, PartialOrder};

@@ -708,7 +707,18 @@ where
h.finish()
});
consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, "unpack consolidated", |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in
data.iter().flatten().flat_map(|chunk| chunk.iter())
{
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
} else {
self
@@ -730,7 +740,17 @@ where
Exchange::new(move |update: &((D1, ()), G::Timestamp, R)| (update.0).0.hashed());

consolidate_pact::<Ba, _, _>(&self.map(|k| (k, ())).inner, exchange, name)
.map(|((k, ()), time, diff)| (k.clone(), time.clone(), diff.clone()))
.unary(Pipeline, &format!("Unpack {name}"), |_, _| {
|input, output| {
input.for_each(|time, data| {
let mut session = output.session(&time);
for ((k, ()), t, d) in data.iter().flatten().flat_map(|chunk| chunk.iter())
{
session.give((k.clone(), t.clone(), d.clone()))
}
})
}
})
.as_collection()
}
}
@@ -770,14 +790,15 @@ where

/// Aggregates the weights of equal records into at most one record.
///
/// The data are accumulated in place, each held back until their timestamp has completed.
/// Produces a stream of chains of records, partitioned according to `pact`. The
/// data is sorted according to `Ba`. For each timestamp, it produces at most one chain.
///
/// This serves as a low-level building-block for more user-friendly functions.
/// The data are accumulated in place, each held back until their timestamp has completed.
pub fn consolidate_pact<Ba, P, G>(
stream: &StreamCore<G, Ba::Input>,
pact: P,
name: &str,
) -> StreamCore<G, Ba::Output>
) -> Stream<G, Vec<Ba::Output>>
where
G: Scope,
Ba: Batcher<Time = G::Timestamp> + 'static,
@@ -833,9 +854,7 @@ where
// Extract updates not in advance of `upper`.
let output =
batcher.seal::<ConsolidateBuilder<_, Ba::Output>>(upper.clone());
for mut batch in output {
session.give_container(&mut batch);
}
session.give(output);
}
}

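
For context on what the updated `consolidate_pact` documentation above promises, a minimal model of consolidation semantics, not the columnar batcher-based implementation: updates are sorted, the diffs of equal records are summed, and records whose diffs cancel to zero are dropped. The real operator additionally emits the result as a batch of chains, at most one per timestamp.

// A minimal model of consolidation: sort by data, sum the diffs of equal
// records, and drop records whose diff cancels to zero. The real operator
// does this with a merge batcher over columnar chunks and emits the result
// as a batch of chains rather than a flat vector.
fn consolidate<D: Ord>(mut updates: Vec<(D, i64)>) -> Vec<(D, i64)> {
    updates.sort_by(|a, b| a.0.cmp(&b.0));
    let mut out: Vec<(D, i64)> = Vec::with_capacity(updates.len());
    for (data, diff) in updates {
        if let Some(last) = out.last_mut() {
            if last.0 == data {
                last.1 += diff;
                continue;
            }
        }
        out.push((data, diff));
    }
    out.retain(|(_, diff)| *diff != 0);
    out
}

fn main() {
    let updates = vec![("b", 1), ("a", 1), ("b", -1), ("a", 2)];
    assert_eq!(consolidate(updates), vec![("a", 3)]);
}
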
