Skip to content

Commit

Permalink
[ENH] Config to disable compactor on collections (#3469)
Browse files Browse the repository at this point in the history
## Description of changes

*Summarize the changes made by this PR.*
 - Improvements & Bug fixes
- Can specify a list of collection ids for which compaction needs to be disabled, and the
compactor will respect that.
- At runtime, can set the env var
`CHROMA_COMPACTION_SERVICE__COMPACTOR__DISABLED_COLLECTIONS=[list of
collection ids]` or
`CHROMA_COMPACTION_SERVICE.COMPACTOR.DISABLED_COLLECTIONS=[list of
collection ids]` and the changes will take effect without restarting the
pod

## Test plan
*How are these changes tested?*
- [x] Tests pass locally with `pytest` for python, `yarn test` for js,
`cargo test` for rust

## Documentation Changes
None
  • Loading branch information
sanketkedia authored Jan 13, 2025
1 parent d6c8f20 commit a632f2b
Show file tree
Hide file tree
Showing 5 changed files with 147 additions and 0 deletions.
1 change: 1 addition & 0 deletions rust/worker/chroma_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ compaction_service:
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: [] # uuids to disable compaction for
blockfile_provider:
Arrow:
block_manager_config:
Expand Down
10 changes: 10 additions & 0 deletions rust/worker/src/compactor/compaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,17 @@ use chroma_system::{Component, ComponentContext, ComponentHandle, Handler, Syste
use chroma_types::CollectionUuid;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::HashSet;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::str::FromStr;
use std::time::Duration;
use thiserror::Error;
use tracing::instrument;
use tracing::span;
use tracing::Instrument;
use tracing::Span;
use uuid::Uuid;

pub(crate) struct CompactionManager {
system: Option<System>,
Expand Down Expand Up @@ -220,6 +223,11 @@ impl Configurable<CompactionServiceConfig> for CompactionManager {
let min_compaction_size = config.compactor.min_compaction_size;
let max_compaction_size = config.compactor.max_compaction_size;
let max_partition_size = config.compactor.max_partition_size;
let mut disabled_collections =
HashSet::with_capacity(config.compactor.disabled_collections.len());
for collection_id_str in &config.compactor.disabled_collections {
disabled_collections.insert(CollectionUuid(Uuid::from_str(collection_id_str).unwrap()));
}

let assignment_policy_config = &config.assignment_policy;
let assignment_policy = match crate::assignment::from_config(assignment_policy_config).await
Expand All @@ -237,6 +245,7 @@ impl Configurable<CompactionServiceConfig> for CompactionManager {
max_concurrent_jobs,
min_compaction_size,
assignment_policy,
disabled_collections,
);

let blockfile_provider = BlockfileProvider::try_from_config(&(
Expand Down Expand Up @@ -519,6 +528,7 @@ mod tests {
max_concurrent_jobs,
min_compaction_size,
assignment_policy,
HashSet::new(),
);
// Set memberlist
scheduler.set_memberlist(vec![my_member_id.clone()]);
Expand Down
1 change: 1 addition & 0 deletions rust/worker/src/compactor/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@ pub(crate) struct CompactorConfig {
pub(crate) min_compaction_size: usize,
pub(crate) max_compaction_size: usize,
pub(crate) max_partition_size: usize,
pub(crate) disabled_collections: Vec<String>,
}
89 changes: 89 additions & 0 deletions rust/worker/src/compactor/scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
use std::collections::HashSet;
use std::str::FromStr;

use chroma_types::CollectionUuid;
use figment::providers::Env;
use figment::Figment;
use serde::Deserialize;
use uuid::Uuid;

use crate::assignment::assignment_policy::AssignmentPolicy;
use crate::compactor::scheduler_policy::SchedulerPolicy;
use crate::compactor::types::CompactionJob;
Expand All @@ -17,9 +26,16 @@ pub(crate) struct Scheduler {
min_compaction_size: usize,
memberlist: Option<Memberlist>,
assignment_policy: Box<dyn AssignmentPolicy>,
disabled_collections: HashSet<CollectionUuid>,
}

/// Subset of the compactor configuration that `Scheduler::recompute_disabled_collections`
/// extracts from the process environment.
#[derive(Deserialize, Debug)]
struct RunTimeConfig {
// Collection ids as raw strings; each one is parsed into a `Uuid` by the scheduler
// before it is stored in `Scheduler::disabled_collections`.
disabled_collections: Vec<String>,
}

impl Scheduler {
#[allow(clippy::too_many_arguments)]
pub(crate) fn new(
my_ip: String,
log: Box<Log>,
Expand All @@ -28,6 +44,7 @@ impl Scheduler {
max_concurrent_jobs: usize,
min_compaction_size: usize,
assignment_policy: Box<dyn AssignmentPolicy>,
disabled_collections: HashSet<CollectionUuid>,
) -> Scheduler {
Scheduler {
my_ip,
Expand All @@ -39,6 +56,7 @@ impl Scheduler {
max_concurrent_jobs,
memberlist: None,
assignment_policy,
disabled_collections,
}
}

Expand All @@ -63,6 +81,16 @@ impl Scheduler {
) -> Vec<CollectionRecord> {
let mut collection_records = Vec::new();
for collection_info in collections {
if self
.disabled_collections
.contains(&collection_info.collection_id)
{
tracing::info!(
"Ignoring collection: {:?} because it disabled for compaction",
collection_info.collection_id
);
continue;
}
let collection_id = Some(collection_info.collection_id);
// TODO: add a cache to avoid fetching the same collection multiple times
let result = self
Expand Down Expand Up @@ -161,13 +189,39 @@ impl Scheduler {
self.job_queue.extend(jobs);
}

pub(crate) fn recompute_disabled_collections(&mut self) {
let config = Figment::new()
.merge(
Env::prefixed("CHROMA_")
.map(|k| k.as_str().replace("__", ".").into())
.map(|k| {
if k == "COMPACTION_SERVICE.COMPACTOR.DISABLED_COLLECTIONS" {
k["COMPACTION_SERVICE.COMPACTOR.".len()..].into()
} else {
k.into()
}
})
.only(&["DISABLED_COLLECTIONS"]),
)
.extract::<RunTimeConfig>();
if let Ok(config) = config {
self.disabled_collections = config
.disabled_collections
.iter()
.map(|collection| CollectionUuid(Uuid::from_str(collection).unwrap()))
.collect();
}
}

pub(crate) async fn schedule(&mut self) {
// For now, we clear the job queue every time, assuming we will not have any pending jobs running
self.job_queue.clear();
if self.memberlist.is_none() || self.memberlist.as_ref().unwrap().is_empty() {
tracing::error!("Memberlist is not set or empty. Cannot schedule compaction jobs.");
return;
}
// Recompute disabled list.
self.recompute_disabled_collections();
let collections = self.get_collections_with_new_data().await;
if collections.is_empty() {
return;
Expand Down Expand Up @@ -300,6 +354,7 @@ mod tests {
max_concurrent_jobs,
1,
assignment_policy,
HashSet::new(),
);
// Scheduler does nothing without memberlist
scheduler.schedule().await;
Expand Down Expand Up @@ -338,6 +393,39 @@ mod tests {
assert_eq!(jobs[0].collection_id, collection_uuid_2,);
assert_eq!(jobs[1].collection_id, collection_uuid_1,);

// Set disable list.
std::env::set_var(
"CHROMA_COMPACTION_SERVICE__COMPACTOR__DISABLED_COLLECTIONS",
"[\"00000000-0000-0000-0000-000000000001\"]",
);
scheduler.schedule().await;
let jobs = scheduler.get_jobs();
let jobs = jobs.collect::<Vec<&CompactionJob>>();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].collection_id, collection_uuid_2,);
std::env::set_var(
"CHROMA_COMPACTION_SERVICE__COMPACTOR__DISABLED_COLLECTIONS",
"[]",
);
// Even . should work.
std::env::set_var(
"CHROMA_COMPACTION_SERVICE.COMPACTOR.DISABLED_COLLECTIONS",
"[\"00000000-0000-0000-0000-000000000002\"]",
);
std::env::set_var(
"CHROMA_COMPACTION_SERVICE.IRRELEVANT",
"[\"00000000-0000-0000-0000-000000000001\"]",
);
scheduler.schedule().await;
let jobs = scheduler.get_jobs();
let jobs = jobs.collect::<Vec<&CompactionJob>>();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].collection_id, collection_uuid_1,);
std::env::set_var(
"CHROMA_COMPACTION_SERVICE.COMPACTOR.DISABLED_COLLECTIONS",
"[]",
);

// Test filter_collections
let member_1 = "1".to_string();
let member_2 = "5".to_string();
Expand Down Expand Up @@ -477,6 +565,7 @@ mod tests {
max_concurrent_jobs,
1,
assignment_policy,
HashSet::new(),
);

scheduler.set_memberlist(vec![my_ip.clone()]);
Expand Down
46 changes: 46 additions & 0 deletions rust/worker/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ mod tests {
use super::*;
use figment::Jail;
use serial_test::serial;
use uuid::Uuid;

#[test]
#[serial]
Expand Down Expand Up @@ -261,6 +262,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: ["74b3240e-a2b0-43d7-8adb-f55a394964a1", "496db4aa-fbe1-498a-b60b-81ec0fe59792"]
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -289,6 +291,24 @@ mod tests {
"compaction-service-0"
);
assert_eq!(config.compaction_service.my_port, 50051);
assert_eq!(
config
.compaction_service
.compactor
.disabled_collections
.len(),
2
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[0])
.unwrap(),
Uuid::parse_str("74b3240e-a2b0-43d7-8adb-f55a394964a1").unwrap()
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[1])
.unwrap(),
Uuid::parse_str("496db4aa-fbe1-498a-b60b-81ec0fe59792").unwrap()
);
Ok(())
});
}
Expand Down Expand Up @@ -407,6 +427,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: []
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -571,6 +592,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: []
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -629,6 +651,10 @@ mod tests {
"CHROMA_COMPACTION_SERVICE__STORAGE__S3__REQUEST_TIMEOUT_MS",
1000,
);
jail.set_env(
"CHROMA_COMPACTION_SERVICE__COMPACTOR__DISABLED_COLLECTIONS",
"[\"74b3240e-a2b0-43d7-8adb-f55a394964a1\",\"496db4aa-fbe1-498a-b60b-81ec0fe59792\"]",
);
let _ = jail.create_file(
"chroma_config.yaml",
r#"
Expand Down Expand Up @@ -722,6 +748,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: ["c92e4d75-eb25-4295-82d8-7c53dbd33258"]
blockfile_provider:
Arrow:
block_manager_config:
Expand Down Expand Up @@ -763,6 +790,24 @@ mod tests {
}
_ => panic!("Invalid storage config"),
}
assert_eq!(
config
.compaction_service
.compactor
.disabled_collections
.len(),
2
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[0])
.unwrap(),
Uuid::parse_str("74b3240e-a2b0-43d7-8adb-f55a394964a1").unwrap()
);
assert_eq!(
Uuid::parse_str(&config.compaction_service.compactor.disabled_collections[1])
.unwrap(),
Uuid::parse_str("496db4aa-fbe1-498a-b60b-81ec0fe59792").unwrap()
);
Ok(())
});
}
Expand Down Expand Up @@ -884,6 +929,7 @@ mod tests {
min_compaction_size: 10
max_compaction_size: 10000
max_partition_size: 5000
disabled_collections: []
blockfile_provider:
Arrow:
block_manager_config:
Expand Down

0 comments on commit a632f2b

Please sign in to comment.