Skip to content

Commit

Permalink
Merge branch 'main' into fix-avro-schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Fokko authored Feb 5, 2024
2 parents bfbb699 + 6929b79 commit 7f565e4
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 23 deletions.
2 changes: 1 addition & 1 deletion crates/iceberg/src/spec/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ mod _const_schema {
])),
)),
];
let schema = Schema::builder().with_fields(fields).build().unwrap();
let schema = Schema::builder().with_fields(fields).build()?;
schema_to_avro_schema("manifest_entry", &schema)
}

Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,8 @@ impl ManifestListWriter {
pub async fn close(self) -> Result<()> {
let data = self.avro_writer.into_inner()?;
let mut writer = self.output_file.writer().await?;
writer.write_all(&data).await.unwrap();
writer.close().await.unwrap();
writer.write_all(&data).await?;
writer.close().await?;
Ok(())
}
}
Expand Down
56 changes: 37 additions & 19 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ pub(crate) static INITIAL_SEQUENCE_NUMBER: i64 = 0;
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;

#[derive(Debug, PartialEq, Serialize, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum", into = "TableMetadataEnum")]
#[derive(Debug, PartialEq, Deserialize, Eq, Clone)]
#[serde(try_from = "TableMetadataEnum")]
/// Fields for the version 2 of the table metadata.
///
/// We assume that this data structure is always valid, so we will panic when invalid error happens.
Expand Down Expand Up @@ -379,6 +379,19 @@ pub(super) mod _serde {
#[derive(Debug, PartialEq, Eq)]
pub(super) struct VersionNumber<const V: u8>;

impl Serialize for TableMetadata {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// we must do a clone here
let table_metadata_enum: TableMetadataEnum =
self.clone().try_into().map_err(serde::ser::Error::custom)?;

table_metadata_enum.serialize(serializer)
}
}

impl<const V: u8> Serialize for VersionNumber<V> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down Expand Up @@ -412,12 +425,13 @@ pub(super) mod _serde {
}
}

impl From<TableMetadata> for TableMetadataEnum {
fn from(value: TableMetadata) -> Self {
match value.format_version {
impl TryFrom<TableMetadata> for TableMetadataEnum {
type Error = Error;
fn try_from(value: TableMetadata) -> Result<Self, Error> {
Ok(match value.format_version {
FormatVersion::V2 => TableMetadataEnum::V2(value.into()),
FormatVersion::V1 => TableMetadataEnum::V1(value.into()),
}
FormatVersion::V1 => TableMetadataEnum::V1(value.try_into()?),
})
}
}

Expand Down Expand Up @@ -676,9 +690,10 @@ pub(super) mod _serde {
}
}

impl From<TableMetadata> for TableMetadataV1 {
fn from(v: TableMetadata) -> Self {
TableMetadataV1 {
impl TryFrom<TableMetadata> for TableMetadataV1 {
type Error = Error;
fn try_from(v: TableMetadata) -> Result<Self, Error> {
Ok(TableMetadataV1 {
format_version: VersionNumber::<1>,
table_uuid: Some(v.table_uuid),
location: v.location,
Expand All @@ -687,7 +702,10 @@ pub(super) mod _serde {
schema: v
.schemas
.get(&v.current_schema_id)
.unwrap()
.ok_or(Error::new(
ErrorKind::Unexpected,
"current_schema_id not found in schemas",
))?
.as_ref()
.clone()
.into(),
Expand Down Expand Up @@ -748,7 +766,7 @@ pub(super) mod _serde {
.collect(),
),
default_sort_order_id: Some(v.default_sort_order_id),
}
})
}
}
}
Expand Down Expand Up @@ -867,12 +885,12 @@ mod tests {
{
"spec-id": 1,
"fields": [
{
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
{
"source-id": 4,
"field-id": 1000,
"name": "ts_day",
"transform": "day"
}
}
]
}
],
Expand All @@ -882,8 +900,8 @@ mod tests {
"commit.retry.num-retries": "1"
},
"metadata-log": [
{
"metadata-file": "s3://bucket/.../v1.json",
{
"metadata-file": "s3://bucket/.../v1.json",
"timestamp-ms": 1515100
}
],
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ impl<'a> ReplaceSortOrderAction<'a> {
.table
.metadata()
.default_sort_order()
.unwrap()
.expect("default sort order impossible to be None")
.order_id,
},
];
Expand Down

0 comments on commit 7f565e4

Please sign in to comment.