Skip to content

Commit

Permalink
models: refactor Schema APIs and introduce relaxation
Browse files Browse the repository at this point in the history
Define a new flow://relaxed-write-schema reference for use within read
schemas, and a Schema::to_relaxed_schema() routine for relaxing a Schema.

Make Schema::add_defs() public and the primary interface for adding
definitions. Begin to deprecate Schema::extend_read_bundle() and
Schema::build_read_schema_bundle(). These will be replaced with direct
usage of Schema::add_defs().

Update flow-web to use Schema::add_defs() and to support
flow://relaxed-write-schema.

Update the agent to drop support for removing historically inlined
flow://write-schema instances, as we've already removed these.
  • Loading branch information
jgraettinger committed Feb 14, 2025
1 parent ed98526 commit 99cb868
Show file tree
Hide file tree
Showing 8 changed files with 593 additions and 592 deletions.
147 changes: 0 additions & 147 deletions crates/agent/src/controllers/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,39 +111,6 @@ pub async fn update_inferred_schema<C: ControlPlane>(
.await
.context("fetching inferred schema")?;

// If the read schema includes a bundled write schema, remove it.
// TODO: remove this code once all production collections have been updated.
let must_remove_write_schema = read_schema_bundles_write_schema(collection_def);
if must_remove_write_schema {
let mut pending_pub = PendingPublication::new();
let draft = pending_pub.start_spec_update(state, "removing bundled write schema");
let draft_row = draft.collections.get_or_insert_with(&collection_name, || {
tables::DraftCollection {
collection: collection_name.clone(),
scope: tables::synthetic_scope(models::CatalogType::Collection, &collection_name),
expect_pub_id: Some(state.last_pub_id),
model: Some(collection_def.clone()),
is_touch: false, // We intend to update the model
}
});
let (removed, new_schema) = collection_def
.read_schema
.as_ref()
.unwrap()
.remove_bundled_write_schema();
if removed {
draft_row.model.as_mut().unwrap().read_schema = Some(new_schema);
tracing::info!("removing bundled write schema");
} else {
tracing::warn!("bundled write schema was not removed");
}
pending_pub
.finish(state, publication_status, control_plane)
.await?
.error_for_status()?;
return Ok(true);
}

if let Some(inferred_schema) = maybe_inferred_schema {
let mut pending_pub = PendingPublication::new();
let tables::InferredSchema {
Expand Down Expand Up @@ -193,124 +160,10 @@ pub async fn update_inferred_schema<C: ControlPlane>(
Ok(false)
}

/// Reports whether the collection's read schema appears to bundle (inline) the write schema.
///
/// Returns `false` when the collection has no read schema. Otherwise the raw schema text is
/// scanned for occurrences of `models::Schema::REF_WRITE_SCHEMA_URL`, and the schema is treated
/// as bundling the write schema when the URL occurs at least three times.
fn read_schema_bundles_write_schema(model: &models::CollectionDef) -> bool {
    // Deliberately a textual heuristic: it avoids parsing the whole schema document.
    // A bundled write schema is expected to mention the URL three times: as the key
    // within `$defs`, as the `$id` of the bundled schema, and as the `$ref` itself.
    model.read_schema.as_ref().map_or(false, |schema| {
        let occurrences = schema
            .get()
            .matches(models::Schema::REF_WRITE_SCHEMA_URL)
            .count();
        occurrences >= 3
    })
}

/// Reports whether the collection's read schema references the inferred schema.
///
/// A collection without a read schema never uses the inferred schema, so `false` is
/// returned in that case; otherwise the answer is delegated to
/// `models::Schema::references_inferred_schema`.
pub fn uses_inferred_schema(collection: &models::CollectionDef) -> bool {
    let Some(read_schema) = &collection.read_schema else {
        return false;
    };
    models::Schema::references_inferred_schema(read_schema)
}

// Unit tests for the bundled-write-schema detection heuristic.
#[cfg(test)]
mod test {
use super::*;

// Exercises `read_schema_bundles_write_schema` with a read schema that inlines the
// write schema (expected: true), then with one that merely references it (expected: false).
#[test]
fn test_read_schema_bundles_write_schema() {
// Fixture 1: the read schema's `$defs` contains a `flow://write-schema` entry, so the
// URL appears three times within the read schema: the `$defs` key, the `$id`, and the `$ref`.
let collection_json = r##"{
"writeSchema": {
"properties": {
"id": {
"type": "string"
}
},
"type": "object",
"x-infer-schema": true
},
"readSchema": {
"$defs": {
"flow://inferred-schema": {
"$id": "flow://inferred-schema",
"$schema": "https://json-schema.org/draft/2019-09/schema",
"additionalProperties": false,
"properties": {
"id": { "type": "string" },
"a": { "type": "string" },
"hello": { "type": "string" }
},
"required": [
"aa",
"hello",
"id"
],
"type": "object"
},
"flow://write-schema": {
"$id": "flow://write-schema",
"properties": {
"id": { "type": "string" }
},
"required": [
"id"
],
"type": "object",
"x-infer-schema": true
}
},
"allOf": [
{
"$ref": "flow://write-schema"
},
{
"$ref": "flow://inferred-schema"
}
]
},
"key": [
"/id"
]
}"##;
let mut collection: models::CollectionDef = serde_json::from_str(collection_json).unwrap();
assert!(read_schema_bundles_write_schema(&collection));

// Fixture 2: swap in a read schema whose `$defs` holds only the inferred schema. The
// write schema URL now appears once (the `$ref`), which is below the threshold of three.
collection.read_schema = Some(models::Schema::new(
models::RawValue::from_str(
r##"{
"$defs": {
"flow://inferred-schema": {
"$id": "flow://inferred-schema",
"$schema": "https://json-schema.org/draft/2019-09/schema",
"additionalProperties": false,
"properties": {
"id": { "type": "string" },
"a": { "type": "string" },
"hello": { "type": "string" }
},
"required": [
"aa",
"hello",
"id"
],
"type": "object"
}
},
"allOf": [
{
"$ref": "flow://write-schema"
},
{
"$ref": "flow://inferred-schema"
}
]
}"##,
)
.unwrap(),
));

assert!(!read_schema_bundles_write_schema(&collection));
}
}
26 changes: 1 addition & 25 deletions crates/agent/src/integration_tests/schema_evolution.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use std::collections::{BTreeMap, BTreeSet};

use super::harness::{draft_catalog, mock_inferred_schema, FailBuild, TestHarness};
use crate::{
controllers::ControllerState,
publications::{JobStatus, UncommittedBuild},
ControlPlane,
};
use crate::{controllers::ControllerState, publications::UncommittedBuild, ControlPlane};
use models::CatalogType;
use proto_flow::materialize::response::validated::constraint::Type as ConstraintType;
use tables::BuiltRow;
Expand Down Expand Up @@ -176,26 +172,6 @@ async fn test_schema_evolution() {
assert!(totes_spec
.read_schema_json
.contains("inferredSchemaIsNotAvailable"));
// Assert that the bundled write schema has been removed. We expect one reference to
// the write schema url, down from 3 originally.
// TODO: we can remove these assertions (and the bundled write schema in the setup) once
// all the collections have been updated.
let totes_model = totes_state
.live_spec
.as_ref()
.unwrap()
.as_collection()
.unwrap();
assert_eq!(
1,
totes_model
.read_schema
.as_ref()
.unwrap()
.get()
.matches(models::Schema::REF_WRITE_SCHEMA_URL)
.count()
);
// Assert that the schema in the built spec _does_ contain the bundled write schema
assert_eq!(
3,
Expand Down
39 changes: 37 additions & 2 deletions crates/flow-web/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ pub fn extend_read_bundle(input: JsValue) -> Result<JsValue, JsValue> {
write: models::Schema,
inferred: Option<models::Schema>,
}
let relaxed: Option<models::Schema>;

let Input {
read,
Expand All @@ -150,9 +151,43 @@ pub fn extend_read_bundle(input: JsValue) -> Result<JsValue, JsValue> {
} = serde_json::from_value(input)
.map_err(|err| JsValue::from_str(&format!("invalid input: {:?}", err)))?;

let output = models::Schema::extend_read_bundle(&read, Some(&write), inferred.as_ref());
let mut defs = Vec::new();

serde_wasm_bindgen::to_value(&output).map_err(|err| JsValue::from_str(&format!("{err:?}")))
if read.references_write_schema() {
defs.push(models::schemas::AddDef {
id: models::Schema::REF_WRITE_SCHEMA_URL,
schema: &write,
overwrite: true,
});
}
if read.references_relaxed_write_schema() {
relaxed = inferred
.is_some()
.then(|| write.to_relaxed_schema())
.transpose()
.map_err(|err| JsValue::from_str(&format!("{err:?}")))?;

defs.push(models::schemas::AddDef {
id: models::Schema::REF_RELAXED_WRITE_SCHEMA_URL,
schema: relaxed.as_ref().unwrap_or(&write),
overwrite: true,
});
}
if read.references_inferred_schema() {
defs.push(models::schemas::AddDef {
id: models::Schema::REF_INFERRED_SCHEMA_URL,
schema: inferred
.as_ref()
.unwrap_or(models::Schema::inferred_schema_placeholder()),
overwrite: false,
})
}

let outcome = read
.add_defs(&defs)
.map_err(|err| JsValue::from_str(&format!("{err:?}")))?;

serde_wasm_bindgen::to_value(&outcome).map_err(|err| JsValue::from_str(&format!("{err:?}")))
}

#[wasm_bindgen]
Expand Down
Loading

0 comments on commit 99cb868

Please sign in to comment.