Skip to content

Commit

Permalink
protocols: bundle a number of protocol extensions
Browse files Browse the repository at this point in the history
These have variously been discussed over the last couple weeks:

* Add resource_path_pointers to materialization Response.Spec, to bring
  materializations toward parity with capture path pointers.

* Add state_json to materialization Request.Apply, and state to
  Response.Applied. This allows a materialization connector to complete
  a post-commit apply under the last MaterializationSpec before applying
  schema changes which could break those staged files.

* Add inactive_transforms to CollectionSpec.Derivation, and
  inactive_bindings to CaptureSpec and MaterializationSpec. We intend to
  retain previously-active Binding states in these fields, to allow
  connectors to ascertain whether a resource is brand new vs active
  in the past; to better initialize resources depending on historical
  activation context, and perhaps to clean up de-activated resources.

* Improve various comments.

None of these new fields are utilized yet, this is just protocol
change groundwork.
  • Loading branch information
jgraettinger committed Feb 13, 2025
1 parent 134d495 commit ed98526
Show file tree
Hide file tree
Showing 38 changed files with 1,237 additions and 655 deletions.
9 changes: 4 additions & 5 deletions crates/agent/src/connector_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,9 +281,10 @@ async fn spec_materialization(
resource_config_schema_json,
documentation_url,
oauth2,
resource_path_pointers,
} = spec;

let oauth = if let Some(oa) = oauth2 {
let oauth2 = if let Some(oa) = oauth2 {
Some(serde_json::value::to_raw_value(&oa).expect("serializing oauth2 config"))
} else {
None
Expand All @@ -294,10 +295,8 @@ async fn spec_materialization(
.context("parsing endpoint config schema")?,
resource_config_schema: RawValue::from_string(resource_config_schema_json)
.context("parsing resource config schema")?,

// materialization connectors don't currently specify resrouce_path_pointers, though perhaps they should
resource_path_pointers: Vec::new(),
oauth2: oauth,
resource_path_pointers,
oauth2,
})
}

