diff --git a/crates/agent/src/controllers/collection.rs b/crates/agent/src/controllers/collection.rs index 28b3dc5946..820e9601e1 100644 --- a/crates/agent/src/controllers/collection.rs +++ b/crates/agent/src/controllers/collection.rs @@ -111,39 +111,6 @@ pub async fn update_inferred_schema( .await .context("fetching inferred schema")?; - // If the read schema includes a bundled write schema, remove it. - // TODO: remove this code once all production collections have been updated. - let must_remove_write_schema = read_schema_bundles_write_schema(collection_def); - if must_remove_write_schema { - let mut pending_pub = PendingPublication::new(); - let draft = pending_pub.start_spec_update(state, "removing bundled write schema"); - let draft_row = draft.collections.get_or_insert_with(&collection_name, || { - tables::DraftCollection { - collection: collection_name.clone(), - scope: tables::synthetic_scope(models::CatalogType::Collection, &collection_name), - expect_pub_id: Some(state.last_pub_id), - model: Some(collection_def.clone()), - is_touch: false, // We intend to update the model - } - }); - let (removed, new_schema) = collection_def - .read_schema - .as_ref() - .unwrap() - .remove_bundled_write_schema(); - if removed { - draft_row.model.as_mut().unwrap().read_schema = Some(new_schema); - tracing::info!("removing bundled write schema"); - } else { - tracing::warn!("bundled write schema was not removed"); - } - pending_pub - .finish(state, publication_status, control_plane) - .await? 
- .error_for_status()?; - return Ok(true); - } - if let Some(inferred_schema) = maybe_inferred_schema { let mut pending_pub = PendingPublication::new(); let tables::InferredSchema { @@ -193,20 +160,6 @@ pub async fn update_inferred_schema( Ok(false) } -fn read_schema_bundles_write_schema(model: &models::CollectionDef) -> bool { - let Some(read_schema) = &model.read_schema else { - return false; - }; - // This is a little hacky, but works to identify schemas that bundle the write schema - // without needing to actually parse the entire schema. The three expected occurrences - // of the url are: the key in `$defs`, the `$id` of the bundled schema, and the `$ref`. - read_schema - .get() - .matches(models::Schema::REF_WRITE_SCHEMA_URL) - .count() - >= 3 -} - pub fn uses_inferred_schema(collection: &models::CollectionDef) -> bool { collection .read_schema @@ -214,103 +167,3 @@ pub fn uses_inferred_schema(collection: &models::CollectionDef) -> bool { .map(models::Schema::references_inferred_schema) .unwrap_or(false) } - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_read_schema_bundles_write_schema() { - let collection_json = r##"{ - "writeSchema": { - "properties": { - "id": { - "type": "string" - } - }, - "type": "object", - "x-infer-schema": true - }, - "readSchema": { - "$defs": { - "flow://inferred-schema": { - "$id": "flow://inferred-schema", - "$schema": "https://json-schema.org/draft/2019-09/schema", - "additionalProperties": false, - "properties": { - "id": { "type": "string" }, - "a": { "type": "string" }, - "hello": { "type": "string" } - }, - "required": [ - "aa", - "hello", - "id" - ], - "type": "object" - }, - "flow://write-schema": { - "$id": "flow://write-schema", - "properties": { - "id": { "type": "string" } - }, - "required": [ - "id" - ], - "type": "object", - "x-infer-schema": true - } - }, - "allOf": [ - { - "$ref": "flow://write-schema" - }, - { - "$ref": "flow://inferred-schema" - } - ] - }, - "key": [ - "/id" - ] - }"##; - let 
mut collection: models::CollectionDef = serde_json::from_str(collection_json).unwrap(); - assert!(read_schema_bundles_write_schema(&collection)); - - collection.read_schema = Some(models::Schema::new( - models::RawValue::from_str( - r##"{ - "$defs": { - "flow://inferred-schema": { - "$id": "flow://inferred-schema", - "$schema": "https://json-schema.org/draft/2019-09/schema", - "additionalProperties": false, - "properties": { - "id": { "type": "string" }, - "a": { "type": "string" }, - "hello": { "type": "string" } - }, - "required": [ - "aa", - "hello", - "id" - ], - "type": "object" - } - }, - "allOf": [ - { - "$ref": "flow://write-schema" - }, - { - "$ref": "flow://inferred-schema" - } - ] - }"##, - ) - .unwrap(), - )); - - assert!(!read_schema_bundles_write_schema(&collection)); - } -} diff --git a/crates/agent/src/integration_tests/schema_evolution.rs b/crates/agent/src/integration_tests/schema_evolution.rs index 56993f929a..1301f57710 100644 --- a/crates/agent/src/integration_tests/schema_evolution.rs +++ b/crates/agent/src/integration_tests/schema_evolution.rs @@ -1,11 +1,7 @@ use std::collections::{BTreeMap, BTreeSet}; use super::harness::{draft_catalog, mock_inferred_schema, FailBuild, TestHarness}; -use crate::{ - controllers::ControllerState, - publications::{JobStatus, UncommittedBuild}, - ControlPlane, -}; +use crate::{controllers::ControllerState, publications::UncommittedBuild, ControlPlane}; use models::CatalogType; use proto_flow::materialize::response::validated::constraint::Type as ConstraintType; use tables::BuiltRow; @@ -176,26 +172,6 @@ async fn test_schema_evolution() { assert!(totes_spec .read_schema_json .contains("inferredSchemaIsNotAvailable")); - // Assert that the bundled write schema has been removed. We expect one reference to - // the write schema url, down from 3 originally. - // TODO: we can remove these assertions (and the bundled write schema in the setup) once - // all the collections have been updated. 
- let totes_model = totes_state - .live_spec - .as_ref() - .unwrap() - .as_collection() - .unwrap(); - assert_eq!( - 1, - totes_model - .read_schema - .as_ref() - .unwrap() - .get() - .matches(models::Schema::REF_WRITE_SCHEMA_URL) - .count() - ); // Assert that the schema in the built spec _does_ contain the bundled write schema assert_eq!( 3, diff --git a/crates/flow-web/src/lib.rs b/crates/flow-web/src/lib.rs index 46d8c9758e..14bc90a26a 100644 --- a/crates/flow-web/src/lib.rs +++ b/crates/flow-web/src/lib.rs @@ -142,6 +142,7 @@ pub fn extend_read_bundle(input: JsValue) -> Result { write: models::Schema, inferred: Option, } + let relaxed: Option; let Input { read, @@ -150,9 +151,43 @@ pub fn extend_read_bundle(input: JsValue) -> Result { } = serde_json::from_value(input) .map_err(|err| JsValue::from_str(&format!("invalid input: {:?}", err)))?; - let output = models::Schema::extend_read_bundle(&read, Some(&write), inferred.as_ref()); + let mut defs = Vec::new(); - serde_wasm_bindgen::to_value(&output).map_err(|err| JsValue::from_str(&format!("{err:?}"))) + if read.references_write_schema() { + defs.push(models::schemas::AddDef { + id: models::Schema::REF_WRITE_SCHEMA_URL, + schema: &write, + overwrite: true, + }); + } + if read.references_relaxed_write_schema() { + relaxed = inferred + .is_some() + .then(|| write.to_relaxed_schema()) + .transpose() + .map_err(|err| JsValue::from_str(&format!("{err:?}")))?; + + defs.push(models::schemas::AddDef { + id: models::Schema::REF_RELAXED_WRITE_SCHEMA_URL, + schema: relaxed.as_ref().unwrap_or(&write), + overwrite: true, + }); + } + if read.references_inferred_schema() { + defs.push(models::schemas::AddDef { + id: models::Schema::REF_INFERRED_SCHEMA_URL, + schema: inferred + .as_ref() + .unwrap_or(models::Schema::inferred_schema_placeholder()), + overwrite: false, + }) + } + + let outcome = read + .add_defs(&defs) + .map_err(|err| JsValue::from_str(&format!("{err:?}")))?; + + 
serde_wasm_bindgen::to_value(&outcome).map_err(|err| JsValue::from_str(&format!("{err:?}"))) } #[wasm_bindgen] diff --git a/crates/models/src/fixture.schema.json b/crates/models/src/fixture.schema.json new file mode 100644 index 0000000000..e7ffc735c4 --- /dev/null +++ b/crates/models/src/fixture.schema.json @@ -0,0 +1,185 @@ +{ + "$defs": { + "PublicFoobar": { + "type": "object", + "required": [ + "id" + ], + "$anchor": "PublicFoobar", + "properties": { + "an_int": { + "description": "(source type: int4)", + "type": [ + "integer", + "null" + ] + }, + "a_string": { + "description": "(source type: varchar)", + "type": [ + "string", + "null" + ], + "minLength": 16 + }, + "an_array_of_timestamps": { + "type": "array", + "items": { + "type": "string", + "format": "date-time" + }, + "minItems": 128, + "maxItems": 128 + }, + "id": { + "type": "integer", + "description": "(source type: non-nullable int4)" + }, + "complex_subproperty": { + "if": { + "type": "integer" + }, + "then": { + "type": "integer" + }, + "else": { + "type": "string" + }, + "anyOf": [ + { + "type": "bool" + }, + { + "required": [ + "foo" + ] + } + ], + "oneOf": [ + { + "type": "string" + }, + { + "required": [ + "bar" + ] + } + ], + "allOf": [ + { + "type": "integer", + "title": "this is relaxed" + }, + { + "required": [ + "quib" + ], + "title": "so is this" + } + ], + "description": "We should not relax these complex and non-standard sub-schemas, except allOf" + } + } + } + }, + "allOf": [ + { + "if": { + "properties": { + "_meta": { + "properties": { + "op": { + "const": "d" + } + } + } + } + }, + "then": { + "reduce": { + "delete": true, + "strategy": "merge" + } + }, + "else": { + "reduce": { + "strategy": "merge" + } + }, + "required": [ + "_meta" + ], + "properties": { + "_meta": { + "type": "object", + "required": [ + "op", + "source" + ], + "properties": { + "before": { + "$ref": "#PublicFoobar", + "description": "Record state immediately before this change was applied.", + "reduce": { + 
"strategy": "firstWriteWins" + } + }, + "op": { + "enum": [ + "c", + "d", + "u" + ], + "description": "Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete." + }, + "source": { + "properties": { + "ts_ms": { + "type": "integer", + "description": "Unix timestamp (in millis) at which this event was recorded by the database." + }, + "schema": { + "type": "string", + "description": "Database schema (namespace) of the event." + }, + "snapshot": { + "type": "boolean", + "description": "Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log." + }, + "table": { + "type": "string", + "description": "Database table of the event." + }, + "loc": { + "items": { + "type": "integer" + }, + "type": "array", + "maxItems": 3, + "minItems": 3, + "description": "Location of this WAL event as [last Commit.EndLSN; event LSN; current Begin.FinalLSN]. See https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html" + }, + "txid": { + "type": "integer", + "description": "The 32-bit transaction ID assigned by Postgres to the commit which produced this change." 
+ } + }, + "type": "object", + "required": [ + "schema", + "table", + "loc" + ] + } + }, + "reduce": { + "strategy": "merge" + } + } + } + }, + { + "$ref": "#PublicFoobar" + } + ] +} \ No newline at end of file diff --git a/crates/models/src/lib.rs b/crates/models/src/lib.rs index 8dd907ecfc..1ef9921c55 100644 --- a/crates/models/src/lib.rs +++ b/crates/models/src/lib.rs @@ -19,7 +19,7 @@ mod materializations; pub mod publications; mod raw_value; mod references; -mod schemas; +pub mod schemas; mod shards; mod source; mod source_capture; diff --git a/crates/models/src/schemas.rs b/crates/models/src/schemas.rs index ca87dea4c3..2d33b58b82 100644 --- a/crates/models/src/schemas.rs +++ b/crates/models/src/schemas.rs @@ -33,10 +33,6 @@ impl std::ops::DerefMut for Schema { } } -type Skim = BTreeMap; -const KEYWORD_DEF: &str = "$defs"; -const KEYWORD_ID: &str = "$id"; - impl Schema { pub fn new(v: RawValue) -> Self { Self(v) } @@ -53,6 +49,8 @@ impl Schema { pub const REF_INFERRED_SCHEMA_URL: &'static str = "flow://inferred-schema"; // URL for referencing the write schema of a collection, which may be used within a read schema. pub const REF_WRITE_SCHEMA_URL: &'static str = "flow://write-schema"; + // URL for referencing the relaxed write schema of a collection, which may be used within a read schema. + pub const REF_RELAXED_WRITE_SCHEMA_URL: &'static str = "flow://relaxed-write-schema"; /// Returns true if this Schema references the canonical inferred schema URL. pub fn references_inferred_schema(&self) -> bool { @@ -62,6 +60,10 @@ pub fn references_write_schema(&self) -> bool { REF_WRITE_SCHEMA_RE.is_match(self.get()) } + /// Returns true if this Schema references the canonical relaxed write schema URL.
+ pub fn references_relaxed_write_schema(&self) -> bool { + REF_RELAXED_WRITE_SCHEMA_RE.is_match(self.get()) + } pub fn example_absolute() -> Self { from_value(json!("http://example/schema#/$defs/subPath")).unwrap() } @@ -80,21 +82,37 @@ .unwrap() } + /// Default collection `readSchema`, + /// used when schema inference is desired. pub fn default_inferred_read_schema() -> Self { let read_schema = serde_json::json!({ "allOf": [ - {"$ref": "flow://write-schema"}, - {"$ref": "flow://inferred-schema"} + {"$ref": Self::REF_WRITE_SCHEMA_URL}, // TODO(johnny): Switch to Self::REF_RELAXED_WRITE_SCHEMA_URL + {"$ref": Self::REF_INFERRED_SCHEMA_URL}, ], }); - let read_bundle = Self(crate::RawValue::from_value(&read_schema)); - Self::extend_read_bundle(&read_bundle, None, None) + Self(crate::RawValue::from_value(&read_schema)) + } + + /// Placeholder definition for `flow://inferred-schema`, + /// used when an actual inferred schema is not yet available. + pub fn inferred_schema_placeholder() -> &'static Self { + &INFERRED_SCHEMA_PLACEHOLDER + } + + /// Transform this Schema into a relaxed form which removes all `type`, + /// `required`, and `format` keywords of most (but not all) recursive + /// sub-schemas, while preserving other keywords. + /// + /// The primary purpose of relaxed schemas is to transform a collection + /// write-schema into a relaxation which is likely to function well when + /// intersected with the collection's inferred schema. + pub fn to_relaxed_schema(&self) -> serde_json::Result { + let relaxed = serde_json::from_str::(self.get())?; + Ok(Self(serde_json::value::to_raw_value(&relaxed)?.into())) } - /// Extend a bundled Flow read schema, which may include references to the - /// canonical collection write schema URI and inferred schema URI, - /// with inline definitions that fully resolve these references. - /// If an inferred schema is not available then `{}` is used. + /// TODO(johnny): This is deprecated and will be removed.
pub fn extend_read_bundle( read_bundle: &Self, write_bundle: Option<&Self>, @@ -127,8 +145,11 @@ impl Schema { } Schema::add_defs(read_bundle, &defs) + .ok() + .unwrap_or_else(|| read_bundle.clone()) } + /// TODO(johnny): This is deprecated and will be removed. pub fn build_read_schema_bundle(read_schema: &Schema, write_schema: &Schema) -> Schema { let mut defs = Vec::new(); if read_schema.references_write_schema() { @@ -148,83 +169,113 @@ impl Schema { }); } Schema::add_defs(read_schema, &defs) + .ok() + .unwrap_or_else(|| read_schema.clone()) } - fn add_id(id: &str, schema: &Schema) -> RawValue { - let mut skim: Skim = serde_json::from_str(schema.get()).unwrap(); - - _ = skim.insert( - KEYWORD_ID.to_string(), - RawValue::from_value(&serde_json::Value::String(id.to_string())), - ); - serde_json::value::to_raw_value(&skim).unwrap().into() - } - - fn add_defs(target: &Schema, defs: &[AddDef]) -> Schema { + /// Extend this Schema with added $defs definitions. + /// The $id keyword of definition is set to its id, and its id is also + /// used to key the property of $defs under which the sub-schema lives. + pub fn add_defs(&self, defs: &[AddDef]) -> serde_json::Result { use serde_json::value::to_raw_value; - let mut read_schema: Skim = serde_json::from_str(target.get()).unwrap(); - let mut read_defs: Skim = read_schema + type Skim = BTreeMap; + const KEYWORD_DEF: &str = "$defs"; + const KEYWORD_ID: &str = "$id"; + + let mut schema = serde_json::from_str::(self.get())?; + let mut schema_defs = schema .remove(KEYWORD_DEF) - .map(|d| serde_json::from_str(d.get()).unwrap()) + .map(|d| serde_json::from_str::(d.get())) + .transpose()? 
.unwrap_or_default(); for AddDef { id, - schema, + schema: sub_schema, overwrite, } in defs { - if !overwrite && read_defs.contains_key(*id) { + if !overwrite && schema_defs.contains_key(*id) { continue; } - let with_id = Schema::add_id(id, schema); - read_defs.insert(id.to_string(), with_id); + let mut sub_schema = serde_json::from_str::(sub_schema.get())?; + _ = sub_schema.insert( + KEYWORD_ID.to_string(), + RawValue::from_value(&serde_json::Value::String(id.to_string())), + ); + schema_defs.insert(id.to_string(), to_raw_value(&sub_schema)?.into()); } // Skip adding defs if they are empty (which means `defs` was empty and there were no // pre-existing `$defs` in the schema). - if !read_defs.is_empty() { - // Re-serialize the updated definitions of the read schema. - _ = read_schema.insert( + if !schema_defs.is_empty() { + _ = schema.insert( KEYWORD_DEF.to_string(), - serde_json::value::to_raw_value(&read_defs).unwrap().into(), + serde_json::value::to_raw_value(&schema_defs)?.into(), ); } - Self(to_raw_value(&read_schema).unwrap().into()) + + Ok(Self(to_raw_value(&schema)?.into())) } +} - /// Removes the bundled write schema from the `$defs` of `self`, returning - /// a new schema with the value removed, and a boolean indicating whether the write - /// schema def was actually present. We used to bundle the write schema as part of the - /// read schema, just like the inferred schema. We're no longer doing that because it's - /// confusing to users, so this function removes the bundled write schema. This function - /// should only be needed for long enough to update all the inferred schemas, and can then - /// be safely removed. - pub fn remove_bundled_write_schema(&self) -> (bool, Self) { - use serde_json::value::to_raw_value; +// AddDef instances parameterize sub-schemas to be added via Schema::add_defs(). +pub struct AddDef<'a> { + // Canonical $id of this schema, which also keys the schema in $defs. + pub id: &'a str, + // Sub-schema to be inlined into $defs. 
+ pub schema: &'a Schema, + // Should this definition overwrite one that's already present? + pub overwrite: bool, +} - let mut read_schema: Skim = serde_json::from_str(self.0.get()).unwrap(); - let mut read_defs: Skim = read_schema - .get(KEYWORD_DEF) - .map(|d| serde_json::from_str(d.get()).unwrap()) - .unwrap_or_default(); - let had_write_schema = read_defs.remove(Schema::REF_WRITE_SCHEMA_URL).is_some(); - read_schema.insert( - KEYWORD_DEF.to_string(), - to_raw_value(&read_defs).unwrap().into(), - ); - ( - had_write_schema, - Self(to_raw_value(&read_schema).unwrap().into()), - ) - } +/// RelaxedSchema is an opinionated relaxation of a JSON-Schema, which removes +/// the `type`, `required`, and `format` keywords from most (but not all) +/// recursive sub-schemas. Its purpose is to transform collection write schemas +/// into relaxed forms which are likely to function well when intersected with +/// an inferred schema. +#[derive(serde::Serialize, serde::Deserialize)] +#[serde(untagged)] +enum RelaxedSchema { + Bool(bool), + Obj(RelaxedSchemaObj), + Vec(Vec), } -struct AddDef<'a> { - id: &'a str, - schema: &'a Schema, - overwrite: bool, +#[derive(serde::Serialize, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +struct RelaxedSchemaObj { + #[serde( + rename = "$defs", + alias = "definitions", + default, + skip_serializing_if = "BTreeMap::is_empty" + )] + defs: BTreeMap, + #[serde(default, skip_serializing_if = "Option::is_none")] + all_of: Option>, + + #[serde(default, skip_serializing_if = "BTreeMap::is_empty")] + properties: BTreeMap, + #[serde(default, skip_serializing_if = "Option::is_none")] + items: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + additional_items: Option>, + #[serde(default, skip_serializing_if = "Option::is_none")] + additional_properties: Option>, + + // Keywords which are removed from a relaxed schema.
+ #[serde(rename = "type", skip_serializing)] + _type: Option, + #[serde(rename = "required", default, skip_serializing)] + _required: Vec, + #[serde(rename = "format", default, skip_serializing)] + _format: String, + + // Other keywords are passed-through. + #[serde(flatten)] + pass_through: BTreeMap, } // These patterns let us cheaply detect if a collection schema references the @@ -242,6 +293,9 @@ lazy_static::lazy_static! { static ref REF_WRITE_SCHEMA_RE: regex::Regex = regex::Regex::new( &[r#""\$ref"\p{Z}*:\p{Z}*""#, ®ex::escape(Schema::REF_WRITE_SCHEMA_URL), "\""].concat() ).unwrap(); + static ref REF_RELAXED_WRITE_SCHEMA_RE: regex::Regex = regex::Regex::new( + &[r#""\$ref"\p{Z}*:\p{Z}*""#, ®ex::escape(Schema::REF_RELAXED_WRITE_SCHEMA_URL), "\""].concat() + ).unwrap(); /// Placeholder used to resolve the `flow://inferred-schema` reference when the actual schema /// is not yet known. @@ -263,7 +317,7 @@ lazy_static::lazy_static! { #[cfg(test)] mod test { - use super::{RawValue, Schema}; + use super::{AddDef, RawValue, Schema}; use serde_json::json; macro_rules! 
schema { @@ -301,8 +355,8 @@ mod test { } #[test] - fn test_build_read_schema_bundle() { - let write_schema = schema!({ + fn test_references_and_add_defs() { + let sub_schema = schema!({ "type": "object", "properties": { "a": { "type": "integer" }, @@ -310,72 +364,48 @@ mod test { } }); - // Assert that inferred schema placeholder gets added if needed - let read_schema = schema!({ + let schema = schema!({ "$defs": { - "existing": { "properties": { "f": { "type": "string" }}} + "replaced": { "properties": { "f": { "type": "string" }}}, + "not_overwritten": { "properties": { "f": { "type": "string" }}}, + "extra": { "properties": { "f": { "type": "string" }}}, }, "allOf": [ {"$ref": "flow://inferred-schema"}, {"$ref": "flow://write-schema"}, ] }); - let result = Schema::build_read_schema_bundle(&read_schema, &write_schema); - insta::assert_json_snapshot!(result.to_value(), @r###" - { - "$defs": { - "existing": { - "properties": { - "f": { - "type": "string" - } - } + + assert!(!schema.references_relaxed_write_schema()); + assert!(schema.references_inferred_schema()); + assert!(schema.references_write_schema()); + + let outcome = schema.add_defs(&[ + AddDef { + id: Schema::REF_WRITE_SCHEMA_URL, + schema: &sub_schema, + overwrite: true, }, - "flow://inferred-schema": { - "$id": "flow://inferred-schema", - "properties": { - "_meta": { - "properties": { - "inferredSchemaIsNotAvailable": { - "const": true, - "description": "An inferred schema is not yet available because no documents have been written to this collection.\nThis place-holder causes document validations to fail at read time, so that the task can be updated once an inferred schema is ready." 
- } - }, - "required": [ - "inferredSchemaIsNotAvailable" - ] - } - }, - "required": [ - "_meta" - ] + AddDef { + id: Schema::REF_INFERRED_SCHEMA_URL, + schema: &sub_schema, + overwrite: true, }, - "flow://write-schema": { - "$id": "flow://write-schema", - "properties": { - "a": { - "type": "integer" - }, - "b": { - "type": "string" - } - }, - "type": "object" - } - }, - "allOf": [ - { - "$ref": "flow://inferred-schema" + AddDef { + id: "replaced", + schema: &sub_schema, + overwrite: true, }, - { - "$ref": "flow://write-schema" - } - ] - } - "###); + AddDef { + id: "not_overwritten", + schema: &sub_schema, + overwrite: false, + }, + ]); + + insta::assert_json_snapshot!(outcome.unwrap().to_value()); - // Assert that existing defs are unchanged when read schema does not ref anything - let read_schema = schema!({ + let no_ref_schema = schema!({ "$defs": { "existing": { "properties": { "f": { "type": "string" }}} }, @@ -385,304 +415,14 @@ mod test { "d": { "type": "string" } } }); - let result = Schema::build_read_schema_bundle(&read_schema, &write_schema); - insta::assert_json_snapshot!(result.to_value(), @r###" - { - "$defs": { - "existing": { - "properties": { - "f": { - "type": "string" - } - } - } - }, - "properties": { - "c": { - "type": "integer" - }, - "d": { - "type": "string" - } - }, - "type": "object" - } - "###); - - // Assert that no defs are added when read schema does not ref anything - let read_schema = schema!({ - "type": "object", - "properties": { - "c": { "type": "integer" }, - "d": { "type": "string" } - } - }); - let result = Schema::build_read_schema_bundle(&read_schema, &write_schema); - insta::assert_json_snapshot!(result.to_value(), @r###" - { - "properties": { - "c": { - "type": "integer" - }, - "d": { - "type": "string" - } - }, - "type": "object" - } - "###); - - // Assert that existing inferred schema def is not overwritten, but that - // the write schema def is. 
Note that in practice any - // `flow://write-schema` defs are removed by the publisher prior to - // validation, so it should generally not exist when this function is - // called. But there's still some collection specs out there that have - // inlined write schema defs. Not overwriting the inferred schema def is - // necessary in order to avoid replacing an existing - // `flow://inferred-schema` def with the placeholder. - let read_schema = schema!({ - "$defs": { - "flow://inferred-schema": { - "type": "object", - "properties": { - "c": { "type": "integer" } - } - }, - "flow://write-schema": { - "properties": { - "c": { "const": "should be overwritten" } - } - }, - }, - "allOf": [ - {"$ref": "flow://inferred-schema"}, - {"$ref": "flow://write-schema"}, - ] - }); - let result = Schema::build_read_schema_bundle(&read_schema, &write_schema); - insta::assert_json_snapshot!(result.to_value(), @r###" - { - "$defs": { - "flow://inferred-schema": { - "properties": { - "c": { - "type": "integer" - } - }, - "type": "object" - }, - "flow://write-schema": { - "$id": "flow://write-schema", - "properties": { - "a": { - "type": "integer" - }, - "b": { - "type": "string" - } - }, - "type": "object" - } - }, - "allOf": [ - { - "$ref": "flow://inferred-schema" - }, - { - "$ref": "flow://write-schema" - } - ] - } - "###); + assert!(!no_ref_schema.references_inferred_schema()); + assert!(!no_ref_schema.references_relaxed_write_schema()); + assert!(!no_ref_schema.references_write_schema()); } #[test] - fn test_extend_read_schema() { - let read_schema = Schema::new(RawValue::from_value(&json!({ - "$defs": { - "existing://def": {"type": "array"}, - }, - "maxProperties": 10, - "allOf": [ - {"$ref": "flow://inferred-schema"}, - {"$ref": "flow://write-schema"}, - ] - }))); - let write_schema = Schema::new(RawValue::from_value(&json!({ - "$id": "old://value", - "required": ["a_key"], - }))); - let inferred_schema = Schema::new(RawValue::from_value(&json!({ - "$id": "old://value", - 
"minProperties": 5, - }))); - - insta::assert_json_snapshot!(Schema::extend_read_bundle(&read_schema, Some(&write_schema), Some(&inferred_schema)).to_value(), @r###" - { - "$defs": { - "existing://def": { - "type": "array" - }, - "flow://inferred-schema": { - "$id": "flow://inferred-schema", - "minProperties": 5 - }, - "flow://write-schema": { - "$id": "flow://write-schema", - "required": [ - "a_key" - ] - } - }, - "allOf": [ - { - "$ref": "flow://inferred-schema" - }, - { - "$ref": "flow://write-schema" - } - ], - "maxProperties": 10 - } - "###); - - // Case: no inferred schema is available. - insta::assert_json_snapshot!(Schema::extend_read_bundle(&read_schema, Some(&write_schema), None).to_value(), @r###" - { - "$defs": { - "existing://def": { - "type": "array" - }, - "flow://inferred-schema": { - "$id": "flow://inferred-schema", - "properties": { - "_meta": { - "properties": { - "inferredSchemaIsNotAvailable": { - "const": true, - "description": "An inferred schema is not yet available because no documents have been written to this collection.\nThis place-holder causes document validations to fail at read time, so that the task can be updated once an inferred schema is ready." - } - }, - "required": [ - "inferredSchemaIsNotAvailable" - ] - } - }, - "required": [ - "_meta" - ] - }, - "flow://write-schema": { - "$id": "flow://write-schema", - "required": [ - "a_key" - ] - } - }, - "allOf": [ - { - "$ref": "flow://inferred-schema" - }, - { - "$ref": "flow://write-schema" - } - ], - "maxProperties": 10 - } - "###); - - // Case: pass `write_schema` which has no references. 
- insta::assert_json_snapshot!(Schema::extend_read_bundle(&write_schema, Some(&write_schema), None).to_value(), @r###" - { - "$id": "old://value", - "required": [ - "a_key" - ] - } - "###); - - // Case: don't include `write_schema` - insta::assert_json_snapshot!(Schema::extend_read_bundle(&read_schema, None, Some(&inferred_schema)).to_value(), @r###" - { - "$defs": { - "existing://def": { - "type": "array" - }, - "flow://inferred-schema": { - "$id": "flow://inferred-schema", - "minProperties": 5 - } - }, - "allOf": [ - { - "$ref": "flow://inferred-schema" - }, - { - "$ref": "flow://write-schema" - } - ], - "maxProperties": 10 - } - "###); - } - - #[test] - fn test_removing_bundled_write_schema() { - let read_schema = Schema::new(RawValue::from_value(&json!({ - "$defs": { - "existing://def": {"type": "array"}, - }, - "maxProperties": 10, - "allOf": [ - {"$ref": "flow://inferred-schema"}, - {"$ref": "flow://write-schema"}, - ] - }))); - let write_schema = Schema::new(RawValue::from_value(&json!({ - "$id": "old://value", - "required": ["a_key"], - }))); - let inferred_schema = Schema::new(RawValue::from_value(&json!({ - "$id": "old://value", - "minProperties": 5, - }))); - - let bundle = - Schema::extend_read_bundle(&read_schema, Some(&write_schema), Some(&inferred_schema)); - assert_eq!( - 3, - bundle.get().matches(Schema::REF_WRITE_SCHEMA_URL).count(), - "schema should contain 'flow://write-schema' 3 times, for $ref, $defs key, and $id" - ); - let (was_removed, new_bundle) = bundle.remove_bundled_write_schema(); - assert!(was_removed); - insta::assert_json_snapshot!(new_bundle.to_value(), @r###" - { - "$defs": { - "existing://def": { - "type": "array" - }, - "flow://inferred-schema": { - "$id": "flow://inferred-schema", - "minProperties": 5 - } - }, - "allOf": [ - { - "$ref": "flow://inferred-schema" - }, - { - "$ref": "flow://write-schema" - } - ], - "maxProperties": 10 - } - "###); - - let (was_removed, _) = new_bundle.remove_bundled_write_schema(); - assert!( - 
!was_removed, - "expected write schema to have already been removed" - ); + fn test_relaxation() { + let fixture = Schema::new(RawValue::from_str(include_str!("fixture.schema.json")).unwrap()); + insta::assert_json_snapshot!(fixture.to_relaxed_schema().unwrap().to_value()) } } diff --git a/crates/models/src/snapshots/models__schemas__test__references_and_add_defs.snap b/crates/models/src/snapshots/models__schemas__test__references_and_add_defs.snap new file mode 100644 index 0000000000..c3ac5b15fb --- /dev/null +++ b/crates/models/src/snapshots/models__schemas__test__references_and_add_defs.snap @@ -0,0 +1,66 @@ +--- +source: crates/models/src/schemas.rs +expression: outcome.to_value() +--- +{ + "$defs": { + "extra": { + "properties": { + "f": { + "type": "string" + } + } + }, + "flow://inferred-schema": { + "$id": "flow://inferred-schema", + "properties": { + "a": { + "type": "integer" + }, + "b": { + "type": "string" + } + }, + "type": "object" + }, + "flow://write-schema": { + "$id": "flow://write-schema", + "properties": { + "a": { + "type": "integer" + }, + "b": { + "type": "string" + } + }, + "type": "object" + }, + "not_overwritten": { + "properties": { + "f": { + "type": "string" + } + } + }, + "replaced": { + "$id": "replaced", + "properties": { + "a": { + "type": "integer" + }, + "b": { + "type": "string" + } + }, + "type": "object" + } + }, + "allOf": [ + { + "$ref": "flow://inferred-schema" + }, + { + "$ref": "flow://write-schema" + } + ] +} diff --git a/crates/models/src/snapshots/models__schemas__test__relaxation.snap b/crates/models/src/snapshots/models__schemas__test__relaxation.snap new file mode 100644 index 0000000000..af85b5cbf5 --- /dev/null +++ b/crates/models/src/snapshots/models__schemas__test__relaxation.snap @@ -0,0 +1,146 @@ +--- +source: crates/models/src/schemas.rs +expression: fixture.to_relaxed_schema().to_value() +--- +{ + "$defs": { + "PublicFoobar": { + "$anchor": "PublicFoobar", + "properties": { + "a_string": { + "description": 
"(source type: varchar)", + "minLength": 16 + }, + "an_array_of_timestamps": { + "items": {}, + "maxItems": 128, + "minItems": 128 + }, + "an_int": { + "description": "(source type: int4)" + }, + "complex_subproperty": { + "allOf": [ + { + "title": "this is relaxed" + }, + { + "title": "so is this" + } + ], + "anyOf": [ + { + "type": "bool" + }, + { + "required": [ + "foo" + ] + } + ], + "description": "We should not relax these complex and non-standard sub-schemas, except allOf", + "else": { + "type": "string" + }, + "if": { + "type": "integer" + }, + "oneOf": [ + { + "type": "string" + }, + { + "required": [ + "bar" + ] + } + ], + "then": { + "type": "integer" + } + }, + "id": { + "description": "(source type: non-nullable int4)" + } + } + } + }, + "allOf": [ + { + "else": { + "reduce": { + "strategy": "merge" + } + }, + "if": { + "properties": { + "_meta": { + "properties": { + "op": { + "const": "d" + } + } + } + } + }, + "properties": { + "_meta": { + "properties": { + "before": { + "$ref": "#PublicFoobar", + "description": "Record state immediately before this change was applied.", + "reduce": { + "strategy": "firstWriteWins" + } + }, + "op": { + "description": "Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete.", + "enum": [ + "c", + "d", + "u" + ] + }, + "source": { + "properties": { + "loc": { + "description": "Location of this WAL event as [last Commit.EndLSN; event LSN; current Begin.FinalLSN]. See https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html", + "items": {}, + "maxItems": 3, + "minItems": 3 + }, + "schema": { + "description": "Database schema (namespace) of the event." + }, + "snapshot": { + "description": "Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log." + }, + "table": { + "description": "Database table of the event." 
+ }, + "ts_ms": { + "description": "Unix timestamp (in millis) at which this event was recorded by the database." + }, + "txid": { + "description": "The 32-bit transaction ID assigned by Postgres to the commit which produced this change." + } + } + } + }, + "reduce": { + "strategy": "merge" + } + } + }, + "then": { + "reduce": { + "delete": true, + "strategy": "merge" + } + } + }, + { + "$ref": "#PublicFoobar" + } + ] +}