From 92ff41aef71b292b1e21ffb9b3929787ee0f1870 Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Wed, 1 May 2024 16:28:47 +0200 Subject: [PATCH 01/31] Add the driver manager --- rust/core/Cargo.toml | 6 +- rust/core/src/driver_manager.rs | 1207 +++++++++++++++++++++++++++++++ rust/core/src/error.rs | 12 - rust/core/src/ffi/types.rs | 5 +- rust/core/src/lib.rs | 4 +- 5 files changed, 1217 insertions(+), 17 deletions(-) create mode 100644 rust/core/src/driver_manager.rs diff --git a/rust/core/Cargo.toml b/rust/core/Cargo.toml index fc5519946a..fd7bc88208 100644 --- a/rust/core/Cargo.toml +++ b/rust/core/Cargo.toml @@ -14,7 +14,6 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. - [package] name = "adbc_core" description = "Public abstract API, driver manager and driver exporter" @@ -25,5 +24,8 @@ license = { workspace = true } [dependencies] arrow = { workspace = true } -libloading = "0.8" +libloading = { version = "0.8", optional = true } once_cell = "1.19.0" + +[features] +driver_manager = ["dep:libloading"] diff --git a/rust/core/src/driver_manager.rs b/rust/core/src/driver_manager.rs new file mode 100644 index 0000000000..87e8670b2c --- /dev/null +++ b/rust/core/src/driver_manager.rs @@ -0,0 +1,1207 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Load and use ADBC drivers. +//! +//! The driver manager provides an implementation of the ADBC interface which +//! uses FFI to wrap an object file implementation of +//! [`adbc.h`](https://github.com/apache/arrow-adbc/blob/main/adbc.h). +//! +//! There are three ways that drivers can be loaded: +//! 1. By statically linking the driver implementation using +//! [ManagedDriver::load_static]. +//! 2. By dynamically linking the driver implementation using +//! [ManagedDriver::load_static]. +//! 3. By loading the driver implementation at runtime (with +//! `dlopen/LoadLibrary`) using [ManagedDriver::load_dynamic]. +//! +//! Drivers are initialized using a function provided by the driver as a main +//! entrypoint, canonically called `AdbcDriverInit`. Although many will use a +//! different name to support statically linking multiple drivers within the +//! same program. +//! +//! ## Using across threads +//! +//! [ManagedDriver], [ManagedDatabase], [ManagedConnection] and [ManagedStatement] +//! can be used across threads though all of their operations are serialized +//! under the hood. They hold their inner implementations within [std::sync::Arc], +//! so they are cheaply clonable. +//! +//! ## Example +//! +//! ```rust +//! # use std::sync::Arc; +//! # use arrow::{ +//! # array::{Array, StringArray, Int64Array, Float64Array}, +//! # record_batch::{RecordBatch, RecordBatchReader}, +//! # datatypes::{Field, Schema, DataType}, +//! # compute::concat_batches, +//! # }; +//! # use adbc_core::{ +//! # driver_manager::ManagedDriver, +//! # options::{AdbcVersion, OptionDatabase, OptionStatement}, +//! # Connection, Database, Driver, Statement, Optionable +//! # }; +//! # fn main() -> Result<(), Box> { +//! let opts = [(OptionDatabase::Uri, ":memory:".into())]; +//! let mut driver = ManagedDriver::load_dynamic("adbc_driver_sqlite", None, AdbcVersion::V100)?; +//! let mut database = driver.new_database_with_opts(opts)?; +//! let mut connection = database.new_connection()?; +//! let mut statement = connection.new_statement()?; +//! +//! // Define some data. +//! # let columns: Vec> = vec![ +//! # Arc::new(Int64Array::from(vec![1, 2, 3, 4])), +//! # Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), +//! # Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), +//! # ]; +//! # let schema = Schema::new(vec![ +//! # Field::new("a", DataType::Int64, true), +//! # Field::new("b", DataType::Float64, true), +//! # Field::new("c", DataType::Utf8, true), +//! # ]); +//! let input: RecordBatch = RecordBatch::try_new(Arc::new(schema), columns)?; +//! +//! // Ingest data. +//! statement.set_option(OptionStatement::TargetTable, "my_table".into())?; +//! statement.bind(input.clone())?; +//! statement.execute_update()?; +//! +//! // Extract data. +//! statement.set_sql_query("select * from my_table")?; +//! let output = statement.execute()?; +//! let schema = output.schema(); +//! let output: Result, _> = output.collect(); +//! let output = concat_batches(&schema, &output?)?; +//! assert_eq!(input, output); +//! +//! # Ok(()) +//! # } +//! ``` + +use std::collections::HashSet; +use std::ffi::{CStr, CString}; +use std::ops::DerefMut; +use std::os::raw::{c_char, c_void}; +use std::ptr::{null, null_mut}; +use std::sync::{Arc, Mutex}; + +use arrow::array::{Array, RecordBatch, RecordBatchReader, StructArray}; +use arrow::ffi::{to_ffi, FFI_ArrowSchema}; +use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; + +use crate::{ + error::{Error, Status}, + options::{self, AdbcVersion, InfoCode, OptionValue}, + PartitionedResult, Result, +}; +use crate::{ffi, ffi::types::driver_method, Optionable}; +use crate::{Connection, Database, Driver, Statement}; + +const ERR_ONLY_STRING_OPT: &str = "Only string option value are supported with ADBC 1.0.0"; +const ERR_CANCEL_UNSUPPORTED: &str = + "Canceling connection or statement is not supported with ADBC 1.0.0"; +const ERR_STATISTICS_UNSUPPORTED: &str = "Statistics are not supported with ADBC 1.0.0"; + +fn check_status(status: ffi::FFI_AdbcStatusCode, error: ffi::FFI_AdbcError) -> Result<()> { + match status { + ffi::constants::ADBC_STATUS_OK => Ok(()), + _ => { + let mut error: Error = error.try_into()?; + error.status = status.try_into()?; + Err(error) + } + } +} + +impl From for Error { + fn from(value: libloading::Error) -> Self { + Self { + message: format!("Error with dynamic library: {value}"), + status: Status::Internal, + vendor_code: 0, + sqlstate: [0; 5], + details: None, + } + } +} + +struct DriverManagerInner { + driver: Mutex, + version: AdbcVersion, // Driver version + _library: Option, +} + +/// Implementation of [Driver]. +#[derive(Clone)] +pub struct ManagedDriver { + inner: Arc, +} + +impl ManagedDriver { + /// Load a driver from an initialization function. + pub fn load_static(init: &ffi::FFI_AdbcDriverInitFunc, version: AdbcVersion) -> Result { + let driver = Self::load_impl(init, version)?; + let inner = Arc::new(DriverManagerInner { + driver: Mutex::new(driver), + version, + _library: None, + }); + Ok(ManagedDriver { inner }) + } + + /// Load a driver from a dynamic library. + /// + /// Will attempt to load the dynamic library with the given `name`, find the + /// symbol with name `entrypoint` (defaults to `AdbcDriverInit` if `None`), + /// and then call create the driver using the resolved function. + /// + /// The `name` should not include any platform-specific prefixes or suffixes. + /// For example, use `adbc_driver_sqlite` rather than `libadbc_driver_sqlite.so`. + pub fn load_dynamic( + name: &str, + entrypoint: Option<&[u8]>, + version: AdbcVersion, + ) -> Result { + let entrypoint = entrypoint.unwrap_or(b"AdbcDriverInit"); + let library = unsafe { libloading::Library::new(libloading::library_filename(name))? }; + let init: libloading::Symbol = + unsafe { library.get(entrypoint)? }; + let driver = Self::load_impl(&init, version)?; + let inner = Arc::new(DriverManagerInner { + driver: Mutex::new(driver), + version, + _library: Some(library), + }); + Ok(ManagedDriver { inner }) + } + + fn load_impl( + init: &ffi::FFI_AdbcDriverInitFunc, + version: AdbcVersion, + ) -> Result { + let mut error = ffi::FFI_AdbcError::default(); + let mut driver = ffi::FFI_AdbcDriver::default(); + let status = unsafe { + init( + version.into(), + &mut driver as *mut ffi::FFI_AdbcDriver as *mut c_void, + &mut error, + ) + }; + check_status(status, error)?; + Ok(driver) + } +} + +impl Driver for ManagedDriver { + type DatabaseType = ManagedDatabase; + + fn new_database(&mut self) -> Result { + self.new_database_with_opts([]) + } + + fn new_database_with_opts( + &mut self, + opts: impl IntoIterator::Option, OptionValue)>, + ) -> Result { + let driver = &self.inner.driver.lock().unwrap(); + let mut database = ffi::FFI_AdbcDatabase::default(); + + // DatabaseNew + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, DatabaseNew); + let status = unsafe { method(&mut database, &mut error) }; + check_status(status, error)?; + + // DatabaseSetOption + for (key, value) in opts { + set_option_database(driver, &mut database, self.inner.version, key, value)?; + } + + // DatabaseInit + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, DatabaseInit); + let status = unsafe { method(&mut database, &mut error) }; + check_status(status, error)?; + + let inner = Arc::new(ManagedDatabaseInner { + database: Mutex::new(database), + driver: self.inner.clone(), + }); + Ok(Self::DatabaseType { inner }) + } +} + +fn set_option_database( + driver: &ffi::FFI_AdbcDriver, + database: &mut ffi::FFI_AdbcDatabase, + version: AdbcVersion, + key: impl AsRef, + value: OptionValue, +) -> Result<()> { + let key = CString::new(key.as_ref())?; + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let status = match (version, value) { + (_, OptionValue::String(value)) => { + let value = CString::new(value)?; + let method = driver_method!(driver, DatabaseSetOption); + unsafe { method(database, key.as_ptr(), value.as_ptr(), &mut error) } + } + (AdbcVersion::V110, OptionValue::Bytes(value)) => { + let method = driver_method!(driver, DatabaseSetOptionBytes); + unsafe { + method( + database, + key.as_ptr(), + value.as_ptr(), + value.len(), + &mut error, + ) + } + } + (AdbcVersion::V110, OptionValue::Int(value)) => { + let method = driver_method!(driver, DatabaseSetOptionInt); + unsafe { method(database, key.as_ptr(), value, &mut error) } + } + (AdbcVersion::V110, OptionValue::Double(value)) => { + let method = driver_method!(driver, DatabaseSetOptionDouble); + unsafe { method(database, key.as_ptr(), value, &mut error) } + } + (AdbcVersion::V100, _) => Err(Error::with_message_and_status( + ERR_ONLY_STRING_OPT, + Status::NotImplemented, + ))?, + }; + check_status(status, error) +} + +fn get_option_bytes( + key: impl AsRef, + mut populate: F, + driver: &ffi::FFI_AdbcDriver, +) -> Result> +where + F: FnMut( + *const c_char, + *mut u8, + *mut usize, + *mut ffi::FFI_AdbcError, + ) -> ffi::FFI_AdbcStatusCode, +{ + const DEFAULT_LENGTH: usize = 128; + let key = CString::new(key.as_ref())?; + let mut run = |length| { + let mut value = vec![0u8; length]; + let mut length: usize = value.len(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + ( + populate(key.as_ptr(), value.as_mut_ptr(), &mut length, &mut error), + length, + value, + error, + ) + }; + + let (status, length, value, error) = run(DEFAULT_LENGTH); + check_status(status, error)?; + + if length <= DEFAULT_LENGTH { + Ok(value[..length].to_vec()) + } else { + let (status, _, value, error) = run(length); + check_status(status, error)?; + Ok(value) + } +} + +fn get_option_string( + key: impl AsRef, + mut populate: F, + driver: &ffi::FFI_AdbcDriver, +) -> Result +where + F: FnMut( + *const c_char, + *mut c_char, + *mut usize, + *mut ffi::FFI_AdbcError, + ) -> ffi::FFI_AdbcStatusCode, +{ + const DEFAULT_LENGTH: usize = 128; + let key = CString::new(key.as_ref())?; + let mut run = |length| { + let mut value: Vec = vec![0; length]; + let mut length: usize = value.len(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + ( + populate(key.as_ptr(), value.as_mut_ptr(), &mut length, &mut error), + length, + value, + error, + ) + }; + + let (status, length, value, error) = run(DEFAULT_LENGTH); + check_status(status, error)?; + + let value = if length <= DEFAULT_LENGTH { + value[..length].to_vec() + } else { + let (status, _, value, error) = run(length); + check_status(status, error)?; + value + }; + + let value = unsafe { CStr::from_ptr(value.as_ptr()) }; + Ok(value.to_string_lossy().to_string()) +} + +struct ManagedDatabaseInner { + database: Mutex, + driver: Arc, +} + +impl Drop for ManagedDatabaseInner { + fn drop(&mut self) { + let driver = &self.driver.driver.lock().unwrap(); + let mut database = self.database.lock().unwrap(); + let method = driver_method!(driver, DatabaseRelease); + // TODO(alexandreyc): how should we handle `DatabaseRelease` failing? + // See: https://github.com/apache/arrow-adbc/pull/1742#discussion_r1574388409 + unsafe { method(database.deref_mut(), null_mut()) }; + } +} + +/// Implementation of [Database]. +#[derive(Clone)] +pub struct ManagedDatabase { + inner: Arc, +} + +impl ManagedDatabase { + fn driver_version(&self) -> AdbcVersion { + self.inner.driver.version + } +} + +impl Optionable for ManagedDatabase { + type Option = options::OptionDatabase; + + fn get_option_bytes(&self, key: Self::Option) -> Result> { + let driver = &self.inner.driver.driver.lock().unwrap(); + let database = &mut self.inner.database.lock().unwrap(); + let method = driver_method!(driver, DatabaseGetOptionBytes); + let populate = |key: *const c_char, + value: *mut u8, + length: *mut usize, + error: *mut ffi::FFI_AdbcError| unsafe { + method(database.deref_mut(), key, value, length, error) + }; + get_option_bytes(key, populate, driver) + } + + fn get_option_double(&self, key: Self::Option) -> Result { + let driver = &self.inner.driver.driver.lock().unwrap(); + let mut database = self.inner.database.lock().unwrap(); + let key = CString::new(key.as_ref())?; + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let mut value: f64 = 0.0; + let method = driver_method!(driver, DatabaseGetOptionDouble); + let status = unsafe { method(database.deref_mut(), key.as_ptr(), &mut value, &mut error) }; + check_status(status, error)?; + Ok(value) + } + + fn get_option_int(&self, key: Self::Option) -> Result { + let driver = &self.inner.driver.driver.lock().unwrap(); + let mut database = self.inner.database.lock().unwrap(); + let key = CString::new(key.as_ref())?; + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let mut value: i64 = 0; + let method = driver_method!(driver, DatabaseGetOptionInt); + let status = unsafe { method(database.deref_mut(), key.as_ptr(), &mut value, &mut error) }; + check_status(status, error)?; + Ok(value) + } + + fn get_option_string(&self, key: Self::Option) -> Result { + let driver = &self.inner.driver.driver.lock().unwrap(); + let mut database = self.inner.database.lock().unwrap(); + let method = driver_method!(driver, DatabaseGetOption); + let populate = |key: *const c_char, + value: *mut c_char, + length: *mut usize, + error: *mut ffi::FFI_AdbcError| unsafe { + method(database.deref_mut(), key, value, length, error) + }; + get_option_string(key, populate, driver) + } + + fn set_option(&mut self, key: Self::Option, value: OptionValue) -> Result<()> { + let driver = &self.inner.driver.driver.lock().unwrap(); + let mut database = self.inner.database.lock().unwrap(); + set_option_database( + driver, + database.deref_mut(), + self.driver_version(), + key, + value, + ) + } +} + +impl Database for ManagedDatabase { + type ConnectionType = ManagedConnection; + + fn new_connection(&mut self) -> Result { + self.new_connection_with_opts([]) + } + + fn new_connection_with_opts( + &mut self, + opts: impl IntoIterator::Option, OptionValue)>, + ) -> Result { + let driver = &self.inner.driver.driver.lock().unwrap(); + let mut database = self.inner.database.lock().unwrap(); + let mut connection = ffi::FFI_AdbcConnection::default(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionNew); + let status = unsafe { method(&mut connection, &mut error) }; + check_status(status, error)?; + + for (key, value) in opts { + set_option_connection(driver, &mut connection, self.driver_version(), key, value)?; + } + + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionInit); + let status = unsafe { method(&mut connection, database.deref_mut(), &mut error) }; + check_status(status, error)?; + + let inner = ManagedConnectionInner { + connection: Mutex::new(connection), + database: self.inner.clone(), + }; + + Ok(Self::ConnectionType { + inner: Arc::new(inner), + }) + } +} + +fn set_option_connection( + driver: &ffi::FFI_AdbcDriver, + connection: &mut ffi::FFI_AdbcConnection, + version: AdbcVersion, + key: impl AsRef, + value: OptionValue, +) -> Result<()> { + let key = CString::new(key.as_ref())?; + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let status = match (version, value) { + (_, OptionValue::String(value)) => { + let value = CString::new(value)?; + let method = driver_method!(driver, ConnectionSetOption); + unsafe { method(connection, key.as_ptr(), value.as_ptr(), &mut error) } + } + (AdbcVersion::V110, OptionValue::Bytes(value)) => { + let method = driver_method!(driver, ConnectionSetOptionBytes); + unsafe { + method( + connection, + key.as_ptr(), + value.as_ptr(), + value.len(), + &mut error, + ) + } + } + (AdbcVersion::V110, OptionValue::Int(value)) => { + let method = driver_method!(driver, ConnectionSetOptionInt); + unsafe { method(connection, key.as_ptr(), value, &mut error) } + } + (AdbcVersion::V110, OptionValue::Double(value)) => { + let method = driver_method!(driver, ConnectionSetOptionDouble); + unsafe { method(connection, key.as_ptr(), value, &mut error) } + } + (AdbcVersion::V100, _) => Err(Error::with_message_and_status( + ERR_ONLY_STRING_OPT, + Status::NotImplemented, + ))?, + }; + check_status(status, error) +} + +struct ManagedConnectionInner { + connection: Mutex, + database: Arc, +} + +impl Drop for ManagedConnectionInner { + fn drop(&mut self) { + let driver = &self.database.driver.driver.lock().unwrap(); + let mut connection = self.connection.lock().unwrap(); + let method = driver_method!(driver, ConnectionRelease); + // TODO(alexandreyc): how should we handle `ConnectionRelease` failing? + // See: https://github.com/apache/arrow-adbc/pull/1742#discussion_r1574388409 + unsafe { method(connection.deref_mut(), null_mut()) }; + } +} + +/// Implementation of [Connection]. +#[derive(Clone)] +pub struct ManagedConnection { + inner: Arc, +} + +impl ManagedConnection { + fn driver_version(&self) -> AdbcVersion { + self.inner.database.driver.version + } +} + +impl Optionable for ManagedConnection { + type Option = options::OptionConnection; + + fn get_option_bytes(&self, key: Self::Option) -> Result> { + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let method = driver_method!(driver, ConnectionGetOptionBytes); + let populate = |key: *const c_char, + value: *mut u8, + length: *mut usize, + error: *mut ffi::FFI_AdbcError| unsafe { + method(connection.deref_mut(), key, value, length, error) + }; + get_option_bytes(key, populate, driver) + } + + fn get_option_double(&self, key: Self::Option) -> Result { + let key = CString::new(key.as_ref())?; + let mut value: f64 = 0.0; + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetOptionDouble); + let status = + unsafe { method(connection.deref_mut(), key.as_ptr(), &mut value, &mut error) }; + check_status(status, error)?; + Ok(value) + } + + fn get_option_int(&self, key: Self::Option) -> Result { + let key = CString::new(key.as_ref())?; + let mut value: i64 = 0; + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetOptionInt); + let status = + unsafe { method(connection.deref_mut(), key.as_ptr(), &mut value, &mut error) }; + check_status(status, error)?; + Ok(value) + } + + fn get_option_string(&self, key: Self::Option) -> Result { + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let method = driver_method!(driver, ConnectionGetOption); + let populate = |key: *const c_char, + value: *mut c_char, + length: *mut usize, + error: *mut ffi::FFI_AdbcError| unsafe { + method(connection.deref_mut(), key, value, length, error) + }; + get_option_string(key, populate, driver) + } + + fn set_option(&mut self, key: Self::Option, value: OptionValue) -> Result<()> { + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + set_option_connection( + driver, + connection.deref_mut(), + self.driver_version(), + key, + value, + ) + } +} + +impl Connection for ManagedConnection { + type StatementType = ManagedStatement; + + fn new_statement(&mut self) -> Result { + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut statement = ffi::FFI_AdbcStatement::default(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementNew); + let status = unsafe { method(connection.deref_mut(), &mut statement, &mut error) }; + check_status(status, error)?; + + let inner = Arc::new(ManagedStatementInner { + statement: Mutex::new(statement), + connection: self.inner.clone(), + }); + + Ok(Self::StatementType { inner }) + } + + fn cancel(&mut self) -> Result<()> { + if let AdbcVersion::V100 = self.driver_version() { + return Err(Error::with_message_and_status( + ERR_CANCEL_UNSUPPORTED, + Status::NotImplemented, + )); + } + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionCancel); + let status = unsafe { method(connection.deref_mut(), &mut error) }; + check_status(status, error) + } + + fn commit(&mut self) -> Result<()> { + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionCommit); + let status = unsafe { method(connection.deref_mut(), &mut error) }; + check_status(status, error) + } + + fn rollback(&mut self) -> Result<()> { + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionRollback); + let status = unsafe { method(connection.deref_mut(), &mut error) }; + check_status(status, error) + } + + fn get_info(&self, codes: Option>) -> Result { + let mut stream = FFI_ArrowArrayStream::empty(); + let codes: Option> = + codes.map(|codes| codes.iter().map(|code| code.into()).collect()); + let (codes_ptr, codes_len) = codes + .as_ref() + .map(|c| (c.as_ptr(), c.len())) + .unwrap_or((null(), 0)); + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetInfo); + let status = unsafe { + method( + connection.deref_mut(), + codes_ptr, + codes_len, + &mut stream, + &mut error, + ) + }; + check_status(status, error)?; + let reader = ArrowArrayStreamReader::try_new(stream)?; + Ok(reader) + } + + fn get_objects( + &self, + depth: crate::options::ObjectDepth, + catalog: Option<&str>, + db_schema: Option<&str>, + table_name: Option<&str>, + table_type: Option>, + column_name: Option<&str>, + ) -> Result { + let catalog = catalog.map(CString::new).transpose()?; + let db_schema = db_schema.map(CString::new).transpose()?; + let table_name = table_name.map(CString::new).transpose()?; + let column_name = column_name.map(CString::new).transpose()?; + let table_type = table_type + .map(|t| { + t.iter() + .map(|x| CString::new(*x)) + .collect::, _>>() + }) + .transpose()?; + + let catalog_ptr = catalog.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + let db_schema_ptr = db_schema.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + let table_name_ptr = table_name.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + let column_name_ptr = column_name.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + + let mut table_type_ptrs = table_type + .as_ref() + .map(|v| v.iter().map(|c| c.as_ptr())) + .map(|c| c.collect::>()); + let table_type_ptr = match table_type_ptrs.as_mut() { + None => null(), + Some(t) => { + t.push(null()); + t.as_ptr() + } + }; + + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetObjects); + let mut stream = FFI_ArrowArrayStream::empty(); + + let status = unsafe { + method( + connection.deref_mut(), + depth.into(), + catalog_ptr, + db_schema_ptr, + table_name_ptr, + table_type_ptr, + column_name_ptr, + &mut stream, + &mut error, + ) + }; + check_status(status, error)?; + + let reader = ArrowArrayStreamReader::try_new(stream)?; + Ok(reader) + } + + fn get_statistics( + &self, + catalog: Option<&str>, + db_schema: Option<&str>, + table_name: Option<&str>, + approximate: bool, + ) -> Result { + if let AdbcVersion::V100 = self.driver_version() { + return Err(Error::with_message_and_status( + ERR_STATISTICS_UNSUPPORTED, + Status::NotImplemented, + )); + } + + let catalog = catalog.map(CString::new).transpose()?; + let db_schema = db_schema.map(CString::new).transpose()?; + let table_name = table_name.map(CString::new).transpose()?; + + let catalog_ptr = catalog.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + let db_schema_ptr = db_schema.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + let table_name_ptr = table_name.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + + let mut stream = FFI_ArrowArrayStream::empty(); + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetStatistics); + let status = unsafe { + method( + connection.deref_mut(), + catalog_ptr, + db_schema_ptr, + table_name_ptr, + approximate as std::os::raw::c_char, + &mut stream, + &mut error, + ) + }; + check_status(status, error)?; + let reader = ArrowArrayStreamReader::try_new(stream)?; + Ok(reader) + } + + fn get_statistic_names(&self) -> Result { + if let AdbcVersion::V100 = self.driver_version() { + return Err(Error::with_message_and_status( + ERR_STATISTICS_UNSUPPORTED, + Status::NotImplemented, + )); + } + let mut stream = FFI_ArrowArrayStream::empty(); + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetStatisticNames); + let status = unsafe { method(connection.deref_mut(), &mut stream, &mut error) }; + check_status(status, error)?; + let reader = ArrowArrayStreamReader::try_new(stream)?; + Ok(reader) + } + + fn get_table_schema( + &self, + catalog: Option<&str>, + db_schema: Option<&str>, + table_name: &str, + ) -> Result { + let catalog = catalog.map(CString::new).transpose()?; + let db_schema = db_schema.map(CString::new).transpose()?; + let table_name = CString::new(table_name)?; + + let catalog_ptr = catalog.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + let db_schema_ptr = db_schema.as_ref().map(|c| c.as_ptr()).unwrap_or(null()); + let table_name_ptr = table_name.as_ptr(); + + let mut schema = FFI_ArrowSchema::empty(); + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetTableSchema); + let status = unsafe { + method( + connection.deref_mut(), + catalog_ptr, + db_schema_ptr, + table_name_ptr, + &mut schema, + &mut error, + ) + }; + check_status(status, error)?; + Ok((&schema).try_into()?) + } + + fn get_table_types(&self) -> Result { + let mut stream = FFI_ArrowArrayStream::empty(); + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionGetTableTypes); + let status = unsafe { method(connection.deref_mut(), &mut stream, &mut error) }; + check_status(status, error)?; + let reader = ArrowArrayStreamReader::try_new(stream)?; + Ok(reader) + } + + fn read_partition(&self, partition: impl AsRef<[u8]>) -> Result { + let mut stream = FFI_ArrowArrayStream::empty(); + let driver = &self.inner.database.driver.driver.lock().unwrap(); + let mut connection = self.inner.connection.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, ConnectionReadPartition); + let partition = partition.as_ref(); + let status = unsafe { + method( + connection.deref_mut(), + partition.as_ptr(), + partition.len(), + &mut stream, + &mut error, + ) + }; + check_status(status, error)?; + let reader = ArrowArrayStreamReader::try_new(stream)?; + Ok(reader) + } +} + +fn set_option_statement( + driver: &ffi::FFI_AdbcDriver, + statement: &mut ffi::FFI_AdbcStatement, + version: AdbcVersion, + key: impl AsRef, + value: OptionValue, +) -> Result<()> { + let key = CString::new(key.as_ref())?; + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let status = match (version, value) { + (_, OptionValue::String(value)) => { + let value = CString::new(value)?; + let method = driver_method!(driver, StatementSetOption); + unsafe { method(statement, key.as_ptr(), value.as_ptr(), &mut error) } + } + (AdbcVersion::V110, OptionValue::Bytes(value)) => { + let method = driver_method!(driver, StatementSetOptionBytes); + unsafe { + method( + statement, + key.as_ptr(), + value.as_ptr(), + value.len(), + &mut error, + ) + } + } + (AdbcVersion::V110, OptionValue::Int(value)) => { + let method = driver_method!(driver, StatementSetOptionInt); + unsafe { method(statement, key.as_ptr(), value, &mut error) } + } + (AdbcVersion::V110, OptionValue::Double(value)) => { + let method = driver_method!(driver, StatementSetOptionDouble); + unsafe { method(statement, key.as_ptr(), value, &mut error) } + } + (AdbcVersion::V100, _) => Err(Error::with_message_and_status( + ERR_ONLY_STRING_OPT, + Status::NotImplemented, + ))?, + }; + check_status(status, error) +} + +struct ManagedStatementInner { + statement: Mutex, + connection: Arc, +} +/// Implementation of [Statement]. +#[derive(Clone)] +pub struct ManagedStatement { + inner: Arc, +} + +impl ManagedStatement { + fn driver_version(&self) -> AdbcVersion { + self.inner.connection.database.driver.version + } +} + +impl Statement for ManagedStatement { + fn bind(&mut self, batch: RecordBatch) -> Result<()> { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementBind); + let batch: StructArray = batch.into(); + let (mut array, mut schema) = to_ffi(&batch.to_data())?; + let status = unsafe { method(statement.deref_mut(), &mut array, &mut schema, &mut error) }; + check_status(status, error)?; + Ok(()) + } + + fn bind_stream(&mut self, reader: Box) -> Result<()> { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementBindStream); + let mut stream = FFI_ArrowArrayStream::new(reader); + let status = unsafe { method(statement.deref_mut(), &mut stream, &mut error) }; + check_status(status, error)?; + Ok(()) + } + + fn cancel(&mut self) -> Result<()> { + if let AdbcVersion::V100 = self.driver_version() { + return Err(Error::with_message_and_status( + ERR_CANCEL_UNSUPPORTED, + Status::NotImplemented, + )); + } + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementCancel); + let status = unsafe { method(statement.deref_mut(), &mut error) }; + check_status(status, error) + } + + fn execute(&mut self) -> Result { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementExecuteQuery); + let mut stream = FFI_ArrowArrayStream::empty(); + let status = unsafe { method(statement.deref_mut(), &mut stream, null_mut(), &mut error) }; + check_status(status, error)?; + let reader = ArrowArrayStreamReader::try_new(stream)?; + Ok(reader) + } + + fn execute_schema(&mut self) -> Result { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementExecuteSchema); + let mut schema = FFI_ArrowSchema::empty(); + let status = unsafe { method(statement.deref_mut(), &mut schema, &mut error) }; + check_status(status, error)?; + Ok((&schema).try_into()?) + } + + fn execute_update(&mut self) -> Result> { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementExecuteQuery); + let mut rows_affected: i64 = -1; + let status = unsafe { + method( + statement.deref_mut(), + null_mut(), + &mut rows_affected, + &mut error, + ) + }; + check_status(status, error)?; + let rows_affected = if rows_affected == -1 { + None + } else { + Some(rows_affected) + }; + Ok(rows_affected) + } + + fn execute_partitions(&mut self) -> Result { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementExecutePartitions); + let mut schema = FFI_ArrowSchema::empty(); + let mut partitions = ffi::FFI_AdbcPartitions::default(); + let mut rows_affected: i64 = -1; + let status = unsafe { + method( + statement.deref_mut(), + &mut schema, + &mut partitions, + &mut rows_affected, + &mut error, + ) + }; + check_status(status, error)?; + + let result = PartitionedResult { + partitions: partitions.into(), + schema: (&schema).try_into()?, + rows_affected, + }; + + Ok(result) + } + + fn get_parameters_schema(&self) -> Result { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementGetParameterSchema); + let mut schema = FFI_ArrowSchema::empty(); + let status = unsafe { method(statement.deref_mut(), &mut schema, &mut error) }; + check_status(status, error)?; + Ok((&schema).try_into()?) + } + + fn prepare(&mut self) -> Result<()> { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementPrepare); + let status = unsafe { method(statement.deref_mut(), &mut error) }; + check_status(status, error)?; + Ok(()) + } + + fn set_sql_query(&mut self, query: &str) -> Result<()> { + let query = CString::new(query)?; + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementSetSqlQuery); + let status = unsafe { method(statement.deref_mut(), query.as_ptr(), &mut error) }; + check_status(status, error)?; + Ok(()) + } + + fn set_substrait_plan(&mut self, plan: &[u8]) -> Result<()> { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementSetSubstraitPlan); + let status = + unsafe { method(statement.deref_mut(), plan.as_ptr(), plan.len(), &mut error) }; + check_status(status, error)?; + Ok(()) + } +} + +impl Optionable for ManagedStatement { + type Option = options::OptionStatement; + + fn get_option_bytes(&self, key: Self::Option) -> Result> { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let method = driver_method!(driver, StatementGetOptionBytes); + let populate = |key: *const c_char, + value: *mut u8, + length: *mut usize, + error: *mut ffi::FFI_AdbcError| unsafe { + method(statement.deref_mut(), key, value, length, error) + }; + get_option_bytes(key, populate, driver) + } + + fn get_option_double(&self, key: Self::Option) -> Result { + let key = CString::new(key.as_ref())?; + let mut value: f64 = 0.0; + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementGetOptionDouble); + let status = unsafe { method(statement.deref_mut(), key.as_ptr(), &mut value, &mut error) }; + check_status(status, error)?; + Ok(value) + } + + fn get_option_int(&self, key: Self::Option) -> Result { + let key = CString::new(key.as_ref())?; + let mut value: i64 = 0; + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let mut error = ffi::FFI_AdbcError::with_driver(driver); + let method = driver_method!(driver, StatementGetOptionInt); + let status = unsafe { method(statement.deref_mut(), key.as_ptr(), &mut value, &mut error) }; + check_status(status, error)?; + Ok(value) + } + + fn get_option_string(&self, key: Self::Option) -> Result { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let method = driver_method!(driver, StatementGetOption); + let populate = |key: *const c_char, + value: *mut c_char, + length: *mut usize, + error: *mut ffi::FFI_AdbcError| unsafe { + method(statement.deref_mut(), key, value, length, error) + }; + get_option_string(key, populate, driver) + } + + fn set_option(&mut self, key: Self::Option, value: OptionValue) -> Result<()> { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + set_option_statement( + driver, + statement.deref_mut(), + self.driver_version(), + key, + value, + ) + } +} + +impl Drop for ManagedStatement { + fn drop(&mut self) { + let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); + let mut statement = self.inner.statement.lock().unwrap(); + let method = driver_method!(driver, StatementRelease); + // TODO(alexandreyc): how should we handle `StatementRelease` failing? + // See: https://github.com/apache/arrow-adbc/pull/1742#discussion_r1574388409 + unsafe { method(statement.deref_mut(), null_mut()) }; + } +} diff --git a/rust/core/src/error.rs b/rust/core/src/error.rs index 75fe359e1e..2ee19452db 100644 --- a/rust/core/src/error.rs +++ b/rust/core/src/error.rs @@ -136,18 +136,6 @@ impl From for Error { } } -impl From for Error { - fn from(value: libloading::Error) -> Self { - Self { - message: format!("Error with dynamic library: {value}"), - status: Status::Internal, - vendor_code: 0, - sqlstate: [0; 5], - details: None, - } - } -} - impl From for Error { fn from(value: std::str::Utf8Error) -> Self { Self { diff --git a/rust/core/src/ffi/types.rs b/rust/core/src/ffi/types.rs index 921064c383..2aaafb908f 100644 --- a/rust/core/src/ffi/types.rs +++ b/rust/core/src/ffi/types.rs @@ -80,6 +80,8 @@ pub struct FFI_AdbcConnection { pub(crate) private_driver: *const FFI_AdbcDriver, } +unsafe impl Send for FFI_AdbcConnection {} + #[repr(C)] #[derive(Debug)] pub struct FFI_AdbcStatement { @@ -90,6 +92,8 @@ pub struct FFI_AdbcStatement { pub(crate) private_driver: *const FFI_AdbcDriver, } +unsafe impl Send for FFI_AdbcStatement {} + #[repr(C)] #[derive(Debug)] pub struct FFI_AdbcPartitions { @@ -190,7 +194,6 @@ pub struct FFI_AdbcDriver { } unsafe impl Send for FFI_AdbcDriver {} -unsafe impl Sync for FFI_AdbcDriver {} macro_rules! driver_method { ($driver:expr, $method:ident) => { diff --git a/rust/core/src/lib.rs b/rust/core/src/lib.rs index de099ca61b..242934654f 100644 --- a/rust/core/src/lib.rs +++ b/rust/core/src/lib.rs @@ -56,11 +56,11 @@ //! can build it as an object file implementing the C API with the //! [export_driver] macro. -// TODO(alexandreyc): uncomment these lines during follow-up PRs -// pub mod driver_manager; mod driver_exporter; #[doc(hidden)] pub use driver_exporter::FFIDriver; +#[cfg(feature = "driver_manager")] +pub mod driver_manager; pub mod error; pub mod ffi; pub mod options; From 4352ed5984a94cef31adc266990a77bcf3d0a9cc Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Wed, 1 May 2024 17:07:21 +0200 Subject: [PATCH 02/31] Dummy commit to trigger CI --- rust/core/src/driver_manager.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/rust/core/src/driver_manager.rs b/rust/core/src/driver_manager.rs index 87e8670b2c..e6ea575739 100644 --- a/rust/core/src/driver_manager.rs +++ b/rust/core/src/driver_manager.rs @@ -1205,3 +1205,4 @@ impl Drop for ManagedStatement { unsafe { method(statement.deref_mut(), null_mut()) }; } } + From 4488d5d58be0e1c5fb70bfc0d73ab9ed68accca3 Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Thu, 2 May 2024 10:17:44 +0200 Subject: [PATCH 03/31] Add comment about lock inversion --- rust/core/src/driver_manager.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/rust/core/src/driver_manager.rs b/rust/core/src/driver_manager.rs index e6ea575739..3879f72aa0 100644 --- a/rust/core/src/driver_manager.rs +++ b/rust/core/src/driver_manager.rs @@ -93,6 +93,17 @@ //! # } //! ``` +// According to the ADBC specification, objects allow serialized access from +// multiple threads: one thread may make a call, and once finished, another +// thread may make a call. They do not allow concurrent access from multiple +// threads. +// +// In order to implement this semantics, all FFI objects are wrapped into +// `Mutex`. Hence, we need to deal with multiple locks at once, so care must +// be taken to avoid deadlock and in particular we must avoid "lock inversion". +// The general convention chosen here is to first acquire lock to the driver +// and then acquire lock to the specific object under implementation. + use std::collections::HashSet; use std::ffi::{CStr, CString}; use std::ops::DerefMut; @@ -1205,4 +1216,3 @@ impl Drop for ManagedStatement { unsafe { method(statement.deref_mut(), null_mut()) }; } } - From 98f880cec95acd23a7cd00b937e16e2a1a367bb3 Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Fri, 3 May 2024 10:25:25 +0200 Subject: [PATCH 04/31] CI: Add reusable workflows to build drivers --- .github/workflows/native-unix.yml | 1 + .github/workflows/native-windows.yml | 1 + .github/workflows/rust.yml | 6 ++++++ 3 files changed, 8 insertions(+) diff --git a/.github/workflows/native-unix.yml b/.github/workflows/native-unix.yml index c3f54cf549..486111ae24 100644 --- a/.github/workflows/native-unix.yml +++ b/.github/workflows/native-unix.yml @@ -42,6 +42,7 @@ on: - "python/**" - "ruby/**" - ".github/workflows/native-unix.yml" + workflow_call: concurrency: group: ${{ github.repository }}-${{ github.ref }}-${{ github.workflow }} diff --git a/.github/workflows/native-windows.yml b/.github/workflows/native-windows.yml index 5f0964d029..6ceb07538f 100644 --- a/.github/workflows/native-windows.yml +++ b/.github/workflows/native-windows.yml @@ -40,6 +40,7 @@ on: - "python/**" - "ruby/**" - ".github/workflows/native-windows.yml" + workflow_call: concurrency: group: ${{ github.repository }}-${{ github.ref }}-${{ github.workflow }} diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 793db58a80..6dd317b78f 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -42,6 +42,12 @@ defaults: working-directory: rust jobs: + native-unix: + uses: ./.github/workflows/native-unix.yml + + native-windows: + uses: ./.github/workflows/native-windows.yml + rust: strategy: matrix: From 7c619e196a6c63aa6798ada4878d9f93561620bb Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Mon, 6 May 2024 17:50:09 +0200 Subject: [PATCH 05/31] Improve documentation --- rust/core/src/driver_manager.rs | 34 ++++++++++++++++----------------- rust/core/src/lib.rs | 2 ++ 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/rust/core/src/driver_manager.rs b/rust/core/src/driver_manager.rs index 3879f72aa0..ab6654d218 100644 --- a/rust/core/src/driver_manager.rs +++ b/rust/core/src/driver_manager.rs @@ -21,13 +21,11 @@ //! uses FFI to wrap an object file implementation of //! [`adbc.h`](https://github.com/apache/arrow-adbc/blob/main/adbc.h). //! -//! There are three ways that drivers can be loaded: -//! 1. By statically linking the driver implementation using -//! [ManagedDriver::load_static]. -//! 2. By dynamically linking the driver implementation using -//! [ManagedDriver::load_static]. -//! 3. By loading the driver implementation at runtime (with -//! `dlopen/LoadLibrary`) using [ManagedDriver::load_dynamic]. +//! There are two ways that drivers can be used: +//! 2. By linking (either statically or dynamically) the driver implementation +//! at link-time and then using [ManagedDriver::load_static]. +//! 3. By loading the driver implementation at run-time (with `dlopen/LoadLibrary`) +//! using [ManagedDriver::load_dynamic]. //! //! Drivers are initialized using a function provided by the driver as a main //! entrypoint, canonically called `AdbcDriverInit`. Although many will use a @@ -64,16 +62,16 @@ //! let mut statement = connection.new_statement()?; //! //! // Define some data. -//! # let columns: Vec> = vec![ -//! # Arc::new(Int64Array::from(vec![1, 2, 3, 4])), -//! # Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), -//! # Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), -//! # ]; -//! # let schema = Schema::new(vec![ -//! # Field::new("a", DataType::Int64, true), -//! # Field::new("b", DataType::Float64, true), -//! # Field::new("c", DataType::Utf8, true), -//! # ]); +//! let columns: Vec> = vec![ +//! Arc::new(Int64Array::from(vec![1, 2, 3, 4])), +//! Arc::new(Float64Array::from(vec![1.0, 2.0, 3.0, 4.0])), +//! Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), +//! ]; +//! let schema = Schema::new(vec![ +//! Field::new("a", DataType::Int64, true), +//! Field::new("b", DataType::Float64, true), +//! Field::new("c", DataType::Utf8, true), +//! ]); //! let input: RecordBatch = RecordBatch::try_new(Arc::new(schema), columns)?; //! //! // Ingest data. @@ -179,7 +177,7 @@ impl ManagedDriver { /// /// Will attempt to load the dynamic library with the given `name`, find the /// symbol with name `entrypoint` (defaults to `AdbcDriverInit` if `None`), - /// and then call create the driver using the resolved function. + /// and then create the driver using the resolved function. /// /// The `name` should not include any platform-specific prefixes or suffixes. /// For example, use `adbc_driver_sqlite` rather than `libadbc_driver_sqlite.so`. diff --git a/rust/core/src/lib.rs b/rust/core/src/lib.rs index 242934654f..8446a25d86 100644 --- a/rust/core/src/lib.rs +++ b/rust/core/src/lib.rs @@ -48,6 +48,8 @@ //! The [driver_manager] module allows loading drivers exposing the C API, //! either from an initialization function (link-time, either static or dynamic) //! or by dynamically finding such a function in a dynamic library (run-time). +//! The driver manager is gated behind the `driver_manager` feature flag. +//! //! # Driver Exporter //! //! The driver exporter allows exposing native Rust drivers as C drivers to be From 751af51e022205eb534e284b033913112511d259 Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Mon, 6 May 2024 17:53:59 +0200 Subject: [PATCH 06/31] Update to use `None` instead of empty array --- rust/core/src/driver_manager.rs | 4 ++-- rust/drivers/dummy/src/lib.rs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/core/src/driver_manager.rs b/rust/core/src/driver_manager.rs index ab6654d218..0ffb9b82f5 100644 --- a/rust/core/src/driver_manager.rs +++ b/rust/core/src/driver_manager.rs @@ -221,7 +221,7 @@ impl Driver for ManagedDriver { type DatabaseType = ManagedDatabase; fn new_database(&mut self) -> Result { - self.new_database_with_opts([]) + self.new_database_with_opts(None) } fn new_database_with_opts( @@ -478,7 +478,7 @@ impl Database for ManagedDatabase { type ConnectionType = ManagedConnection; fn new_connection(&mut self) -> Result { - self.new_connection_with_opts([]) + self.new_connection_with_opts(None) } fn new_connection_with_opts( diff --git a/rust/drivers/dummy/src/lib.rs b/rust/drivers/dummy/src/lib.rs index f5a70b98a1..8a5f8d1c44 100644 --- a/rust/drivers/dummy/src/lib.rs +++ b/rust/drivers/dummy/src/lib.rs @@ -183,7 +183,7 @@ impl Driver for DummyDriver { type DatabaseType = DummyDatabase; fn new_database(&mut self) -> Result { - self.new_database_with_opts([]) + self.new_database_with_opts(None) } fn new_database_with_opts( @@ -232,7 +232,7 @@ impl Database for DummyDatabase { type ConnectionType = DummyConnection; fn new_connection(&mut self) -> Result { - self.new_connection_with_opts([]) + self.new_connection_with_opts(None) } fn new_connection_with_opts( From 38bc72a05c27183a00501a6f5dc8b14a59784d13 Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Mon, 6 May 2024 18:01:53 +0200 Subject: [PATCH 07/31] Use `f64::default` instead of `0.0` --- rust/core/src/driver_manager.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/core/src/driver_manager.rs b/rust/core/src/driver_manager.rs index 0ffb9b82f5..cc1fde9957 100644 --- a/rust/core/src/driver_manager.rs +++ b/rust/core/src/driver_manager.rs @@ -429,7 +429,7 @@ impl Optionable for ManagedDatabase { let mut database = self.inner.database.lock().unwrap(); let key = CString::new(key.as_ref())?; let mut error = ffi::FFI_AdbcError::with_driver(driver); - let mut value: f64 = 0.0; + let mut value: f64 = f64::default(); let method = driver_method!(driver, DatabaseGetOptionDouble); let status = unsafe { method(database.deref_mut(), key.as_ptr(), &mut value, &mut error) }; check_status(status, error)?; @@ -602,7 +602,7 @@ impl Optionable for ManagedConnection { fn get_option_double(&self, key: Self::Option) -> Result { let key = CString::new(key.as_ref())?; - let mut value: f64 = 0.0; + let mut value: f64 = f64::default(); let driver = &self.inner.database.driver.driver.lock().unwrap(); let mut connection = self.inner.connection.lock().unwrap(); let mut error = ffi::FFI_AdbcError::with_driver(driver); @@ -1156,7 +1156,7 @@ impl Optionable for ManagedStatement { fn get_option_double(&self, key: Self::Option) -> Result { let key = CString::new(key.as_ref())?; - let mut value: f64 = 0.0; + let mut value: f64 = f64::default(); let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); let mut statement = self.inner.statement.lock().unwrap(); let mut error = ffi::FFI_AdbcError::with_driver(driver); From 5dd08e69db20523dc0fc017a97084ba2e3f0651a Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Mon, 6 May 2024 19:34:25 +0200 Subject: [PATCH 08/31] Rename `get_parameters_schema` to `get_parameter_schema` --- rust/core/src/driver_exporter.rs | 2 +- rust/core/src/driver_manager.rs | 2 +- rust/core/src/lib.rs | 2 +- rust/drivers/dummy/src/lib.rs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/rust/core/src/driver_exporter.rs b/rust/core/src/driver_exporter.rs index cc3ba8f0f5..bd09517595 100644 --- a/rust/core/src/driver_exporter.rs +++ b/rust/core/src/driver_exporter.rs @@ -1581,7 +1581,7 @@ unsafe extern "C" fn statement_get_parameter_schema( let exported = check_err!(statement_private_data::(statement), error); let statement = &exported.0; - let schema_value = check_err!(statement.get_parameters_schema(), error); + let schema_value = check_err!(statement.get_parameter_schema(), error); let schema_value: FFI_ArrowSchema = check_err!(schema_value.try_into(), error); std::ptr::write_unaligned(schema, schema_value); diff --git a/rust/core/src/driver_manager.rs b/rust/core/src/driver_manager.rs index cc1fde9957..e344cc8b32 100644 --- a/rust/core/src/driver_manager.rs +++ b/rust/core/src/driver_manager.rs @@ -1094,7 +1094,7 @@ impl Statement for ManagedStatement { Ok(result) } - fn get_parameters_schema(&self) -> Result { + fn get_parameter_schema(&self) -> Result { let driver = &self.inner.connection.database.driver.driver.lock().unwrap(); let mut statement = self.inner.statement.lock().unwrap(); let mut error = ffi::FFI_AdbcError::with_driver(driver); diff --git a/rust/core/src/lib.rs b/rust/core/src/lib.rs index 8446a25d86..b46c5d64b2 100644 --- a/rust/core/src/lib.rs +++ b/rust/core/src/lib.rs @@ -488,7 +488,7 @@ pub trait Statement: Optionable