Skip to content

Commit

Permalink
flow.proto: Add Projection.write_inference (but don't populate it yet)
Browse files Browse the repository at this point in the history
In the future, the write_inference field will be set if:
 - The collection has separate write & read schemas, AND
 - The write schema has keywords that constrain the projected location.

We intend to use `write_inference` in connectors that wish to use
write-schema inference to identify mapped column types for locations
which the (often inferred) read-schema says are always NULL or cannot
exist.
  • Loading branch information
jgraettinger committed Feb 6, 2025
1 parent de6589b commit f37f0a3
Show file tree
Hide file tree
Showing 15 changed files with 604 additions and 229 deletions.
6 changes: 6 additions & 0 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ pub struct Projection {
/// Inference of this projection.
#[prost(message, optional, tag = "6")]
pub inference: ::core::option::Option<Inference>,
/// Inference of this projection from the collection's write schema.
/// This is set only when:
/// 1) The collection has separate read and write schemas, and
/// 2) The write schema defines constraints over this projection.
#[prost(message, optional, tag = "7")]
pub write_inference: ::core::option::Option<Inference>,
}
/// Inference details type information which is statically known
/// about a given document location.
Expand Down
18 changes: 18 additions & 0 deletions crates/proto-flow/src/flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4684,6 +4684,9 @@ impl serde::Serialize for Projection {
if self.inference.is_some() {
len += 1;
}
if self.write_inference.is_some() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.Projection", len)?;
if !self.ptr.is_empty() {
struct_ser.serialize_field("ptr", &self.ptr)?;
Expand All @@ -4703,6 +4706,9 @@ impl serde::Serialize for Projection {
if let Some(v) = self.inference.as_ref() {
struct_ser.serialize_field("inference", v)?;
}
if let Some(v) = self.write_inference.as_ref() {
struct_ser.serialize_field("writeInference", v)?;
}
struct_ser.end()
}
}
Expand All @@ -4721,6 +4727,8 @@ impl<'de> serde::Deserialize<'de> for Projection {
"is_primary_key",
"isPrimaryKey",
"inference",
"write_inference",
"writeInference",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -4731,6 +4739,7 @@ impl<'de> serde::Deserialize<'de> for Projection {
IsPartitionKey,
IsPrimaryKey,
Inference,
WriteInference,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -4758,6 +4767,7 @@ impl<'de> serde::Deserialize<'de> for Projection {
"isPartitionKey" | "is_partition_key" => Ok(GeneratedField::IsPartitionKey),
"isPrimaryKey" | "is_primary_key" => Ok(GeneratedField::IsPrimaryKey),
"inference" => Ok(GeneratedField::Inference),
"writeInference" | "write_inference" => Ok(GeneratedField::WriteInference),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -4783,6 +4793,7 @@ impl<'de> serde::Deserialize<'de> for Projection {
let mut is_partition_key__ = None;
let mut is_primary_key__ = None;
let mut inference__ = None;
let mut write_inference__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Ptr => {
Expand Down Expand Up @@ -4821,6 +4832,12 @@ impl<'de> serde::Deserialize<'de> for Projection {
}
inference__ = map_.next_value()?;
}
GeneratedField::WriteInference => {
if write_inference__.is_some() {
return Err(serde::de::Error::duplicate_field("writeInference"));
}
write_inference__ = map_.next_value()?;
}
}
}
Ok(Projection {
Expand All @@ -4830,6 +4847,7 @@ impl<'de> serde::Deserialize<'de> for Projection {
is_partition_key: is_partition_key__.unwrap_or_default(),
is_primary_key: is_primary_key__.unwrap_or_default(),
inference: inference__,
write_inference: write_inference__,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/proto-flow/tests/regression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ fn ex_projections() -> Vec<flow::Projection> {
item_types: vec!["null".to_string(), "integer".to_string()],
}),
}),
write_inference: None,
}]
}

Expand Down
3 changes: 3 additions & 0 deletions crates/validation/src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ fn walk_collection_projections(
is_primary_key: key.iter().any(|k| k == ptr),
is_partition_key: partition,
inference: Some(assemble::inference(r_shape, r_exists)),
write_inference: None,
});
models.insert(field.clone(), projection.clone());
}
Expand Down Expand Up @@ -421,6 +422,7 @@ fn walk_collection_projections(
is_primary_key: true,
is_partition_key: false,
inference: Some(assemble::inference(r_shape, r_exists)),
write_inference: None,
});
}

Expand Down Expand Up @@ -448,6 +450,7 @@ fn walk_collection_projections(
is_primary_key: false,
is_partition_key: false,
inference: Some(assemble::inference(r_shape, r_exists)),
write_inference: None,
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "",
Expand All @@ -72,6 +73,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/_meta/uuid",
Expand Down Expand Up @@ -101,6 +103,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/id",
Expand Down Expand Up @@ -130,6 +133,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
],
ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "",
Expand All @@ -101,6 +102,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/_meta/uuid",
Expand Down Expand Up @@ -130,6 +132,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/id",
Expand Down Expand Up @@ -159,6 +162,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/val",
Expand Down Expand Up @@ -188,6 +192,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
],
ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}",
Expand Down Expand Up @@ -425,6 +430,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "",
Expand All @@ -447,6 +453,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/_meta/uuid",
Expand Down Expand Up @@ -476,6 +483,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/id",
Expand Down Expand Up @@ -505,6 +513,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/val",
Expand Down Expand Up @@ -534,6 +543,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
],
ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}",
Expand Down Expand Up @@ -707,6 +717,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "",
Expand All @@ -729,6 +740,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/_meta/uuid",
Expand Down Expand Up @@ -758,6 +770,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/id",
Expand Down Expand Up @@ -787,6 +800,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
Projection {
ptr: "/val",
Expand Down Expand Up @@ -816,6 +830,7 @@ Outcome {
array: None,
},
),
write_inference: None,
},
],
ack_template_json: "{\"_meta\":{\"ack\":true,\"uuid\":\"DocUUIDPlaceholder-329Bb50aa48EAa9ef\"}}",
Expand Down
Loading

0 comments on commit f37f0a3

Please sign in to comment.