From 09765db611a65a21b88e839d781780c75924e560 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 5 Feb 2024 09:45:23 +0800 Subject: [PATCH] feat: Bump hive_metastore to use pure rust thrift impl `volo` (#174) --- Cargo.toml | 3 +- crates/catalog/hms/Cargo.toml | 10 ++-- crates/catalog/hms/src/catalog.rs | 76 +++++++++++++++---------------- crates/catalog/hms/src/utils.rs | 17 ++++++- 4 files changed, 59 insertions(+), 47 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5c01a90ca..c2388b0fa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,5 +67,6 @@ typed-builder = "^0.18" url = "2" urlencoding = "2" uuid = "1.6.1" - +volo-thrift = "0.9.2" +hive_metastore = "0.0.2" tera = "1" diff --git a/crates/catalog/hms/Cargo.toml b/crates/catalog/hms/Cargo.toml index 693d4e947..e03733b74 100644 --- a/crates/catalog/hms/Cargo.toml +++ b/crates/catalog/hms/Cargo.toml @@ -28,13 +28,9 @@ license = { workspace = true } keywords = ["iceberg", "hive", "catalog"] [dependencies] +anyhow = { workspace = true } async-trait = { workspace = true } -hive_metastore = "0.0.1" +hive_metastore = { workspace = true } iceberg = { workspace = true } -# the thrift upstream suffered from no regular rust release. -# -# [test-rs](https://github.com/tent-rs) is an organization that helps resolves this -# issue. And [tent-thrift](https://github.com/tent-rs/thrift) is a fork of the thrift -# crate, built from the thrift upstream with only version bumped. -thrift = { package = "tent-thrift", version = "0.18.1" } typed-builder = { workspace = true } +volo-thrift = { workspace = true } diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 2b1fe2cc4..f3eab62ea 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -17,16 +17,16 @@ use super::utils::*; use async_trait::async_trait; -use hive_metastore::{TThriftHiveMetastoreSyncClient, ThriftHiveMetastoreSyncClient}; +use hive_metastore::ThriftHiveMetastoreClient; +use hive_metastore::ThriftHiveMetastoreClientBuilder; use iceberg::table::Table; -use iceberg::{Catalog, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, TableIdent}; +use iceberg::{ + Catalog, Error, ErrorKind, Namespace, NamespaceIdent, Result, TableCommit, TableCreation, + TableIdent, +}; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; -use std::sync::{Arc, Mutex}; -use thrift::protocol::{TBinaryInputProtocol, TBinaryOutputProtocol}; -use thrift::transport::{ - ReadHalf, TBufferedReadTransport, TBufferedWriteTransport, TIoChannel, WriteHalf, -}; +use std::net::ToSocketAddrs; use typed_builder::TypedBuilder; /// Hive metastore Catalog configuration. @@ -35,24 +35,7 @@ pub struct HmsCatalogConfig { address: String, } -/// TODO: We only support binary protocol for now. -type HmsClientType = ThriftHiveMetastoreSyncClient< - TBinaryInputProtocol>>, - TBinaryOutputProtocol>>, ->; - -/// # TODO -/// -/// we are using the same connection everytime, we should support connection -/// pool in the future. -struct HmsClient(Arc>); - -impl HmsClient { - fn call(&self, f: impl FnOnce(&mut HmsClientType) -> thrift::Result) -> Result { - let mut client = self.0.lock().unwrap(); - f(&mut client).map_err(from_thrift_error) - } -} +struct HmsClient(ThriftHiveMetastoreClient); /// Hive metastore Catalog. pub struct HmsCatalog { @@ -71,19 +54,29 @@ impl Debug for HmsCatalog { impl HmsCatalog { /// Create a new hms catalog. pub fn new(config: HmsCatalogConfig) -> Result { - let mut channel = thrift::transport::TTcpChannel::new(); - channel - .open(config.address.as_str()) - .map_err(from_thrift_error)?; - let (i_chan, o_chan) = channel.split().map_err(from_thrift_error)?; - let i_chan = TBufferedReadTransport::new(i_chan); - let o_chan = TBufferedWriteTransport::new(o_chan); - let i_proto = TBinaryInputProtocol::new(i_chan, true); - let o_proto = TBinaryOutputProtocol::new(o_chan, true); - let client = ThriftHiveMetastoreSyncClient::new(i_proto, o_proto); + let address = config + .address + .as_str() + .to_socket_addrs() + .map_err(from_io_error)? + .next() + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + format!("invalid address: {}", config.address), + ) + })?; + + let client = ThriftHiveMetastoreClientBuilder::new("hms") + .address(address) + // Framed thrift rpc is not enabled by default in HMS, we use + // buffered instead. + .make_codec(volo_thrift::codec::default::DefaultMakeCodec::buffered()) + .build(); + Ok(Self { config, - client: HmsClient(Arc::new(Mutex::new(client))), + client: HmsClient(client), }) } } @@ -103,10 +96,17 @@ impl Catalog for HmsCatalog { let dbs = if parent.is_some() { return Ok(vec![]); } else { - self.client.call(|client| client.get_all_databases())? + self.client + .0 + .get_all_databases() + .await + .map_err(from_thrift_error)? }; - Ok(dbs.into_iter().map(NamespaceIdent::new).collect()) + Ok(dbs + .into_iter() + .map(|v| NamespaceIdent::new(v.into())) + .collect()) } async fn create_namespace( diff --git a/crates/catalog/hms/src/utils.rs b/crates/catalog/hms/src/utils.rs index 0daa52aa1..1b0ff33df 100644 --- a/crates/catalog/hms/src/utils.rs +++ b/crates/catalog/hms/src/utils.rs @@ -15,13 +15,28 @@ // specific language governing permissions and limitations // under the License. +use anyhow::anyhow; use iceberg::{Error, ErrorKind}; +use std::fmt::Debug; +use std::io; /// Format a thrift error into iceberg error. -pub fn from_thrift_error(error: thrift::Error) -> Error { +pub fn from_thrift_error(error: volo_thrift::error::ResponseError) -> Error +where + T: Debug, +{ Error::new( ErrorKind::Unexpected, "operation failed for hitting thrift error".to_string(), ) + .with_source(anyhow!("thrift error: {:?}", error)) +} + +/// Format an io error into iceberg error. +pub fn from_io_error(error: io::Error) -> Error { + Error::new( + ErrorKind::Unexpected, + "operation failed for hitting io error".to_string(), + ) .with_source(error) }