Skip to content

Commit

Permalink
Allow the LogStore to self-identify to make it easier to test
Browse files Browse the repository at this point in the history
This also makes sure the DefaultLogStore is used for non-dynamodb
locking s3 instances
  • Loading branch information
rtyler committed Dec 29, 2023
1 parent fbfa83f commit 7de7f0b
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 31 deletions.
58 changes: 43 additions & 15 deletions crates/deltalake-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod logstore;
pub mod storage;

use lazy_static::lazy_static;
use log::*;
use regex::Regex;
use std::{
collections::HashMap,
Expand All @@ -13,9 +14,9 @@ use std::{
time::{Duration, SystemTime},
};

use deltalake_core::logstore::{logstores, LogStore, LogStoreFactory};
use deltalake_core::storage::{factories, url_prefix_handler, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, Path};
use deltalake_core::logstore::{LogStoreFactory, LogStore, logstores};
use deltalake_core::storage::{factories, ObjectStoreRef, StorageOptions, url_prefix_handler};
use rusoto_core::{HttpClient, Region, RusotoError};
use rusoto_credential::AutoRefreshingProvider;
use rusoto_dynamodb::{
Expand All @@ -27,33 +28,44 @@ use rusoto_sts::WebIdentityProvider;
use url::Url;

use errors::{DynamoDbConfigError, LockClientError};
use storage::{S3StorageOptions, S3ObjectStoreFactory};
use storage::{S3ObjectStoreFactory, S3StorageOptions};

#[derive(Clone, Debug, Default)]
struct S3LogStoreFactory {}

impl LogStoreFactory for S3LogStoreFactory {
fn with_options(&self, store: ObjectStoreRef, location: &Url, options: &StorageOptions) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store,
Path::parse(location.path())?)?;
fn with_options(
&self,
store: ObjectStoreRef,
location: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
let store = url_prefix_handler(store, Path::parse(location.path())?)?;
let s3_options = S3StorageOptions::from_map(&options.0);

if s3_options.locking_provider.as_deref() != Some("dynamodb") {
println!("RETURNING A DEFAAULT");
debug!("S3LogStoreFactory has been asked to create a LogStore without the dynamodb locking provider");
return Ok(deltalake_core::logstore::default_logstore(
store, location, options,
));
}

Ok(Arc::new(logstore::S3DynamoDbLogStore::try_new(
location.clone(),
options.clone(),
&S3StorageOptions::default(),
store)?))
&s3_options,
store,
)?))
}
}

/// Register an [ObjectStoreFactory] for common S3 [Url] schemes
pub fn register_handlers(_additional_prefixes: Option<Url>) {
for scheme in ["s3", "s3a"].iter() {
let url = Url::parse(&format!("{}://", scheme)).unwrap();
factories().insert(url.clone(),
Arc::new(S3ObjectStoreFactory::default()),
);
logstores().insert(url.clone(),
Arc::new(S3LogStoreFactory::default()),
);
factories().insert(url.clone(), Arc::new(S3ObjectStoreFactory::default()));
logstores().insert(url.clone(), Arc::new(S3LogStoreFactory::default()));
}
}

