Skip to content

Commit

Permalink
[ENH] Use the log position from sysdb to pull log
Browse files Browse the repository at this point in the history
  • Loading branch information
Ishiihara committed May 3, 2024
1 parent eb5f3e2 commit 03ce652
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 9 deletions.
4 changes: 3 additions & 1 deletion rust/worker/src/compactor/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ impl Scheduler {
}

// TODO: make querying the last compaction time in batch
let log_position_in_collecion = collection[0].log_position;
let tenant_ids = vec![collection[0].tenant.clone()];
let tenant = self.sysdb.get_last_compaction_time(tenant_ids).await;

Expand All @@ -101,8 +102,9 @@ impl Scheduler {
tenant_id: collection[0].tenant.clone(),
last_compaction_time,
first_record_time: collection_info.first_log_ts,
offset: collection_info.first_log_offset,
offset_in_log: collection_info.first_log_offset,
collection_version: collection[0].version,
offset_in_sysdb: log_position_in_collecion,
});
}
Err(e) => {
Expand Down
38 changes: 35 additions & 3 deletions rust/worker/src/compactor/scheduler_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,21 @@ impl SchedulerPolicy for LasCompactionTimeSchedulerPolicy {
};
let mut tasks = Vec::new();
for collection in &collections[0..number_tasks as usize] {
// offset_in_log is the first offset in the log that has not yet been compacted. Since
// this is the first offset we receive from the log service, it is the offset from
// which data should be pulled from the log service.
let mut offset = collection.offset_in_log;
if collection.offset_in_sysdb < offset {
panic!("offset in sysdb is less than offset in log, this should not happen!");
} else {
// The offset in sysdb is the last offset that has been compacted.
// We need to start from the next offset.
offset = collection.offset_in_sysdb + 1;
}
tasks.push(CompactionJob {
collection_id: collection.id.clone(),
tenant_id: collection.tenant_id.clone(),
offset: collection.offset,
offset,
collection_version: collection.collection_version,
});
}
Expand All @@ -71,15 +82,17 @@ mod tests {
tenant_id: "test".to_string(),
last_compaction_time: 1,
first_record_time: 1,
offset: 0,
offset_in_log: 0,
offset_in_sysdb: 0,
collection_version: 0,
},
CollectionRecord {
id: collection_uuid_2,
tenant_id: "test".to_string(),
last_compaction_time: 0,
first_record_time: 0,
offset: 0,
offset_in_log: 0,
offset_in_sysdb: 0,
collection_version: 0,
},
];
Expand All @@ -92,4 +105,23 @@ mod tests {
assert_eq!(jobs[0].collection_id, collection_uuid_2);
assert_eq!(jobs[1].collection_id, collection_uuid_1);
}

#[test]
#[should_panic(
expected = "offset in sysdb is less than offset in log, this should not happen!"
)]
fn test_scheduler_policy_panic() {
let collection_uuid_1 = Uuid::from_str("00000000-0000-0000-0000-000000000001").unwrap();
let scheduler_policy = LasCompactionTimeSchedulerPolicy {};
let collections = vec![CollectionRecord {
id: collection_uuid_1,
tenant_id: "test".to_string(),
last_compaction_time: 1,
first_record_time: 1,
offset_in_log: 1,
offset_in_sysdb: 0,
collection_version: 0,
}];
let _jobs = scheduler_policy.determine(collections.clone(), 1);
}
}
1 change: 0 additions & 1 deletion rust/worker/src/execution/operators/pull_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::log::log::Log;
use crate::log::log::PullLogsError;
use crate::types::LogRecord;
use async_trait::async_trait;
use tracing::debug;
use tracing::trace;
use uuid::Uuid;

Expand Down
5 changes: 2 additions & 3 deletions rust/worker/src/execution/orchestration/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ impl CompactOrchestrator {
// TODO: It is possible that the offset_id from the compaction job is wrong since the log service
// can have an outdated view of the offset. We should filter out entries from the log based on the start offset
// of the segment, and not fully respect the offset_id from the compaction job

async fn pull_logs(&mut self, self_address: Box<dyn Receiver<PullLogsResult>>) {
self.state = ExecutionState::PullLogs;
let operator = PullLogsOperator::new(self.log.clone());
Expand Down Expand Up @@ -268,7 +267,7 @@ impl CompactOrchestrator {
}
}

async fn flush_sysdb(
async fn register(
&mut self,
log_position: i64,
segment_flush_info: Arc<[SegmentFlushInfo]>,
Expand Down Expand Up @@ -522,7 +521,7 @@ impl Handler<FlushS3Result> for CompactOrchestrator {
match message {
Ok(msg) => {
// Unwrap should be safe here as we are guaranteed to have a value by construction
self.flush_sysdb(
self.register(
self.pulled_log_offset.unwrap(),
msg.segment_flush_info,
_ctx.sender.as_receiver(),
Expand Down
3 changes: 2 additions & 1 deletion rust/worker/src/log/log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ pub(crate) struct CollectionRecord {
pub(crate) tenant_id: String,
pub(crate) last_compaction_time: i64,
pub(crate) first_record_time: i64,
pub(crate) offset: i64,
pub(crate) offset_in_log: i64,
pub(crate) offset_in_sysdb: i64,
pub(crate) collection_version: i32,
}

Expand Down

0 comments on commit 03ce652

Please sign in to comment.