Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplify MemoryManager #4522

Merged
merged 12 commits into from
Dec 19, 2022
Merged
8 changes: 4 additions & 4 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2032,7 +2032,7 @@ mod tests {
use super::*;
use crate::assert_batches_eq;
use crate::execution::context::QueryPlanner;
use crate::execution::memory_pool::TrackedAllocation;
use crate::execution::memory_pool::MemoryConsumer;
use crate::execution::runtime_env::RuntimeConfig;
use crate::physical_plan::expressions::AvgAccumulator;
use crate::test;
Expand Down Expand Up @@ -2060,8 +2060,8 @@ mod tests {
// configure with same memory / disk manager
let memory_pool = ctx1.runtime_env().memory_pool.clone();

let mut allocation = TrackedAllocation::new(&memory_pool, "test".to_string());
allocation.grow(100);
let mut reservation = MemoryConsumer::new("test").register(&memory_pool);
reservation.grow(100);

let disk_manager = ctx1.runtime_env().disk_manager.clone();

Expand All @@ -2071,7 +2071,7 @@ mod tests {
assert_eq!(ctx1.runtime_env().memory_pool.allocated(), 100);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test had to be rewitten as pointer equality of fat pointers such as &dyn is funky

assert_eq!(ctx2.runtime_env().memory_pool.allocated(), 100);

drop(allocation);
drop(reservation);

assert_eq!(ctx1.runtime_env().memory_pool.allocated(), 0);
assert_eq!(ctx2.runtime_env().memory_pool.allocated(), 0);
Expand Down
97 changes: 42 additions & 55 deletions datafusion/core/src/execution/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,43 +25,43 @@ pub mod proxy;

pub use pool::*;

/// The pool of memory from which [`TrackedAllocation`] allocate
/// The pool of memory on which [`MemoryReservation`] record their memory usage
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
/// Records the creation of a new [`TrackedAllocation`] with [`AllocationOptions`]
fn allocate(&self, _options: &AllocationOptions) {}
/// Records the creation of a new [`MemoryReservation`] with [`MemoryConsumer`]
fn register(&self, _consumer: &MemoryConsumer) {}

/// Records the destruction of a [`TrackedAllocation`] with [`AllocationOptions`]
fn free(&self, _options: &AllocationOptions) {}
/// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
fn unregister(&self, _consumer: &MemoryConsumer) {}

/// Infallibly grow the provided `allocation` by `additional` bytes
/// Infallibly grow the provided `reservation` by `additional` bytes
///
/// This must always succeed
fn grow(&self, allocation: &TrackedAllocation, additional: usize);
fn grow(&self, reservation: &MemoryReservation, additional: usize);

/// Infallibly shrink the provided `allocation` by `shrink` bytes
fn shrink(&self, allocation: &TrackedAllocation, shrink: usize);
/// Infallibly shrink the provided `reservation` by `shrink` bytes
fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

/// Attempt to grow the provided `allocation` by `additional` bytes
/// Attempt to grow the provided `reservation` by `additional` bytes
///
/// On error the `allocation` will not be increased in size
fn try_grow(&self, allocation: &TrackedAllocation, additional: usize) -> Result<()>;
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

/// Return the total amount of memory allocated
fn allocated(&self) -> usize;
}

/// Options associated with a [`TrackedAllocation`]
/// A memory consumer that can be tracked by [`MemoryReservation`] in a [`MemoryPool`]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

#[derive(Debug)]
pub struct AllocationOptions {
pub struct MemoryConsumer {
name: String,
can_spill: bool,
}

impl AllocationOptions {
/// Create a new [`AllocationOptions`]
pub fn new(name: String) -> Self {
impl MemoryConsumer {
/// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
pub fn new(name: impl Into<String>) -> Self {
Self {
name,
name: name.into(),
can_spill: false,
}
}
Expand All @@ -80,47 +80,35 @@ impl AllocationOptions {
pub fn name(&self) -> &str {
&self.name
}

/// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
/// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
Comment on lines +89 to +90
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice interface

pool.register(&self);
MemoryReservation {
consumer: self,
size: 0,
policy: Arc::clone(pool),
}
}
}

/// A [`TrackedAllocation`] tracks a reservation of memory in a [`MemoryPool`]
/// A [`MemoryReservation`] tracks a reservation of memory in a [`MemoryPool`]
/// that is freed back to the pool on drop
#[derive(Debug)]
pub struct TrackedAllocation {
options: AllocationOptions,
pub struct MemoryReservation {
consumer: MemoryConsumer,
size: usize,
policy: Arc<dyn MemoryPool>,
}

impl TrackedAllocation {
/// Create a new [`TrackedAllocation`] in the provided [`MemoryPool`]
pub fn new(pool: &Arc<dyn MemoryPool>, name: String) -> Self {
Self::new_with_options(pool, AllocationOptions::new(name))
}

/// Create a new [`TrackedAllocation`] in the provided [`MemoryPool`]
pub fn new_with_options(
pool: &Arc<dyn MemoryPool>,
options: AllocationOptions,
) -> Self {
pool.allocate(&options);
Self {
options,
size: 0,
policy: Arc::clone(pool),
}
}

/// Returns the size of this [`TrackedAllocation`] in bytes
impl MemoryReservation {
/// Returns the size of this reservation in bytes
pub fn size(&self) -> usize {
self.size
}

/// Returns this allocations [`AllocationOptions`]
pub fn options(&self) -> &AllocationOptions {
&self.options
}

/// Frees all bytes from this allocation returning the number of bytes freed
/// Frees all bytes from this reservation returning the number of bytes freed
pub fn free(&mut self) -> usize {
let size = self.size;
if size != 0 {
Expand All @@ -129,7 +117,7 @@ impl TrackedAllocation {
size
}

/// Frees `capacity` bytes from this allocation
/// Frees `capacity` bytes from this reservation
///
/// # Panics
///
Expand All @@ -140,7 +128,7 @@ impl TrackedAllocation {
self.size = new_size
}

/// Sets the size of this allocation to `capacity`
/// Sets the size of this reservation to `capacity`
pub fn resize(&mut self, capacity: usize) {
use std::cmp::Ordering;
match capacity.cmp(&self.size) {
Expand All @@ -150,24 +138,23 @@ impl TrackedAllocation {
}
}

/// Increase the size of this by `capacity` bytes
/// Increase the size of this reservation by `capacity` bytes
pub fn grow(&mut self, capacity: usize) {
self.policy.grow(self, capacity);
self.size += capacity;
}

/// Try to increase the size of this [`TrackedAllocation`] by `capacity` bytes
/// Try to increase the size of this reservation by `capacity` bytes
pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
self.policy.try_grow(self, capacity)?;
self.size += capacity;
Ok(())
}
}

impl Drop for TrackedAllocation {
impl Drop for MemoryReservation {
fn drop(&mut self) {
self.free();
self.policy.free(&self.options);
self.policy.unregister(&self.consumer);
}
}

Expand Down Expand Up @@ -202,7 +189,7 @@ mod tests {
#[test]
fn test_memory_pool_underflow() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let mut a1 = TrackedAllocation::new(&pool, "a1".to_string());
let mut a1 = MemoryConsumer::new("a1").register(&pool);
assert_eq!(pool.allocated(), 0);

a1.grow(100);
Expand All @@ -217,7 +204,7 @@ mod tests {
a1.try_grow(30).unwrap();
assert_eq!(pool.allocated(), 30);

let mut a2 = TrackedAllocation::new(&pool, "a2".to_string());
let mut a2 = MemoryConsumer::new("a2").register(&pool);
a2.try_grow(25).unwrap_err();
assert_eq!(pool.allocated(), 30);

Expand Down
Loading