Expand Down Expand Up @@ -549,8 +561,9 @@ fn extract_version_from_filename(name: &str) -> Option<i64> {

#[cfg(test)]
mod tests {

use super::*;
use object_store::memory::InMemory;
use serial_test::serial;

fn commit_entry_roundtrip(c: &CommitEntry) -> Result<(), LockClientError> {
let item_data: HashMap<String, AttributeValue> = create_value_map(c, "some_table");
Expand Down Expand Up @@ -582,4 +595,19 @@ mod tests {
})?;
Ok(())
}

/// In cases where there is no dynamodb specified locking provider, this should get a default
/// logstore
#[test]
#[serial]
fn test_logstore_factory_default() {
let factory = S3LogStoreFactory::default();
let store = InMemory::new();
let url = Url::parse("s3://test-bucket").unwrap();
std::env::remove_var(storage::s3_constants::AWS_S3_LOCKING_PROVIDER);
let logstore = factory
.with_options(Arc::new(store), &url, &StorageOptions::from(HashMap::new()))
.unwrap();
assert_eq!(logstore.name(), "DefaultLogStore");
}
}
4 changes: 4 additions & 0 deletions crates/deltalake-aws/src/logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,10 @@ impl S3DynamoDbLogStore {

#[async_trait::async_trait]
impl LogStore for S3DynamoDbLogStore {
fn name(&self) -> String {
"S3DynamoDbLogStore".into()
}

fn root_uri(&self) -> String {
self.table_path.clone()
}
Expand Down
5 changes: 1 addition & 4 deletions crates/deltalake-aws/src/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use deltalake_core::storage::object_store::{
aws::AmazonS3ConfigKey, parse_url_opts, GetOptions, GetResult, ListResult, MultipartId,
ObjectMeta, ObjectStore, Result as ObjectStoreResult,
};
use deltalake_core::storage::{
str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions,
};
use deltalake_core::storage::{str_is_truthy, ObjectStoreFactory, ObjectStoreRef, StorageOptions};
use deltalake_core::{DeltaResult, ObjectStoreError, Path};
use futures::stream::BoxStream;
use rusoto_core::Region;
Expand All @@ -22,7 +20,6 @@ use url::Url;

const STORE_NAME: &str = "DeltaS3ObjectStore";


#[derive(Clone, Default, Debug)]
pub struct S3ObjectStoreFactory {}

Expand Down
9 changes: 0 additions & 9 deletions crates/deltalake-aws/tests/integration_read.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
#![cfg(feature = "integration_test")]

use deltalake_aws::storage::*;
use deltalake_core::{DeltaTableBuilder, Path};
use deltalake_test::utils::*;
use serial_test::serial;
use url::Url;

mod common;
use common::*;
Expand Down Expand Up @@ -97,13 +95,6 @@ async fn read_encoded_table(integration: &IntegrationContext, root_path: &str) -

async fn read_simple_table(integration: &IntegrationContext) -> TestResult {
let table_uri = integration.uri_for_table(TestTables::Simple);
// the s3 options don't hurt us for other integrations ...
#[cfg(any(feature = "s3", feature = "s3-native-tls"))]
let table = DeltaTableBuilder::from_uri(table_uri)
.with_allow_http(true)
.load()
.await?;
#[cfg(not(any(feature = "s3", feature = "s3-native-tls")))]
let table = DeltaTableBuilder::from_uri(table_uri)
.with_allow_http(true)
.load()
Expand Down
4 changes: 4 additions & 0 deletions crates/deltalake-core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ impl DefaultLogStore {

#[async_trait::async_trait]
impl LogStore for DefaultLogStore {
fn name(&self) -> String {
"DefaultLogStore".into()
}

async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
super::read_commit_entry(self.storage.as_ref(), version).await
}
Expand Down
14 changes: 11 additions & 3 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,19 @@ pub trait LogStoreFactory: Send + Sync {
location: &Url,
options: &StorageOptions,
) -> DeltaResult<Arc<dyn LogStore>> {
Ok(Arc::new(default_logstore::DefaultLogStore::new(
Ok(default_logstore(store, location, options))
}
}

/// Return the [DefaultLogStore] implementation with the provided configuration options
pub fn default_logstore(store: ObjectStoreRef, location: &Url, options: &StorageOptions) -> Arc<dyn LogStore> {
Arc::new(default_logstore::DefaultLogStore::new(
store,
LogStoreConfig {
location: location.clone(),
options: options.clone(),
},
)))
}
))
}

#[derive(Clone, Debug, Default)]
Expand Down Expand Up @@ -156,6 +161,9 @@ pub struct LogStoreConfig {
/// become visible immediately.
#[async_trait::async_trait]
pub trait LogStore: Sync + Send {
/// Return the name of this LogStore implementation
fn name(&self) -> String;

/// Read data for commit entry with the given version.
async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>>;

Expand Down

0 comments on commit 7de7f0b

Please sign in to comment.