opt: farmer cache flatten piece_caches #2925
Conversation
This is just an internal structural change that our existing test cases can cover.
Force-pushed d2d0551 to 3ca4a28
I like that it became simpler and even faster for lookups since we no longer need to iterate over caches (though there should not have been many of them anyway).
However, there was a reason it was implemented the way it was; let me explain.
Offsets were stored in flat data structures so that we did not need to store the cache index in every single entry unnecessarily.
For example, for 1T worth of pieces we'll store ~1M 2-byte cache index values, which means we use 2M of RAM just on those numbers, while previously it was not using any memory at all.
In fact, right now for 1T of cache it'll likely use 4M of RAM due to the memory alignment of the FarmerCacheOffset data structure: another field is u32, which causes the whole data structure to be aligned to 4 bytes even though 2 out of 8 bytes will not be used. This can be mitigated by forcing the alignment of the data structure to 2 bytes.
Another performance concern is that free offsets are no longer distributed across different caches, which means worse cache read performance, potential farming issues, and a slower worst-case piece cache re-sync when one of the existing farms is removed from the farmer and the cache wasn't filled fully.
I think we can find a creative way of fixing the second issue, but the first issue with offsets is kind of inherent.
I'm not sure how big of a deal it is in practice, it might still be worth doing, but it was not arbitrary that it was all stored the way it was.
At the same time, 1T of cache for a local farmer by default means ~100T of space pledged, so it may not be a horrible thing to have +2M of memory usage on top of already quite high memory usage.
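As a rough illustration of the alignment point above (field names are assumptions, not the actual FarmerCacheOffset definition), a u16 index next to a u32 offset pads the struct to 8 bytes unless its alignment is forced down to 2:

```rust
// Hypothetical layout mirroring the discussion above, not the real type.
struct PaddedOffset {
    cache_index: u16,  // 2 bytes
    piece_offset: u32, // 4 bytes, forces 4-byte alignment -> 2 bytes of padding
}

// Forcing 2-byte alignment removes the padding at the cost of unaligned loads.
#[repr(C, packed(2))]
struct PackedOffset {
    cache_index: u16,
    piece_offset: u32,
}

fn main() {
    assert_eq!(std::mem::size_of::<PaddedOffset>(), 8);
    assert_eq!(std::mem::size_of::<PackedOffset>(), 6);
}
```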
@@ -176,38 +190,38 @@ where
    // TODO: Consider implementing optional re-sync of the piece instead of just forgetting
    WorkerCommand::ForgetKey { key } => {
        let mut caches = self.piece_caches.write().await;
        let Some(offset) = caches.stored_pieces.remove(&key) else {
            // key not exist.
NIT: please use this formatting going forward: `// Key not exist` instead of `// key not exist.`, to be consistent with existing comments.
Ok(None) => {
    warn!(
        %cache_index,
        cache_offset = %offset.piece_offset,
I'd extract `piece_offset` into a variable like `cache_index` and call it `piece_offset` here as well. It is harder to debug when the same thing is called `piece_offset` in one place and `cache_offset` in another.
stored_pieces.push(state.stored_pieces);
state.free_offsets.clear();
free_offsets.push(state.free_offsets);
const MAX_CACHES_NUM: usize = u16::MAX as usize;
I'd create a type alias `type CacheIndex = u16` and use it both here (you'll be able to move this constant out, BTW) and in the `FarmerCacheOffset` data structure, so that a reviewer can more easily see that those things are in fact related. For farms we even have a generic that is `u8` for local farms and `u16` for the cluster setup, because the type size here impacts RAM usage.
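A minimal sketch of what that alias could look like (the surrounding items are simplified stand-ins, not the real definitions):

```rust
// Single place that decides how wide a cache index is.
type CacheIndex = u16;

// The constant can now be derived from the alias instead of hard-coding u16.
const MAX_CACHES_NUM: usize = CacheIndex::MAX as usize;

// The offset type visibly shares the same index width.
#[derive(Debug, Clone, Copy)]
struct FarmerCacheOffset {
    cache_index: CacheIndex,
    piece_offset: u32,
}
```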
let mut backends = Vec::new();
#[allow(clippy::mutable_key_type)]
let mut stored_pieces = HashMap::new();
let mut free_offsets = VecDeque::new();
Please do not use `::new()` if you can avoid it; it results in bad performance and bad memory usage. See how the previous code carefully preallocated all data structures to the correct size beforehand; in fact, it even reused previous memory allocations rather than making fresh ones. I think it would be good to preserve that.
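A small sketch of the pre-allocation idea, under the assumption that the total capacity across all backends is known up front (the key and offset types here are placeholders):

```rust
use std::collections::{HashMap, VecDeque};

// Placeholder element types standing in for the real record key and offset.
type Key = [u8; 32];
type Offset = (u16, u32);

fn preallocate(total_capacity: usize) -> (HashMap<Key, Offset>, VecDeque<Offset>) {
    // Reserving once up front avoids repeated re-allocation (and re-hashing for
    // the map) while the combined cache state is being filled.
    let stored_pieces = HashMap::with_capacity(total_capacity);
    let free_offsets = VecDeque::with_capacity(total_capacity);
    (stored_pieces, free_offsets)
}
```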
return;
// Build cache state of all backends
for (index, new_cache) in new_piece_caches.into_iter().enumerate() {
I understand that now there is a single data structure containing everything, but it would be really nice for initialization performance to remain high.
You have removed `run_future_in_dedicated_thread`, meaning all the caches will now be processed sequentially instead of concurrently, which will massively slow down cache initialization performance, especially for large farmers.
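A rough, illustrative sketch of keeping the per-backend scan concurrent; the real code used run_future_in_dedicated_thread, which additionally moves the work onto dedicated threads, so this is only a simplified approximation:

```rust
use futures::future;

// Each backend's scan is its own future; `join_all` drives them concurrently
// instead of awaiting one backend after another. The scan body is a stub here.
async fn scan_backends(max_num_elements_per_backend: Vec<u32>) -> Vec<Vec<u32>> {
    let scans = max_num_elements_per_backend
        .into_iter()
        .enumerate()
        .map(|(cache_index, max_num_elements)| async move {
            // Stand-in for reading the backend's contents at `cache_index`.
            let _ = cache_index;
            (0..max_num_elements).collect::<Vec<u32>>()
        });
    future::join_all(scans).await
}
```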
if let Some(capacity_used) =
    piece_caches_capacity_used.get_mut(usize::from(offset.cache_index))
{
    *capacity_used += 1;
}
I'd write it like this due to the pre-allocation above:
piece_caches_capacity_used[usize::from(offset.cache_index)] += 1;
Though this is not correct logic: it should only be increased if the key is not a duplicate (which is exactly what `piece_indices_to_store.remove(key).is_none()` is for, below).
let Some(offset) = cache.free_offsets.pop_front() else {
    return false;
};
let Some(offset) = caches.free_offsets.pop_front() else {
This will work, but it loses the property where pieces were distributed across multiple caches for performance reasons. Unless offsets interleave this will likely result in higher read load on one disk and lower on another, which may negatively impact farming performance.
Let me think about it, I should somehow make the cache load as balanced as possible, and I think we can do that.
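One possible shape of that balancing, sketched very roughly and not taken from the project's code: interleave the free offsets of the backends round-robin so that consecutive writes, and therefore later reads, spread across disks.

```rust
use std::collections::VecDeque;

// `(cache_index, piece_offset)` pairs stand in for the real offset type.
fn interleave_free_offsets(
    mut per_backend: Vec<VecDeque<(u16, u32)>>,
) -> VecDeque<(u16, u32)> {
    let mut combined = VecDeque::new();
    loop {
        let mut took_any = false;
        for offsets in &mut per_backend {
            // Take one offset from each backend per round.
            if let Some(offset) = offsets.pop_front() {
                combined.push_back(offset);
                took_any = true;
            }
        }
        if !took_any {
            return combined;
        }
    }
}
```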
);
break;
The fact that one backend has errors shouldn't prevent pieces from being stored in other backends. See how the previous code iterated over all caches to try to find one that is operational. We still need that loop here, but the issue is that we can't iterate over different backends anymore, so it will be less performant either way (we will likely need to cache problematic backend indices to avoid hitting them over and over again).
You're right. It could be a disaster here.
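A rough sketch of the "remember problematic backends" idea from the comment above (the names and the store callback are placeholders):

```rust
use std::collections::HashSet;

// Try backends in order, skipping indices that already failed, and remember
// new failures so later pieces don't hit the same broken backend repeatedly.
fn store_with_fallback(
    backend_count: usize,
    problematic: &mut HashSet<usize>,
    mut try_store: impl FnMut(usize) -> bool,
) -> Option<usize> {
    for cache_index in 0..backend_count {
        if problematic.contains(&cache_index) {
            continue;
        }
        if try_store(cache_index) {
            return Some(cache_index);
        }
        problematic.insert(cache_index);
    }
    None
}
```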
);
cache.stored_pieces.insert(record_key, offset);
}
// for (cache_index, cache) in caches.iter_mut().enumerate() {
NIT: Don't commit commented-out code, please
};
let cache_index = usize::from(offset.cache_index);
let piece_offset = offset.piece_offset;
let Some(backend) = caches.backends.get(cache_index).cloned() else {
Does it need to be cloned though?
The backend is wrapped in an Arc. I clone it here in order to avoid lending out an immutable borrow.
Why do you need to avoid it? BTW, you can leave multiple comments as a "Review" if you switch to the "Files changed" tab of the PR instead; it greatly decreases the number of notifications for repository maintainers.
Oh yeah, there's no need to avoid it, it's a mistake.
> BTW, you can leave multiple comments as a "Review" if you switch to the "Files changed" tab of the PR instead; it greatly decreases the number of notifications for repository maintainers.
Okay, I'll take care of that. (Also, I'm very sorry, but I submitted the commit with the wrong author; can I just override that?)
Sure, please force-push with name change and no other changes so I don't need to re-review it again
I thought about it some more and I think we can keep free offsets separate for balancing purposes, but combine stored pieces. This will also reduce memory usage for large farmers in the meantime since most of the offsets will be free for them and will use a more efficient layout in memory.
@nazar-pc I actually had the crazy idea of stitching together all the stored and free cache entries to get a fixed-length, contiguous array (0..piece_cache_capacity_total). This can be compactly stored in a bitmap. By storing an additional Vec<max_num_elements>, the backend can be deduced from the index in the bitmap (because of the contiguous splicing). Also, 0/1 can be used to mark stored or free. 1T cache -> 2^20 pieces -> 2^20 bits -> 2^17 bytes -> 2^7 KiB (128 KiB).
But let me fix the problem mentioned above first.
Stored pieces need to be in a hashmap due to lookups, we can't replace the hashmap with a vector and still get efficient lookups. As for knowing free offsets, we can potentially compact it to a single number per cache backend by simply storing how many offsets we've occupied, because we occupy offsets sequentially. The only issue there is that we'll not be able to store offsets that became free due to read errors, we'll need to use a separate data structure for that, which can be tiny and only used in this emergency situation. Let's probably not do that as part of this PR though, one step at a time so reviewing is manageable.
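A minimal sketch of that compact bookkeeping (all names are assumptions): per backend, a single counter covers the sequentially occupied range, plus a normally empty list for offsets freed early by read errors.

```rust
// Per-backend free-offset state as described above; offsets are plain u32 here.
struct BackendFreeOffsets {
    used_capacity: u32,
    total_capacity: u32,
    // Only populated in the emergency situation (offsets freed by read errors).
    dangling_free_offsets: Vec<u32>,
}

impl BackendFreeOffsets {
    fn next_free(&mut self) -> Option<u32> {
        // Prefer re-using an offset that became free due to an error.
        if let Some(offset) = self.dangling_free_offsets.pop() {
            return Some(offset);
        }
        // Otherwise hand out the next never-used offset, if any remain.
        if self.used_capacity < self.total_capacity {
            let offset = self.used_capacity;
            self.used_capacity += 1;
            return Some(offset);
        }
        None
    }
}
```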
Yeah, I agree. I'll clean up the code for review and subsequent submission.
Force-pushed 3ca4a28 to 392c91a
I think it looks mostly good, but I still have two requests here.
I'd like to see yet another PR where you do the refactoring separately from features
For example, in the last commit you did code refactoring, some renaming, and new features all at once.
There is a lot of noise that is hard to review, and the new code, even though it looks like the old code, is subtly different, which makes the diff unnecessarily larger and harder to review.
For example, despite the similar structure, the new code uses `backend` instead of `new_cache` and `cache_index` instead of `index`; it also extracted `new_cache.max_num_elements()` into a `max_num_elements` variable.
While I agree with all of those changes, they are useless noise when I just want to see what logic you have changed (if any). Currently, both the diff of the commit and the final diff are annoyingly hard to read.
See for example how #2912 moved things around without actually changing anything in terms of how code works such that subsequent PRs are easier to read.
Or #2920 that was built on top with a few commits that intentionally change a few things logically so you don't need to analyze changes in all 15 files at once.
Or see how #2626 has a few things renamed, but renamings are isolated to separate commits, so they don't really need to be reviewed.
If you create a separate PR that renames things into what they will be named in the upcoming PR, it will decrease the diff and time to review significantly.
Right now my brain simply wants to give up reading because things have changed where they didn't need to, at least not at the same time with other changes.
I can ignore whitespace changes in diff, but I can't ignore renamings unfortunately.
Since we will want to refactor the way free offsets work, we don't need to change them in this PR
Free offsets will likely be replaced with a number/pair of numbers for each cache rather than a flat vector of actual offsets.
As such, we don't need to change that part of the logic in this PR and accept a temporary degradation of cache performance; we can delay this to a separate PR that we already know we'll want to do.
@@ -86,7 +86,7 @@ impl KnownCaches {
pub(super) async fn maintain_caches(
    cache_group: &str,
    nats_client: &NatsClient,
    farmer_cache: FarmerCache,
    farmer_cache: FarmerCache<u16>,
Please use a type alias just like we do for `FarmIndex`. A simple `type CacheIndex = u16` will do. It is annoying to have `u16` sprinkled in multiple places without a direct link between them.
where
    FarmIndex: Hash + Eq + Copy + fmt::Debug + Send + Sync + 'static,
    usize: From<FarmIndex>,
    CacheIndex: Hash + Eq + Copy + fmt::Debug + Default + Send + Sync + 'static,
Why is `Default` required?
I'm very sorry for the disaster my bad habits have caused you. I'll take care of this, and keep my commits as small as possible.
Force-pushed 3d00acd to 175ab9c
Force-pushed 175ab9c to 5a43d33
I squashed your commits into one and added one more commit before it to simplify the squashed diff (you can see the force-push diff in https://github.com/subspace/subspace/compare/175ab9cd19b48fdce7864942fe8507a8d1ada307..5a43d331935b1ddc4a5eb4fc7f7b539ac25e328f). I also pushed one more commit on top that fixes locking (the piece cache read lock was held unnecessarily before taking the plot cache lock) and reordered code closer to what it was before, so that in the final diff the code looks very similar and only the actually changed part related to the piece cache is shown as changed. I will squash that last commit in; I just wanted to show what I have changed and why. Now I think we're close to merging this, but free offsets need to be refactored first or else we'll have a regression; here is how I think that should be done (and it can be pushed into this PR):
No force pushes with rebase here, please, it makes reviewing much harder than it needs to be.
Force-pushed 9bd96bf to 231ab78
Do I understand correctly that you just rebased it on …?
I pushed it from the wrong branch, and after I found out I reset it back. @nazar-pc Only the last commit makes sense; the previous ones were just tiny refactorings and renames.
Almost there; just the accounting for used capacity/dangling free offsets is incorrect.
@@ -367,6 +431,7 @@ where
    let offset = FarmerCacheOffset::new(cache_index, piece_offset);
    match maybe_piece_index {
        Some(piece_index) => {
            *used_capacity = piece_offset.0 + 1;
This naive logic is actually not correct. See, you get `maybe_piece_index`, and there is nothing in the API that guarantees that used piece offsets do not have unused offsets in between them. In fact, this is exactly what dangling offsets are supposed to represent: free offsets dangling inside of otherwise used capacity.
So used capacity is the whole range of used indices even if there are "holes" in it, while this only increases the index for non-dangling offsets, which will result in an inconsistent data structure.
Yes, so what's actually being done here is to record the last_used_offset, as well as all free_offsets for the current cache_backend (including dangling_free_offsets; see line 439, just below). Later on, we'll unify and compress them.
Hm... I see. This does work correctly, though it will potentially allocate a large `cache_stored_pieces` that would otherwise be empty or near-empty.
I will calculate `dangling_free_offsets` inline. But I think the handling of the dangling offsets is correct.
@@ -414,7 +479,8 @@ where
    let backend = cache.backend;
    let free_offsets = cache.cache_free_offsets;
    stored_pieces.extend(cache.cache_stored_pieces.into_iter());
    dangling_free_offsets.extend(free_offsets.into_iter());
    dangling_free_offsets
        .extend(backend.dangling_free_offsets(free_offsets).into_iter());
@nazar-pc Here we extract all free_offsets that are before last_used_offset (they are dangling_offsets)
I may massage this a bit before merging, but it looks good otherwise, thanks a lot!
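A tiny sketch of the relationship being discussed, with placeholder types: used capacity spans the whole occupied range, and any free offset below it is dangling, while everything at or above it belongs to the untouched tail.

```rust
// Split free offsets into (dangling, untouched-tail) relative to used capacity.
fn split_free_offsets(used_capacity: u32, free_offsets: Vec<u32>) -> (Vec<u32>, Vec<u32>) {
    free_offsets
        .into_iter()
        .partition(|&offset| offset < used_capacity)
}
```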
Force-pushed c3d136e to 9cc17ea
Looks good to me
Let's go, thank you!
Thank you very much for your contributions @tediou5 through this PR and several other PRs you've submitted throughout July! As a token of our appreciation, we would love to reward you with some USDC as part of our Contribution Contest. Please fill out this form and we will send the reward your way. Sorry about the delay in sending you USDC for your contributions over the past month; we're going to sum them up! Thanks again, and looking forward to your future contributions!
This is the first step of #1769, which is to collect all the `stored_pieces` and `free_offsets` of all the caches together, so that we don't need to resort to `UniqueRecordBinaryHeap` to manage the synchronisation of the individual caches.

Code contributor checklist: