[WIP] Breakdown Reveal Aggregation #1172
Conversation
Codecov Report
Attention: Patch coverage is …
Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #1172      +/-   ##
==========================================
+ Coverage   92.79%   92.84%    +0.04%
==========================================
  Files         197      197
  Lines       30523    30735     +212
==========================================
+ Hits        28325    28535     +210
- Misses       2198     2200       +2

☔ View full report in Codecov by Sentry.
I have left a couple of comments. I think we need to think more about a strategy for vectorizing the aggregation efficiently.
// 1. TODO: Add fake attribution outputs.
// 2. Shuffle.
// 3. Reveal breakdown keys. Trigger values are not revealed.
// 4. Aggregate the trigger-value secret shares.
How do you do the aggregation? Will you sort the records by breakdown? I think that might be expensive for many breakdowns. I think it would make sense to chunk the list of records into smaller parts and only sort within those chunks. Vectorization should still be efficient within the chunks, and you won't need a sort on the huge list of all records.
Yeah, I still need to come up with a way to chunk/vectorize things.
Don't we know the breakdown in the clear, so we can trivially group the records as we reveal it?
Yes, it's all local, so you could sort by breakdown and then vectorize within events that share the same breakdown. Sorting a large list might still cause a lot of reallocations that might not be necessary.
Yeah, for the breakdown counts that we're contemplating currently, I would say just fan the values out into a bucket per breakdown. Then, once we get two chunks' worth of records in a bucket, we can do a vectorized reduction.
If we're doing a full resharding here (which a sharded shuffle is), then maybe it makes sense to shard by breakdowns rather than randomly? There is a question of whether we get better parallelism across breakdowns or by eagerly pulling more chunks from a particular aggregation stream.
> Yes, it's all local, so you could sort by breakdown

I still don't see a need to sort a big list by breakdown key. As Andy pointed out, we fan out events to the corresponding shards by breakdown key as we drain the input; grouping comes essentially for free.

> If we're doing a full resharding here (which a sharded shuffle is), then maybe it makes sense to shard by breakdowns rather than randomly?

This makes sense to me. Maybe in the sharded world we can avoid the transpose in this case.

> There is a question of whether we get better parallelism across breakdowns or by eagerly pulling more chunks from a particular aggregation stream.

To make a decision here, it would be helpful to understand the distribution of events per breakdown. If it is not close to uniform in the presence of fake events (i.e., it basically follows the user-provided distribution), then we can assume the worst case and vectorize across breakdowns per shard.
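A minimal sketch of that bucket-per-breakdown fan-out with eager chunked reduction; `Share`, `CHUNK`, and `reduce_chunk` here are hypothetical stand-ins, not IPA types:

```rust
/// Fan revealed records out into one bucket per breakdown key and run a
/// vectorized reduction whenever a bucket accumulates `CHUNK` entries,
/// so no full sort of the input is ever needed.
const CHUNK: usize = 64;

fn aggregate<Share: Clone + Default>(
    revealed: impl Iterator<Item = (usize, Share)>, // (breakdown key, TV share)
    breakdown_count: usize,
    reduce_chunk: impl Fn(&[Share]) -> Share, // stand-in for the vectorized add
) -> Vec<Share> {
    let mut buckets: Vec<Vec<Share>> = vec![Vec::new(); breakdown_count];
    for (bk, tv) in revealed {
        buckets[bk].push(tv);
        if buckets[bk].len() == CHUNK {
            // Reduce eagerly so no bucket ever holds more than one chunk.
            let sum = reduce_chunk(&buckets[bk]);
            buckets[bk].clear();
            buckets[bk].push(sum);
        }
    }
    // A final, possibly short, reduction per bucket yields the histogram.
    buckets
        .iter()
        .map(|b| if b.is_empty() { Share::default() } else { reduce_chunk(b) })
        .collect()
}
```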
HV: BooleanArray + U128Conversions,
{
    // TODO: Maybe move this to the function contract
    let contribs: Vec<_> = contributions_stream.try_collect().await?;
contribs is a strange name.
I'll change it in a future revision. Please consider this code just a starting point to better discuss the vectorization part.
I agree with Daniel: naming should be part of the review. For parts that aren't relevant, there needs to be a hint telling reviewers to skip them, although I think that makes the reviewer's life harder.
for (i, attribution_outputs) in contribs.into_iter().enumerate() {
    let record_id = RecordId::from(i);
    let ao: SecretSharedAttributionOutputs<BK, TV> = attribution_outputs; // For Rust Analyzer
    let bk_share = ao.attributed_breakdown_key_bits;
Why is ao needed here? Why not use attribution_outputs directly?
Just a small aid for Rust Analyzer. Rust can't infer the type.
Yeah, IDEs that support the Rust language are far from ideal and have gaps. I don't think that warrants code modifications like this, though; this code will be read in many IDEs, text editors, and web browsers, and it hurts readability.
let bk_share = ao.attributed_breakdown_key_bits;
let reveal_ctx = ctx
    .narrow(&AggregationStep::RevealStep)
    .set_total_records(TotalRecords::Indeterminate);
The reveal context needs to be defined outside the loop; otherwise you generate a context for every record.
Will do. The for loop is going away anyway.
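For illustration, the shape of the fix with a toy Ctx stub (hypothetical, not the IPA context API):

```rust
// Toy stub standing in for the real protocol context, only to show the
// shape of the fix: narrow once, then reuse the context for every record.
#[derive(Clone)]
struct Ctx;

impl Ctx {
    fn narrow(&self, _step: &str) -> Ctx {
        Ctx
    }
}

fn reveal_all(ctx: &Ctx, records: &[u32]) {
    // Hoisted: one narrowed context for the whole loop...
    let reveal_ctx = ctx.narrow("reveal");
    for record in records {
        // ...instead of calling ctx.narrow(..) once per record here.
        let _ = (&reveal_ctx, record);
    }
}
```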
@@ -51,6 +51,7 @@ impl From<MoveToBucketError> for Error {
///
/// ## Errors
/// If `breakdown_count` does not fit into `BK` bits or is greater than or equal to $2^9$
#[allow(dead_code)]
Why do we need this in this PR?
Because I haven't deleted the old aggregation code yet
I think this is not relevant to this change and shouldn't be a part of it
I will delete all the old aggregation code before I merge, but I would like to keep it for now for reference (it's easier to navigate the code with an IDE than on GitHub). I #[allow] the dead code so that all code checks still run on every push.
attributed_breakdown_key: u128,
capped_attributed_trigger_value: u128,
#[derive(Debug, Clone, Ord, PartialEq, PartialOrd, Eq)]
pub struct PreAggregationTestOutputInDecimal {
It is still stored in a u128. I am not sure whether I would call it a decimal and whether this is substantially different from the way other numbers are stored.
I didn't come up with that name. I'd rather have a separate PR for renaming things if you deem that necessary.
This made it public, which makes me question whether it is only used in tests and should be marked as such.
It's configured to be a test struct only with #[cfg]
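For reference, a gated test-only definition might look like the following sketch, reusing the field and derive list quoted above; the exact cfg gate in the codebase may differ:

```rust
// Sketch: the struct only exists in test builds, so making it pub does not
// leak it into the production API surface.
#[cfg(test)]
#[derive(Debug, Clone, Ord, PartialEq, PartialOrd, Eq)]
pub struct PreAggregationTestOutputInDecimal {
    pub attributed_breakdown_key: u128,
    pub capped_attributed_trigger_value: u128,
}
```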
@@ -111,17 +145,90 @@ where
}
}

// This function converts Attribution Outputs to an AdditiveShare needed for the shuffle protocol
pub fn attribution_outputs_to_shuffle_input<BK, TV, YS>(
I think the naming is confusing, and I do think it makes more sense to implement SharedValue (which implies AdditiveShare) for SecretSharedAttributionOutputs rather than merging BK and TV into a single BooleanArray. We might not even have BooleanArrays of the correct size.
This entire part of the shuffling is going away anyway in favor of the sharded version, which has a Shuffleable trait.
I see, so this is just a temporary change. Maybe mark it with a TODO?
Yeah, that's pretty gross, but as @cberkhoff pointed out, it's a necessary change because shuffle doesn't really support any input other than the IPA event.
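A rough sketch of the direction being discussed: let the shuffle work against a trait the struct implements, instead of merging BK and TV into one BooleanArray first. The trait and types below are illustrative stand-ins, not the actual IPA definitions:

```rust
// Illustrative trait: anything that can round-trip through a flat byte
// representation can be shuffled, so no merged BooleanArray type is needed.
trait Shuffleable: Sized {
    fn to_bytes(&self) -> Vec<u8>;
    fn from_bytes(bytes: &[u8]) -> Self;
}

struct AttributionOutputs {
    breakdown_key: u8,  // stand-in for the BK share bits
    trigger_value: u8,  // stand-in for the TV share bits
}

impl Shuffleable for AttributionOutputs {
    fn to_bytes(&self) -> Vec<u8> {
        vec![self.breakdown_key, self.trigger_value]
    }

    fn from_bytes(bytes: &[u8]) -> Self {
        Self {
            breakdown_key: bytes[0],
            trigger_value: bytes[1],
        }
    }
}
```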
@@ -43,6 +50,33 @@ where
    .collect::<Vec<_>>())
}

#[tracing::instrument(name = "shuffle_attribution_outputs", skip_all)]
pub async fn shuffle_attribution_outputs<C, BK, TV, R>(
If we don't try to merge into a BA just for the sake of shuffling (see the comment below on attribution_outputs_to_shuffle_input), we do not need the generic R and the trait bounds. I don't think we should introduce new BA types just for the sake of shuffling.
Yeah, I don't think these conversions to BA are necessary; in fact, the sharded version of shuffle provides better ergonomics. I'm just using this version as a temporary stop-gap.
}

// This function converts Attribution Outputs obtained from the shuffle protocol to OprfReport
pub fn shuffled_to_attribution_outputs<YS, BK, TV>(
This could also be removed by implementing SharedValue for SecretSharedAttributionOutputs.
I'll keep this in mind, but this whole part of the shuffling is going away.
This may be a good suggestion to keep the code concise for the time being, if that works. I much prefer traits over standalone functions, to keep the nomenclature of things somewhat under control.
ipa-core/src/error.rs
@@ -72,6 +72,8 @@ pub enum Error {
    DecompressingInvalidCurvePoint(String),
    #[error(transparent)]
    LengthError(#[from] LengthError),
    #[error(transparent)]
    TryFromIntError(#[from] std::num::TryFromIntError),
This is too generic to be at this level, I think. We should use a more specific error type that explains what happened.
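A sketch of the kind of specific variant this suggests, in the thiserror style the quoted enum already uses; the variant name and message are hypothetical:

```rust
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error {
    // Hypothetical variant: states what actually went wrong instead of
    // forwarding a bare std::num::TryFromIntError.
    #[error("breakdown key {0} does not fit into usize")]
    BreakdownKeyOutOfRange(u128),
}

fn to_bucket_index(revealed_bk: u128) -> Result<usize, Error> {
    usize::try_from(revealed_bk).map_err(|_| Error::BreakdownKeyOutOfRange(revealed_bk))
}
```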
.narrow(&AggregationStep::RevealStep)
.set_total_records(TotalRecords::Indeterminate);
let revealed_bk: BK = BK::from_array(&bk_share.reveal(reveal_ctx, record_id).await?);
let pos = usize::try_from(revealed_bk.as_u128())?;
heh, it is ok to assume breakdown key fits into usize :)
integer_sat_add::<_, SixteenBitStep, 1>(add_ctx, record_id, &result[pos], &tv).await?;
result[pos] = r;
}
let resp: Vec<Replicated<HV>> = result
Perhaps letting the compiler infer the type is the way to go here. Most Rust libraries, including IPA, use this approach: https://doc.rust-lang.org/rust-by-example/types/inference.html
let mut inputs = Vec::new();
let mut expectation = Vec::new();
for _ in 0..BATCHSIZE {
    let (i, e) =
BA32 can be generated using Rng, so you can simplify this part as well
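Assuming BA32 supports rand's Standard distribution as the comment implies, generation collapses to plain rng.gen() calls; u32 stands in for BA32 in this sketch:

```rust
use rand::Rng;

// One rng.gen() per value replaces the hand-rolled construction; the pair
// mirrors the (input, expectation) values built in the loop above.
fn gen_pair(rng: &mut impl Rng) -> (u32, u32) {
    (rng.gen(), rng.gen())
}
```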
test_executor::run,
test_fixture::{ipa::TestRawDataRecord, Reconstruct, Runner, TestWorld},
};

fn input_row(
A bit of bikeshedding, but this generates both an input row and the expected result, so the name could reflect that.
.upgraded_semi_honest(
    inputs.into_iter(),
    |ctx: UpgradedSemiHonestContext<NotSharded, Boolean>, input_rows| async move {
        let aos: Vec<_> = input_rows
this variable could have a better name
… metrics and addressing some comments
This change is still a WIP. Just want to get some baseline numbers from Draft.
I ran a few queries and got the following stats:
Having this in mind, I'll focus on Additions and then Reveal.
Added new aggregation logic for Trigger Values into the Histogram. With it, aggregation as a whole is about 3x faster than the current one.
Digging through the logs, I think vectorizing can take about 20 seconds off aggregation, which currently takes around 126 seconds. There might be more gains from more efficient code.
This is mostly good to go I think. There are some things to improve, but most importantly we shouldn't replace the existing aggregation just yet. We haven't added DP noise in OPRF and we may need a proof or tight bound on noise added per breakdown key.
Once those are figured out, we can replace the aggregation in a separate PR.
/// Since the addition of 2 TVs returns a newly allocated TV and the number of
/// BKs is small, there's not a lot of gain by doing operations in place with
/// references in this structure.
struct GroupedTriggerValues<HV, const B: usize> {
You only need HV in expand, and there is no runtime check at construction time. You probably want to drop this generic from the struct definition and change expand to expand::<HV>(..).
let atributions = shuffle_attributions::<_, _, B>(&ctx, atributions).await?;
let grouped_tvs = reveal_breakdowns::<HV, _, _, B>(&ctx, atributions).await?;
Suggested change:
- let atributions = shuffle_attributions::<_, _, B>(&ctx, atributions).await?;
- let grouped_tvs = reveal_breakdowns::<HV, _, _, B>(&ctx, atributions).await?;
+ let atributions = shuffle_attributions(&ctx, atributions).await?;
+ let grouped_tvs = reveal_breakdowns(&ctx, atributions).await?;
}
// Should never come to this. Instead it should return from the exit
// condition in the for loop.
Err(Error::Internal)
Impossible states are better modelled with panics, IMO.
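For context, the panic-based modelling of an impossible branch generally looks like this generic sketch (not the PR's code):

```rust
// The caller guarantees the loop always returns, so the fall-through is a
// programmer error: panic loudly instead of returning Err(Error::Internal).
fn first_even(values: &[u32]) -> u32 {
    for &v in values {
        if v % 2 == 0 {
            return v;
        }
    }
    unreachable!("caller guarantees at least one even value")
}
```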
let pairs = mem::take(&mut grouped_tvs.pairs);
// Exit condition
if pairs.is_empty() {
    let mut r: Vec<Replicated<HV>> = vec![];
Suggested change:
- let mut r: Vec<Replicated<HV>> = vec![];
+ let mut r = Vec::<Replicated<HV>>::with_capacity(grouped_tvs.singles.len());
let tv_size: usize = TV::BITS.try_into().unwrap();
let mut grouped_tvs = GroupedTriggerValues::<HV, B>::new(tv_size);
let tvs: Vec<(usize, Replicated<TV>)> = seq_join(reveal_ctx.active_work(), reveal_work)
    .try_collect()
There is no reason to collect this into a vector. Just add things into grouped_tvs as they come from the reveal stream.
The reason I was doing it is that I didn't want to modify grouped_tvs in an async scope; I wanted all the modifications of grouped_tvs to happen in the main task. If I do something like:

seq_join(reveal_ctx.active_work(), reveal_work)
    .try_for_each(move |(bk, tv)| {
        grouped_tvs.push(bk, tv.to_bits())
    })
    .await?;

I get an error about captured variables escaping the closure body.
It works if you pin the stream and try_next it.
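A sketch of that pin-and-try_next pattern, with a plain Result stream standing in for the reveal stream and a Vec standing in for grouped_tvs:

```rust
use futures::{pin_mut, stream, TryStreamExt};

async fn drain_reveal() -> Result<Vec<(usize, u32)>, std::io::Error> {
    let reveal_work = stream::iter(vec![Ok::<_, std::io::Error>((1usize, 10u32))]);
    pin_mut!(reveal_work);

    let mut grouped_tvs = Vec::new();
    // All mutation happens here in the driving task, outside any closure,
    // so nothing captured has to escape a closure body.
    while let Some((bk, tv)) = reveal_work.try_next().await? {
        grouped_tvs.push((bk, tv));
    }
    Ok(grouped_tvs)
}
```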
.narrow(&AggregationStep::Aggregate(d))
.set_total_records(TotalRecords::specified(pairs.len()).unwrap());

let work = pairs.iter().enumerate().map(|(i, p)| {
It would probably be safer if you consume pairs at this point.
Also, you could do zip(pairs, repeat(add_ctx)).map(...) to get a fresh context for every row.
You meant into_iter? With the existing enumerate, the resulting tuple will look weird: (i, (p, add_ctx)). I find the current code more legible.
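For what it's worth, the tuple shape in question, with plain types standing in for the pairs and the context:

```rust
use std::iter::{repeat, zip};

fn contexts_per_row() {
    let pairs = vec!["p0", "p1"];
    let add_ctx = 7u8; // stand-in for a cheaply cloneable context
    // zip + repeat hands each row its own copy of the context, but with
    // enumerate on top the items arrive as (i, (p, ctx)).
    for (i, (p, ctx)) in zip(pairs, repeat(add_ctx)).enumerate() {
        let _ = (i, p, ctx);
    }
}
```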
))
.await?;
}
// Should never come to this. Instead it should return from the exit
Suggested change:
- // Should never come to this. Instead it should return from the exit
+ // This loop terminates if the number of steps exceeds 64, which would mean processing more than 2^64 events; that shouldn't happen in practice.
return Ok(r);
}

let can_expand = grouped_tvs.expand();
Does this operation have a cost? I.e., are we adding these zeroes after expanding in MPC?
We're adding 0's to match the results of the non-saturated additions. The actual sum happens after.
I could be wrong, but adding these zeroes is "local"; there's no communication required. Each helper just adds a Replicated::ZERO to each item in the singles. This should be very fast.
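A plain-value sketch of why this padding is local: a share of zero is a constant every helper can append on its own, so no MPC round trips are involved. Share here is a stand-in type:

```rust
// Each helper pads its own vector with zero shares; Share::ZERO is a valid
// sharing of zero by construction, so no communication is required.
#[derive(Clone)]
struct Share(u64);

impl Share {
    const ZERO: Share = Share(0);
}

fn expand_with_zeros(singles: &mut Vec<Share>, target_len: usize) {
    while singles.len() < target_len {
        singles.push(Share::ZERO);
    }
}
```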
let op = self.singles[bk].take();
if let Some(existing_value) = op {
    self.pairs.push(Pair {
        left: existing_value,
        right: value,
        bk,
    });
    self.singles[bk] = None;
} else {
    self.singles[bk] = Some(value);
}
Suggested change:
- let op = self.singles[bk].take();
- if let Some(existing_value) = op {
-     self.pairs.push(Pair {
-         left: existing_value,
-         right: value,
-         bk,
-     });
-     self.singles[bk] = None;
- } else {
-     self.singles[bk] = Some(value);
- }
+ if let Some(existing_value) = self.singles[bk].take() {
+     self.pairs.push(Pair {
+         left: existing_value,
+         right: value,
+         bk,
+     });
+ } else {
+     self.singles[bk] = Some(value);
+ }
}

#[test]
fn semi_honest_happy_path() {
This is great, but not enough to confidently say the logic is correct. I'd love to see more comprehensive testing here:
- All breakdowns have more than one event
- Some have one event, some two, some three (testing odd/even)
- Some breakdowns have zero events, while others have many
- Some breakdowns saturate and some do not (use small HV, TV for it)
and resistance testing for errors as well
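To make those cases concrete, here is a hypothetical plain-value oracle (no MPC) for the expected histogram, with per-bucket saturation at a small HV cap:

```rust
// Plain-value model of aggregation: saturating per-bucket sums, covering
// empty, single, odd/even, and saturating buckets in one input.
fn expected_histogram(events: &[(usize, u64)], breakdowns: usize, hv_cap: u64) -> Vec<u64> {
    let mut hist = vec![0u64; breakdowns];
    for &(bk, tv) in events {
        hist[bk] = (hist[bk] + tv).min(hv_cap);
    }
    hist
}

#[test]
fn histogram_oracle_case_matrix() {
    // bucket 0: one event; bucket 1: two events; bucket 2: saturates; bucket 3: empty
    let events = [(0, 1), (1, 2), (1, 3), (2, 200), (2, 100)];
    assert_eq!(expected_histogram(&events, 4, 255), vec![1, 5, 255, 0]);
}
```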
if let Some(hv_bits) = s {
    r.push(hv_bits.collect_bits());
} else {
    r.push(Replicated::ZERO);
}
r.push(s.map_or_default(|hv_bits| hv_bits.collect_bits()));
This is a good starting point to keep improving the new aggregation. We should create an issue with all the improvements and action items we need to take before we can make it the default.
This PR is a work in progress to enable Breakdown Reveal Aggregation (aka Improved Aggregation). Aggregation steps happen after attribution. The input to aggregation is a stream of tuples of (attributed breakdown key, attributed trigger value); the output is a histogram. Breakdown keys and trigger values are assigned by the advertiser and sent in the input of IPA. Breakdown key values are expected to be dense. How breakdown keys and trigger values are defined is out of scope.
High-level explanation of the protocol:
1. Add fake attribution outputs (TODO).
2. Shuffle.
3. Reveal breakdown keys. Trigger values are not revealed.
4. Aggregate the trigger-value secret shares.
DP noise will be added to the histogram buckets by the caller of this function, because adding noise is unrelated to how the values are added up.
The following improvements (in no particular order) are to be done before merging: