Skip to content

Commit 598911d

Browse files
martin-g and Rik Heijdens authored
AVRO-3814: Fix schema resolution for records in union types (#2441)
* AVRO-3814: Add a minimal test-case to reproduce the issue Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3814: Fix schema resolution for records in union types The logic for validating records in Value::validate_internal() was too strict when resolving union types containing a record. This could lead to a situation where schema resolution would fail because the correct schema to use for a union type could not be identified. This commit fixes this by passing a boolean `schema_resolution` to `Value::validate_internal()` which governs whether schema resolution rules should be applied. * AVRO-3814: Ensure to validate the deserialized value against the schema * AVRO-3814: Extend test case for validate_record * AVRO-3814: Revert whitespace changes * AVRO-3814: Remove confusing comments * AVRO-3786: Add test-cases and fix for AVRO-3786 * AVRO-3786: Revert change to UnionSchema::find_schema_with_known_schemata * AVRO-3814: [Rust] Use types::Value::resolve_internal() instead of validate_internal() ... when looking for the matching schema in a union Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3814: Revert changes to validate_internal() Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> * AVRO-3814: Remove obsolete rustdoc for arguments Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> --------- Signed-off-by: Martin Tzvetanov Grigorov <[email protected]> Co-authored-by: Rik Heijdens <[email protected]>
1 parent 56a0e55 commit 598911d

File tree

3 files changed

+858
-3
lines changed

3 files changed

+858
-3
lines changed

lang/rust/avro/src/schema.rs

+128-2
Original file line numberDiff line numberDiff line change
@@ -837,9 +837,11 @@ impl UnionSchema {
837837
// extend known schemas with just resolved names
838838
collected_names.extend(resolved_names);
839839
let namespace = &schema.namespace().or_else(|| enclosing_namespace.clone());
840+
840841
value
841-
.validate_internal(schema, &collected_names, namespace)
842-
.is_none()
842+
.clone()
843+
.resolve_internal(schema, &collected_names, namespace, &None)
844+
.is_ok()
843845
})
844846
}
845847
}
@@ -5209,4 +5211,128 @@ mod tests {
52095211

52105212
Ok(())
52115213
}
5214+
5215+
/// Regression test for AVRO-3814: a datum written with a writer schema whose
/// nested `MyRecord` has an extra optional enum field `b` must still be
/// readable with a reader schema whose `MyRecord` only declares `a`.
///
/// The test serializes a value with the writer schema, decodes it with
/// `from_avro_datum` while passing the reader schema, and checks that the
/// result validates against the reader schema and exposes `a == "foo"`.
#[test]
5216+
fn test_avro_3814_schema_resolution_failure() -> TestResult {
5217+
// Define a reader schema: a nested record with an optional field.
// The inner `MyRecord` only knows about the string field `a`.
5218+
let reader_schema = json!(
5219+
{
5220+
"type": "record",
5221+
"name": "MyOuterRecord",
5222+
"fields": [
5223+
{
5224+
"name": "inner_record",
5225+
"type": [
5226+
"null",
5227+
{
5228+
"type": "record",
5229+
"name": "MyRecord",
5230+
"fields": [
5231+
{"name": "a", "type": "string"}
5232+
]
5233+
}
5234+
],
5235+
"default": null
5236+
}
5237+
]
5238+
}
5239+
);
5240+
5241+
// Define a writer schema: a nested record with an optional field, which
5242+
// may optionally contain an enum.
// Compared to the reader schema, the inner `MyRecord` carries the
// additional nullable field `b` of enum type `MyEnum`.
5243+
let writer_schema = json!(
5244+
{
5245+
"type": "record",
5246+
"name": "MyOuterRecord",
5247+
"fields": [
5248+
{
5249+
"name": "inner_record",
5250+
"type": [
5251+
"null",
5252+
{
5253+
"type": "record",
5254+
"name": "MyRecord",
5255+
"fields": [
5256+
{"name": "a", "type": "string"},
5257+
{
5258+
"name": "b",
5259+
"type": [
5260+
"null",
5261+
{
5262+
"type": "enum",
5263+
"name": "MyEnum",
5264+
"symbols": ["A", "B", "C"],
5265+
"default": "C"
5266+
}
5267+
],
5268+
"default": null
5269+
},
5270+
]
5271+
}
5272+
]
5273+
}
5274+
],
5275+
// NOTE(review): unlike the reader schema above, this "default" is a key of
// the outer record, not of the "inner_record" field — confirm this
// placement is intended.
"default": null
5276+
}
5277+
);
5278+
5279+
// Use different structs to represent the "Reader" and the "Writer"
5280+
// to mimic two different versions of a producer & consumer application.
5281+
#[derive(Serialize, Deserialize, Debug)]
5282+
struct MyInnerRecordReader {
5283+
a: String,
5284+
}
5285+
5286+
#[derive(Serialize, Deserialize, Debug)]
5287+
struct MyRecordReader {
5288+
inner_record: Option<MyInnerRecordReader>,
5289+
}
5290+
5291+
#[derive(Serialize, Deserialize, Debug)]
5292+
enum MyEnum {
5293+
A,
5294+
B,
5295+
C,
5296+
}
5297+
5298+
#[derive(Serialize, Deserialize, Debug)]
5299+
struct MyInnerRecordWriter {
5300+
a: String,
5301+
b: Option<MyEnum>,
5302+
}
5303+
5304+
#[derive(Serialize, Deserialize, Debug)]
5305+
struct MyRecordWriter {
5306+
inner_record: Option<MyInnerRecordWriter>,
5307+
}
5308+
5309+
// Writer-side value: the inner record is present, its optional enum `b` is unset.
let s = MyRecordWriter {
5310+
inner_record: Some(MyInnerRecordWriter {
5311+
a: "foo".to_string(),
5312+
b: None,
5313+
}),
5314+
};
5315+
5316+
// Serialize using the writer schema.
5317+
let writer_schema = Schema::parse(&writer_schema)?;
5318+
let avro_value = crate::to_value(s)?;
5319+
// Sanity check: the value must match the writer schema before it is encoded.
assert!(
5320+
avro_value.validate(&writer_schema),
5321+
"value is valid for schema",
5322+
);
5323+
let datum = crate::to_avro_datum(&writer_schema, avro_value)?;
5324+
5325+
// Now, attempt to deserialize using the reader schema.
5326+
let reader_schema = Schema::parse(&reader_schema)?;
5327+
// `from_avro_datum` takes a mutable reader, so read from a byte slice over `datum`.
let mut x = &datum[..];
5328+
5329+
// Deserialization should succeed and we should be able to resolve the schema.
5330+
let deser_value = crate::from_avro_datum(&writer_schema, &mut x, Some(&reader_schema))?;
5331+
assert!(deser_value.validate(&reader_schema));
5332+
5333+
// Verify that we can read a field from the record.
5334+
let d: MyRecordReader = crate::from_value(&deser_value)?;
5335+
assert_eq!(d.inner_record.unwrap().a, "foo".to_string());
5336+
Ok(())
5337+
}
52125338
}

lang/rust/avro/src/types.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -377,6 +377,7 @@ impl Value {
377377
}
378378
}
379379

380+
/// Validates the value against the provided schema.
380381
pub(crate) fn validate_internal<S: std::borrow::Borrow<Schema> + Debug>(
381382
&self,
382383
schema: &Schema,
@@ -516,6 +517,7 @@ impl Value {
516517
let non_nullable_fields_count =
517518
fields.iter().filter(|&rf| !rf.is_nullable()).count();
518519

520+
// If the record contains fewer fields than the schema has non-nullable (required) fields, it is invalid.
519521
if record_fields.len() < non_nullable_fields_count {
520522
return Some(format!(
521523
"The value's records length ({}) doesn't match the schema ({} non-nullable fields)",
@@ -603,7 +605,7 @@ impl Value {
603605
self.resolve_internal(schema, rs.get_names(), &enclosing_namespace, &None)
604606
}
605607

606-
fn resolve_internal(
608+
pub(crate) fn resolve_internal(
607609
mut self,
608610
schema: &Schema,
609611
names: &NamesRef,

0 commit comments

Comments
 (0)