Skip to content

Commit e06db13

Browse files
committed
adapter: deoptionalize oracle_write_ts in Catalog::transact
The conditions under which `oracle_write_ts` was allowed to be `None` in calls to `Catalog::transact` were not documented. Make the API more type-safe by removing the `Option`, as an oracle-derived write timestamp is available in all production code paths.
1 parent c6a8756 commit e06db13

File tree

5 files changed

+64
-40
lines changed

5 files changed

+64
-40
lines changed

src/adapter/benches/catalog.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ fn bench_transact(c: &mut Criterion) {
125125
public_schema_oid: id,
126126
}];
127127
catalog
128-
.transact(Some(mz_repr::Timestamp::MIN), None, ops, |_| Ok(()))
128+
.transact(mz_repr::Timestamp::MIN, None, ops, |_| Ok(()))
129129
.await
130130
.unwrap();
131131
})

src/adapter/src/catalog.rs

+13-9
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,7 @@ impl CatalogState {
12511251
// passing tx, session, and builtin_table_updates?
12521252
fn add_to_audit_log<S: Append>(
12531253
&self,
1254-
oracle_write_ts: Option<mz_repr::Timestamp>,
1254+
oracle_write_ts: mz_repr::Timestamp,
12551255
session: Option<&Session>,
12561256
tx: &mut storage::Transaction<S>,
12571257
builtin_table_updates: &mut Vec<BuiltinTableUpdate>,
@@ -1261,7 +1261,7 @@ impl CatalogState {
12611261
details: EventDetails,
12621262
) -> Result<(), Error> {
12631263
let user = session.map(|session| session.user().name.to_string());
1264-
let occurred_at = oracle_write_ts.expect("must exist").into();
1264+
let occurred_at = oracle_write_ts.into();
12651265
let id = tx.get_and_increment_id(storage::AUDIT_LOG_ID_ALLOC_KEY.to_string())?;
12661266
let event = VersionedEvent::new(id, event_type, object_type, details, user, occurred_at);
12671267
builtin_table_updates.push(self.pack_audit_log_update(&event)?);
@@ -2739,7 +2739,12 @@ impl<S: Append> Catalog<S> {
27392739
bootstrap_system_parameters: BTreeMap<String, String>,
27402740
system_parameter_frontend: Option<Arc<SystemParameterFrontend>>,
27412741
) -> Result<(), AdapterError> {
2742-
let system_config = self.storage().await.load_system_configuration().await?;
2742+
let (system_config, boot_ts) = {
2743+
let mut storage = self.storage().await;
2744+
let system_config = storage.load_system_configuration().await?;
2745+
let boot_ts = storage.boot_ts();
2746+
(system_config, boot_ts)
2747+
};
27432748
for (name, value) in &bootstrap_system_parameters {
27442749
if !system_config.contains_key(name) {
27452750
self.state.insert_system_configuration(name, value)?;
@@ -2810,8 +2815,7 @@ impl<S: Append> Catalog<S> {
28102815
Op::UpdateSystemConfiguration { name, value }
28112816
}))
28122817
.collect::<Vec<_>>();
2813-
2814-
self.transact(None, None, ops, |_| Ok(())).await.unwrap();
2818+
self.transact(boot_ts, None, ops, |_| Ok(())).await.unwrap();
28152819
tracing::info!("parameter sync on boot: end sync");
28162820
} else {
28172821
tracing::info!("parameter sync on boot: skipping sync as config has synced once");
@@ -3850,7 +3854,7 @@ impl<S: Append> Catalog<S> {
38503854
#[tracing::instrument(level = "debug", skip_all)]
38513855
pub async fn transact<F, R>(
38523856
&mut self,
3853-
oracle_write_ts: Option<mz_repr::Timestamp>,
3857+
oracle_write_ts: mz_repr::Timestamp,
38543858
session: Option<&Session>,
38553859
ops: Vec<Op>,
38563860
f: F,
@@ -3915,7 +3919,7 @@ impl<S: Append> Catalog<S> {
39153919
}
39163920

39173921
fn transact_inner(
3918-
oracle_write_ts: Option<mz_repr::Timestamp>,
3922+
oracle_write_ts: mz_repr::Timestamp,
39193923
session: Option<&Session>,
39203924
ops: Vec<Op>,
39213925
temporary_ids: Vec<GlobalId>,
@@ -6428,7 +6432,7 @@ mod tests {
64286432
assert_eq!(catalog.transient_revision(), 1);
64296433
catalog
64306434
.transact(
6431-
Some(mz_repr::Timestamp::MIN),
6435+
mz_repr::Timestamp::MIN,
64326436
None,
64336437
vec![Op::CreateDatabase {
64346438
name: "test".to_string(),
@@ -6681,7 +6685,7 @@ mod tests {
66816685
.clone();
66826686
catalog
66836687
.transact(
6684-
Some(mz_repr::Timestamp::MIN),
6688+
mz_repr::Timestamp::MIN,
66856689
None,
66866690
vec![Op::CreateItem {
66876691
id,

src/adapter/src/catalog/storage.rs

+48-28
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,7 @@ pub struct BootstrapArgs {
529529
pub struct Connection<S> {
530530
stash: S,
531531
consolidations_tx: mpsc::UnboundedSender<Vec<mz_stash::Id>>,
532+
boot_ts: mz_repr::Timestamp,
532533
}
533534

534535
impl<S: Append> Connection<S> {
@@ -555,29 +556,46 @@ impl<S: Append> Connection<S> {
555556

556557
// Initialize connection.
557558
initialize_stash(&mut stash).await?;
559+
560+
// Choose a time at which to boot. This is the time at which we will run
561+
// internal migrations, and is also exposed upwards in case higher
562+
// layers want to run their own migrations at the same timestamp.
563+
//
564+
// This time is usually the current system time, but with protection
565+
// against backwards time jumps, even across restarts.
566+
let previous_now_ts = try_get_persisted_timestamp(&mut stash, &Timeline::EpochMilliseconds)
567+
.await?
568+
.unwrap_or(mz_repr::Timestamp::MIN);
569+
let boot_ts = timeline::monotonic_now(now, previous_now_ts);
570+
558571
let mut conn = Connection {
559572
stash,
560573
consolidations_tx,
574+
boot_ts,
561575
};
562576

563577
if !conn.stash.is_readonly() {
564-
// Choose a time at which to apply migrations. This is usually the
565-
// current system time, but with protection against backwards time
566-
// jumps, even across restarts.
567-
let previous_now_ts = conn
568-
.try_get_persisted_timestamp(&Timeline::EpochMilliseconds)
569-
.await?
570-
.unwrap_or(mz_repr::Timestamp::MIN);
571-
let now_ts = timeline::monotonic_now(now, previous_now_ts);
572578
// IMPORTANT: we durably record the new timestamp before using it.
573-
conn.persist_timestamp(&Timeline::EpochMilliseconds, now_ts)
579+
conn.persist_timestamp(&Timeline::EpochMilliseconds, boot_ts)
574580
.await?;
575-
576-
migrate(&mut conn.stash, skip, now_ts.into(), bootstrap_args).await?;
581+
migrate(&mut conn.stash, skip, boot_ts.into(), bootstrap_args).await?;
577582
}
578583

579584
Ok(conn)
580585
}
586+
587+
/// Returns the timestamp at which the storage layer booted.
588+
///
589+
/// This is the timestamp that will have been used to write any data during
590+
/// migrations. It is exposed so that higher layers performing their own
591+
/// migrations can write data at the same timestamp, if desired.
592+
///
593+
/// The boot timestamp is derived from the durable timestamp oracle and is
594+
/// guaranteed to never go backwards, even in the face of backwards time
595+
/// jumps across restarts.
596+
pub fn boot_ts(&self) -> mz_repr::Timestamp {
597+
self.boot_ts
598+
}
581599
}
582600

583601
impl<S: Append> Connection<S> {
@@ -917,23 +935,6 @@ impl<S: Append> Connection<S> {
917935
.collect())
918936
}
919937

920-
/// Get a global timestamp for a timeline that has been persisted to disk.
921-
///
922-
/// Returns `None` if no persisted timestamp for the specified timeline
923-
/// exists.
924-
pub async fn try_get_persisted_timestamp(
925-
&mut self,
926-
timeline: &Timeline,
927-
) -> Result<Option<mz_repr::Timestamp>, Error> {
928-
let key = TimestampKey {
929-
id: timeline.to_string(),
930-
};
931-
Ok(COLLECTION_TIMESTAMP
932-
.peek_key_one(&mut self.stash, &key)
933-
.await?
934-
.map(|v| v.ts))
935-
}
936-
937938
/// Persist new global timestamp for a timeline to disk.
938939
#[tracing::instrument(level = "debug", skip(self))]
939940
pub async fn persist_timestamp(
@@ -971,6 +972,25 @@ impl<S: Append> Connection<S> {
971972
}
972973
}
973974

975+
/// Gets a global timestamp for a timeline that has been persisted to disk.
976+
///
977+
/// Returns `None` if no persisted timestamp for the specified timeline exists.
978+
async fn try_get_persisted_timestamp<S>(
979+
stash: &mut S,
980+
timeline: &Timeline,
981+
) -> Result<Option<mz_repr::Timestamp>, Error>
982+
where
983+
S: Append,
984+
{
985+
let key = TimestampKey {
986+
id: timeline.to_string(),
987+
};
988+
Ok(COLLECTION_TIMESTAMP
989+
.peek_key_one(stash, &key)
990+
.await?
991+
.map(|v| v.ts))
992+
}
993+
974994
#[tracing::instrument(level = "trace", skip_all)]
975995
pub async fn transaction<'a, S: Append>(stash: &'a mut S) -> Result<Transaction<'a, S>, Error> {
976996
let databases = COLLECTION_DATABASE.peek_one(stash).await?;

src/adapter/src/coord/ddl.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ impl<S: Append + 'static> Coordinator<S> {
234234
result,
235235
} = self
236236
.catalog
237-
.transact(Some(oracle_write_ts), session, ops, |catalog| {
237+
.transact(oracle_write_ts, session, ops, |catalog| {
238238
f(CatalogTxn {
239239
dataflow_client: &self.controller,
240240
catalog,

src/adapter/tests/sql.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ async fn datadriven() {
139139
.clone();
140140
catalog
141141
.transact(
142-
Some(mz_repr::Timestamp::MIN),
142+
mz_repr::Timestamp::MIN,
143143
None,
144144
vec![Op::CreateItem {
145145
id,

0 commit comments

Comments
 (0)