Expand Down
1 change: 1 addition & 0 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ pub async fn unary_materialize(
"https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka"
.to_string(),
oauth2: None,
resource_path_pointers: vec!["/topic_name".to_string()],
}),
..Default::default()
});
Expand Down
1 change: 1 addition & 0 deletions crates/flowctl/src/raw/materialize_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ pub async fn do_materialize_fixture(
version: "test".to_string(),
last_materialization: None,
last_version: String::new(),
state_json: String::new(),
}),
..Default::default()
});
Expand Down
31 changes: 16 additions & 15 deletions crates/proto-flow/src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,14 @@ pub mod request {
pub backfill: u32,
}
}
/// Apply a capture configuration and bindings to its endpoint.
/// Apply is run out-of-band with ongoing connector invocations,
/// and may be run many times for a single capture name,
/// where each invocation has varying bindings, or even no bindings.
/// The connector performs any required setup or cleanup.
/// Apply an updated capture specification to its endpoint,
/// in preparation for an Open of a capture session.
/// Apply is run by the leader shard of a capture task
/// (having key_begin: 0) while the capture is quiescent.
/// Apply may be called multiple times for a given `version` and
/// `last_version`, even if a prior call succeeded from the connector's
/// perspective, so implementations must be idempotent. However, the next
/// session will not Open until it's preceding Apply has durably completed.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Apply {
Expand Down Expand Up @@ -192,23 +195,21 @@ pub mod response {
/// JSON schema of the connector's configuration.
#[prost(string, tag = "2")]
pub config_schema_json: ::prost::alloc::string::String,
/// JSON schema of the connecor's resource configuration.
/// JSON schema of the connector's resource configuration.
#[prost(string, tag = "3")]
pub resource_config_schema_json: ::prost::alloc::string::String,
/// URL for connector's documention.
/// URL for connector's documentation.
#[prost(string, tag = "4")]
pub documentation_url: ::prost::alloc::string::String,
/// Optional OAuth2 configuration.
#[prost(message, optional, tag = "5")]
pub oauth2: ::core::option::Option<super::super::flow::OAuth2>,
/// One or more JSON pointers, which are used to extract the `resource_path`
/// from a given `resource` of this connector. For example, a database
/// capture connector might have a `resource` that's represented like:
/// `{"schema": "foo", "table": "bar", "otherConfig": true}`. In that case
/// it could use `resource_path_pointers: \["/schema", "/table"\]`, which
/// would result in a `resource_path` of `\["foo", "bar"\]`. This allows
/// `otherConfig` to be changed by the user without impacting the identity of
/// the resource.
/// One or more JSON pointers, which are used to extract resource paths
/// from resource configurations of this connector. For example,
/// a database connector might have a resource config like:
/// {"schema": "foo", "table": "bar", "other": "config", "answer": 42}
/// The connector would specify `resource_path_pointers: \["/schema", "/table"\]`,
/// which would result in a `resource_path` of `\["foo", "bar"\]`.
#[prost(string, repeated, tag = "6")]
pub resource_path_pointers: ::prost::alloc::vec::Vec<
::prost::alloc::string::String,
Expand Down
27 changes: 20 additions & 7 deletions crates/proto-flow/src/flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ pub mod collection_spec {
/// Network ports of this derivation.
#[prost(message, repeated, tag = "7")]
pub network_ports: ::prost::alloc::vec::Vec<super::NetworkPort>,
/// Transforms which were previously active for this task, but are no longer.
/// Inactive transforms are unique by their transform name, and have no
/// overlap with active transforms.
#[prost(message, repeated, tag = "8")]
pub inactive_transforms: ::prost::alloc::vec::Vec<derivation::Transform>,
}
/// Nested message and enum types in `Derivation`.
pub mod derivation {
Expand All @@ -309,7 +314,7 @@ pub mod collection_spec {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Transform {
/// Stable name of this transform.
/// Stable and unique name of this transform.
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
/// Source collection which is read by this transform.
Expand Down Expand Up @@ -498,12 +503,16 @@ pub struct CaptureSpec {
/// Network ports of this capture.
#[prost(message, repeated, tag = "8")]
pub network_ports: ::prost::alloc::vec::Vec<NetworkPort>,
/// Bindings which were previously active for this task, but are no longer.
/// Inactive bindings are unique by their resource path, and have no
/// overlap with active bindings.
#[prost(message, repeated, tag = "9")]
pub inactive_bindings: ::prost::alloc::vec::Vec<capture_spec::Binding>,
}
/// Nested message and enum types in `CaptureSpec`.
pub mod capture_spec {
/// Bindings of endpoint resources and collections into which they're captured.
/// Bindings are ordered and unique on the bound collection name,
/// and are also unique on the resource path.
/// Bindings between endpoint resources, uniquely identified by their
/// resource path, and the collections into which they're captured.
///
/// Next tag: 6.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down Expand Up @@ -594,12 +603,16 @@ pub struct MaterializationSpec {
/// Network ports of this materialization.
#[prost(message, repeated, tag = "7")]
pub network_ports: ::prost::alloc::vec::Vec<NetworkPort>,
/// Bindings which were previously active for this task, but are no longer.
/// Inactive bindings are unique by their resource path, and have no
/// overlap with active bindings.
#[prost(message, repeated, tag = "9")]
pub inactive_bindings: ::prost::alloc::vec::Vec<materialization_spec::Binding>,
}
/// Nested message and enum types in `MaterializationSpec`.
pub mod materialization_spec {
/// Bindings of endpoint resources and collections from which they're
/// materialized. Bindings are ordered and unique on the bound collection name,
/// and are also unique on the resource path.
/// Bindings between endpoint resources, uniquely identified by their
/// resource path, and the collections from which they're materialized.
///
/// Next tag: 14.
#[allow(clippy::derive_partial_eq_without_eq)]
Expand Down
54 changes: 54 additions & 0 deletions crates/proto-flow/src/flow.serde.rs
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,9 @@ impl serde::Serialize for CaptureSpec {
if !self.network_ports.is_empty() {
len += 1;
}
if !self.inactive_bindings.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.CaptureSpec", len)?;
if !self.name.is_empty() {
struct_ser.serialize_field("name", &self.name)?;
Expand Down Expand Up @@ -478,6 +481,9 @@ impl serde::Serialize for CaptureSpec {
if !self.network_ports.is_empty() {
struct_ser.serialize_field("networkPorts", &self.network_ports)?;
}
if !self.inactive_bindings.is_empty() {
struct_ser.serialize_field("inactiveBindings", &self.inactive_bindings)?;
}
struct_ser.end()
}
}
Expand All @@ -502,6 +508,8 @@ impl<'de> serde::Deserialize<'de> for CaptureSpec {
"recoveryLogTemplate",
"network_ports",
"networkPorts",
"inactive_bindings",
"inactiveBindings",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -514,6 +522,7 @@ impl<'de> serde::Deserialize<'de> for CaptureSpec {
ShardTemplate,
RecoveryLogTemplate,
NetworkPorts,
InactiveBindings,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -543,6 +552,7 @@ impl<'de> serde::Deserialize<'de> for CaptureSpec {
"shardTemplate" | "shard_template" => Ok(GeneratedField::ShardTemplate),
"recoveryLogTemplate" | "recovery_log_template" => Ok(GeneratedField::RecoveryLogTemplate),
"networkPorts" | "network_ports" => Ok(GeneratedField::NetworkPorts),
"inactiveBindings" | "inactive_bindings" => Ok(GeneratedField::InactiveBindings),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand Down Expand Up @@ -570,6 +580,7 @@ impl<'de> serde::Deserialize<'de> for CaptureSpec {
let mut shard_template__ = None;
let mut recovery_log_template__ = None;
let mut network_ports__ = None;
let mut inactive_bindings__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Name => {
Expand Down Expand Up @@ -622,6 +633,12 @@ impl<'de> serde::Deserialize<'de> for CaptureSpec {
}
network_ports__ = Some(map_.next_value()?);
}
GeneratedField::InactiveBindings => {
if inactive_bindings__.is_some() {
return Err(serde::de::Error::duplicate_field("inactiveBindings"));
}
inactive_bindings__ = Some(map_.next_value()?);
}
}
}
Ok(CaptureSpec {
Expand All @@ -633,6 +650,7 @@ impl<'de> serde::Deserialize<'de> for CaptureSpec {
shard_template: shard_template__,
recovery_log_template: recovery_log_template__,
network_ports: network_ports__.unwrap_or_default(),
inactive_bindings: inactive_bindings__.unwrap_or_default(),
})
}
}
Expand Down Expand Up @@ -1156,6 +1174,9 @@ impl serde::Serialize for collection_spec::Derivation {
if !self.network_ports.is_empty() {
len += 1;
}
if !self.inactive_transforms.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.CollectionSpec.Derivation", len)?;
if self.connector_type != 0 {
let v = collection_spec::derivation::ConnectorType::try_from(self.connector_type)
Expand Down Expand Up @@ -1184,6 +1205,9 @@ impl serde::Serialize for collection_spec::Derivation {
if !self.network_ports.is_empty() {
struct_ser.serialize_field("networkPorts", &self.network_ports)?;
}
if !self.inactive_transforms.is_empty() {
struct_ser.serialize_field("inactiveTransforms", &self.inactive_transforms)?;
}
struct_ser.end()
}
}
Expand All @@ -1207,6 +1231,8 @@ impl<'de> serde::Deserialize<'de> for collection_spec::Derivation {
"recoveryLogTemplate",
"network_ports",
"networkPorts",
"inactive_transforms",
"inactiveTransforms",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -1218,6 +1244,7 @@ impl<'de> serde::Deserialize<'de> for collection_spec::Derivation {
ShardTemplate,
RecoveryLogTemplate,
NetworkPorts,
InactiveTransforms,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -1246,6 +1273,7 @@ impl<'de> serde::Deserialize<'de> for collection_spec::Derivation {
"shardTemplate" | "shard_template" => Ok(GeneratedField::ShardTemplate),
"recoveryLogTemplate" | "recovery_log_template" => Ok(GeneratedField::RecoveryLogTemplate),
"networkPorts" | "network_ports" => Ok(GeneratedField::NetworkPorts),
"inactiveTransforms" | "inactive_transforms" => Ok(GeneratedField::InactiveTransforms),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -1272,6 +1300,7 @@ impl<'de> serde::Deserialize<'de> for collection_spec::Derivation {
let mut shard_template__ = None;
let mut recovery_log_template__ = None;
let mut network_ports__ = None;
let mut inactive_transforms__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::ConnectorType => {
Expand Down Expand Up @@ -1316,6 +1345,12 @@ impl<'de> serde::Deserialize<'de> for collection_spec::Derivation {
}
network_ports__ = Some(map_.next_value()?);
}
GeneratedField::InactiveTransforms => {
if inactive_transforms__.is_some() {
return Err(serde::de::Error::duplicate_field("inactiveTransforms"));
}
inactive_transforms__ = Some(map_.next_value()?);
}
}
}
Ok(collection_spec::Derivation {
Expand All @@ -1326,6 +1361,7 @@ impl<'de> serde::Deserialize<'de> for collection_spec::Derivation {
shard_template: shard_template__,
recovery_log_template: recovery_log_template__,
network_ports: network_ports__.unwrap_or_default(),
inactive_transforms: inactive_transforms__.unwrap_or_default(),
})
}
}
Expand Down Expand Up @@ -3567,6 +3603,9 @@ impl serde::Serialize for MaterializationSpec {
if !self.network_ports.is_empty() {
len += 1;
}
if !self.inactive_bindings.is_empty() {
len += 1;
}
let mut struct_ser = serializer.serialize_struct("flow.MaterializationSpec", len)?;
if !self.name.is_empty() {
struct_ser.serialize_field("name", &self.name)?;
Expand All @@ -3591,6 +3630,9 @@ impl serde::Serialize for MaterializationSpec {
if !self.network_ports.is_empty() {
struct_ser.serialize_field("networkPorts", &self.network_ports)?;
}
if !self.inactive_bindings.is_empty() {
struct_ser.serialize_field("inactiveBindings", &self.inactive_bindings)?;
}
struct_ser.end()
}
}
Expand All @@ -3613,6 +3655,8 @@ impl<'de> serde::Deserialize<'de> for MaterializationSpec {
"recoveryLogTemplate",
"network_ports",
"networkPorts",
"inactive_bindings",
"inactiveBindings",
];

#[allow(clippy::enum_variant_names)]
Expand All @@ -3624,6 +3668,7 @@ impl<'de> serde::Deserialize<'de> for MaterializationSpec {
ShardTemplate,
RecoveryLogTemplate,
NetworkPorts,
InactiveBindings,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
fn deserialize<D>(deserializer: D) -> std::result::Result<GeneratedField, D::Error>
Expand Down Expand Up @@ -3652,6 +3697,7 @@ impl<'de> serde::Deserialize<'de> for MaterializationSpec {
"shardTemplate" | "shard_template" => Ok(GeneratedField::ShardTemplate),
"recoveryLogTemplate" | "recovery_log_template" => Ok(GeneratedField::RecoveryLogTemplate),
"networkPorts" | "network_ports" => Ok(GeneratedField::NetworkPorts),
"inactiveBindings" | "inactive_bindings" => Ok(GeneratedField::InactiveBindings),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
}
Expand All @@ -3678,6 +3724,7 @@ impl<'de> serde::Deserialize<'de> for MaterializationSpec {
let mut shard_template__ = None;
let mut recovery_log_template__ = None;
let mut network_ports__ = None;
let mut inactive_bindings__ = None;
while let Some(k) = map_.next_key()? {
match k {
GeneratedField::Name => {
Expand Down Expand Up @@ -3722,6 +3769,12 @@ impl<'de> serde::Deserialize<'de> for MaterializationSpec {
}
network_ports__ = Some(map_.next_value()?);
}
GeneratedField::InactiveBindings => {
if inactive_bindings__.is_some() {
return Err(serde::de::Error::duplicate_field("inactiveBindings"));
}
inactive_bindings__ = Some(map_.next_value()?);
}
}
}
Ok(MaterializationSpec {
Expand All @@ -3732,6 +3785,7 @@ impl<'de> serde::Deserialize<'de> for MaterializationSpec {
shard_template: shard_template__,
recovery_log_template: recovery_log_template__,
network_ports: network_ports__.unwrap_or_default(),
inactive_bindings: inactive_bindings__.unwrap_or_default(),
})
}
}
Expand Down
Loading

0 comments on commit ed98526

Please sign in to comment.