Skip to content

Commit

Permalink
refactor: [WIP] refactor Snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Dysprosium0626 committed Feb 17, 2024
1 parent acb4422 commit bc6c183
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 68 deletions.
12 changes: 6 additions & 6 deletions crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,18 +240,18 @@ pub struct TableCreation {
/// TableCommit represents the commit of a table in the catalog.
#[derive(Debug, TypedBuilder)]
#[builder(build_method(vis = "pub(crate)"))]
pub struct TableCommit {
pub struct TableCommit<'a> {
/// The table ident.
ident: TableIdent,
/// The requirements of the table.
///
/// Commit will fail if the requirements are not met.
requirements: Vec<TableRequirement>,
/// The updates of the table.
updates: Vec<TableUpdate>,
updates: Vec<TableUpdate<'a>>,
}

impl TableCommit {
impl<'a> TableCommit<'a> {
/// Return the table identifier.
pub fn identifier(&self) -> &TableIdent {
&self.ident
Expand Down Expand Up @@ -333,7 +333,7 @@ pub enum TableRequirement {
/// TableUpdate represents an update to a table in the catalog.
#[derive(Debug, Serialize, Deserialize, PartialEq)]
#[serde(tag = "action", rename_all = "kebab-case")]
pub enum TableUpdate {
pub enum TableUpdate<'a> {
/// Upgrade table's format version
#[serde(rename_all = "kebab-case")]
UpgradeFormatVersion {
Expand Down Expand Up @@ -387,7 +387,7 @@ pub enum TableUpdate {
#[serde(rename_all = "kebab-case")]
AddSnapshot {
/// Snapshot to add.
snapshot: Snapshot,
snapshot: Snapshot<'a>,
},
/// Set table's snapshot ref.
#[serde(rename_all = "kebab-case")]
Expand Down Expand Up @@ -910,7 +910,7 @@ mod tests {
.with_parent_snapshot_id(Some(3051729675574597000))
.with_timestamp_ms(1555100955770)
.with_sequence_number(1)
.with_manifest_list("s3://a/b/2.avro".to_string())
.with_manifest_list("s3://a/b/2.avro")
.with_schema_id(1)
.with_summary(Summary {
operation: Operation::Append,
Expand Down
20 changes: 10 additions & 10 deletions crates/iceberg/src/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use futures::StreamExt;

/// Builder to create table scan.
pub struct TableScanBuilder<'a> {
table: &'a Table,
table: &'a Table<'a>,
// Empty column names means to select all columns
column_names: Vec<String>,
snapshot_id: Option<i64>,
Expand Down Expand Up @@ -64,7 +64,7 @@ impl<'a> TableScanBuilder<'a> {
}

/// Build the table scan.
pub fn build(self) -> crate::Result<TableScan> {
pub fn build(self) -> crate::Result<TableScan<'a>> {
let snapshot = match self.snapshot_id {
Some(snapshot_id) => self
.table
Expand Down Expand Up @@ -117,9 +117,9 @@ impl<'a> TableScanBuilder<'a> {
/// Table scan.
#[derive(Debug)]
#[allow(dead_code)]
pub struct TableScan {
snapshot: SnapshotRef,
table_metadata: TableMetadataRef,
pub struct TableScan<'a> {
snapshot: SnapshotRef<'a>,
table_metadata: TableMetadataRef<'a>,
file_io: FileIO,
column_names: Vec<String>,
schema: SchemaRef,
Expand All @@ -128,7 +128,7 @@ pub struct TableScan {
/// A stream of [`FileScanTask`].
pub type FileScanTaskStream = BoxStream<'static, crate::Result<FileScanTask>>;

impl TableScan {
impl<'a> TableScan<'a> {
/// Returns a stream of file scan tasks.
pub async fn plan_files(&self) -> crate::Result<FileScanTaskStream> {
let manifest_list = self
Expand Down Expand Up @@ -200,12 +200,12 @@ mod tests {
use tera::{Context, Tera};
use uuid::Uuid;

struct TableTestFixture {
struct TableTestFixture<'a> {
table_location: String,
table: Table,
table: Table<'a>,
}

impl TableTestFixture {
impl<'a> TableTestFixture<'a> {
fn new() -> Self {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path().join("table1");
Expand Down Expand Up @@ -403,7 +403,7 @@ mod tests {
fixture
.table
.file_io()
.new_output(current_snapshot.manifest_list_file_path().unwrap())
.new_output(current_snapshot.manifest_list_file_path())
.unwrap(),
current_snapshot.snapshot_id(),
current_snapshot
Expand Down
54 changes: 27 additions & 27 deletions crates/iceberg/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use crate::{Error, ErrorKind};
use _serde::SnapshotV2;

/// Reference to [`Snapshot`].
pub type SnapshotRef = Arc<Snapshot>;
pub type SnapshotRef<'a> = Arc<Snapshot<'a>>;
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq, Clone)]
#[serde(rename_all = "lowercase")]
/// The operation field is used by some operations, like snapshot expiration, to skip processing certain snapshots.
Expand Down Expand Up @@ -69,7 +69,7 @@ impl Default for Operation {
#[serde(from = "SnapshotV2", into = "SnapshotV2")]
#[builder(field_defaults(setter(prefix = "with_")))]
/// A snapshot represents the state of a table at some time and is used to access the complete set of data files in the table.
pub struct Snapshot {
pub struct Snapshot<'a> {
/// A unique long ID
snapshot_id: i64,
/// The snapshot ID of the snapshot’s parent.
Expand All @@ -84,15 +84,17 @@ pub struct Snapshot {
timestamp_ms: i64,
/// The location of a manifest list for this snapshot that
/// tracks manifest files with additional metadata.
manifest_list: String,
/// Currently we only support manifest list file and manifest files are not supported.
#[builder(setter(into))]
manifest_list: &'a str,
/// A string map that summarizes the snapshot changes, including operation.
summary: Summary,
/// ID of the table’s current schema when the snapshot was created.
#[builder(setter(strip_option), default = None)]
schema_id: Option<SchemaId>,
}

impl Snapshot {
impl<'a> Snapshot<'a> {
/// Get the id of the snapshot
#[inline]
pub fn snapshot_id(&self) -> i64 {
Expand All @@ -112,16 +114,16 @@ impl Snapshot {
}
/// Get location of manifest_list file
#[inline]
pub fn manifest_list(&self) -> &String {
pub fn manifest_list(&self) -> &str {
&self.manifest_list
}

/// Return the manifest list file path.
///
/// It will return an error if the manifest list is not a file but a list of manifest file paths.
#[inline]
pub fn manifest_list_file_path(&self) -> Result<&str> {
Ok(&self.manifest_list)
pub fn manifest_list_file_path(&self) -> &str {
self.manifest_list
}
/// Get summary of the snapshot
#[inline]
Expand Down Expand Up @@ -169,7 +171,7 @@ impl Snapshot {
pub async fn load_manifest_list(
&self,
file_io: &FileIO,
table_metadata: &TableMetadata,
table_metadata: &TableMetadata<'a>,
) -> Result<ManifestList> {
let mut manifest_list_content = Vec::new();
file_io
Expand Down Expand Up @@ -213,20 +215,20 @@ pub(super) mod _serde {
use serde::{Deserialize, Serialize};

use crate::spec::SchemaId;
use crate::{Error, ErrorKind};
use crate::Error;

use super::{Operation, Snapshot, Summary};

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "kebab-case")]
/// Defines the structure of a v2 snapshot for serialization/deserialization
pub(crate) struct SnapshotV2 {
pub(crate) struct SnapshotV2<'a> {
pub snapshot_id: i64,
#[serde(skip_serializing_if = "Option::is_none")]
pub parent_snapshot_id: Option<i64>,
pub sequence_number: i64,
pub timestamp_ms: i64,
pub manifest_list: String,
pub manifest_list: &'a str,
pub summary: Summary,
#[serde(skip_serializing_if = "Option::is_none")]
pub schema_id: Option<SchemaId>,
Expand All @@ -243,26 +245,28 @@ pub(super) mod _serde {
#[serde(skip_serializing_if = "Option::is_none")]
pub manifest_list: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub manifests: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub summary: Option<Summary>,
#[serde(skip_serializing_if = "Option::is_none")]
pub schema_id: Option<SchemaId>,
}

impl From<SnapshotV2> for Snapshot {
impl<'a> From<SnapshotV2<'a>> for Snapshot<'a> {
fn from(v2: SnapshotV2) -> Self {
Snapshot {
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
sequence_number: v2.sequence_number,
timestamp_ms: v2.timestamp_ms,
manifest_list: v2.manifest_list,
manifest_list: &v2.manifest_list,
summary: v2.summary,
schema_id: v2.schema_id,
}
}
}

impl From<Snapshot> for SnapshotV2 {
impl<'a> From<Snapshot<'a>> for SnapshotV2<'a> {
fn from(v2: Snapshot) -> Self {
SnapshotV2 {
snapshot_id: v2.snapshot_id,
Expand All @@ -276,7 +280,7 @@ pub(super) mod _serde {
}
}

impl TryFrom<SnapshotV1> for Snapshot {
impl<'a> TryFrom<SnapshotV1> for Snapshot<'a> {
type Error = Error;

fn try_from(v1: SnapshotV1) -> Result<Self, Self::Error> {
Expand All @@ -285,15 +289,11 @@ pub(super) mod _serde {
parent_snapshot_id: v1.parent_snapshot_id,
sequence_number: 0,
timestamp_ms: v1.timestamp_ms,
manifest_list: match v1.manifest_list {
Some(file) => file,
None => {
return Err(Error::new(
ErrorKind::DataInvalid,
"Neither manifestlist file or manifest files are provided.",
))
}
},
manifest_list: match (v1.manifest_list, v1.manifests) {
(Some(file), None) => &file,
(Some(_), Some(_)) => "Invalid v1 snapshot, when manifest list provided, manifest files should be omitted",
(None, _) => "Unsupported v1 snapshot, only manifest list is supported"
},
summary: v1.summary.unwrap_or(Summary {
operation: Operation::default(),
other: HashMap::new(),
Expand All @@ -303,16 +303,16 @@ pub(super) mod _serde {
}
}

impl From<Snapshot> for SnapshotV1 {
impl<'a> From<Snapshot<'a>> for SnapshotV1 {
fn from(v2: Snapshot) -> Self {
let manifest_list = Some(v2.manifest_list);
SnapshotV1 {
snapshot_id: v2.snapshot_id,
parent_snapshot_id: v2.parent_snapshot_id,
timestamp_ms: v2.timestamp_ms,
manifest_list,
manifest_list: Some(v2.manifest_list.to_string()),
summary: Some(v2.summary),
schema_id: v2.schema_id,
manifests: None,
}
}
}
Expand Down
Loading

0 comments on commit bc6c183

Please sign in to comment.