From 41c6d37370680a8eb769eabea3f5a27290ec6fb0 Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Wed, 24 Apr 2024 16:11:35 +0200 Subject: [PATCH 01/33] Improve documentation --- rust/core/src/ffi/mod.rs | 2 ++ rust/core/src/lib.rs | 17 +++++++++++------ 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/rust/core/src/ffi/mod.rs b/rust/core/src/ffi/mod.rs index 69e866c0cb..4a633cab14 100644 --- a/rust/core/src/ffi/mod.rs +++ b/rust/core/src/ffi/mod.rs @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +//! C-compatible items as defined in [`adbc.h`](https://github.com/apache/arrow-adbc/blob/main/adbc.h) + pub mod constants; pub(crate) mod methods; pub(crate) mod types; diff --git a/rust/core/src/lib.rs b/rust/core/src/lib.rs index 55d9f8e5b3..acce627524 100644 --- a/rust/core/src/lib.rs +++ b/rust/core/src/lib.rs @@ -29,9 +29,8 @@ //! - An abstract Rust API to be implemented by vendor-specific drivers. //! - A driver manager which implements this same API, but dynamically loads //! drivers internally and forwards calls appropriately using the [C API](https://github.com/apache/arrow-adbc/blob/main/adbc.h). -//! -//! We hope to provide the ability to expose Rust drivers as C drivers in the -//! near future permitting other languages to use Rust drivers. +//! - A driver exporter that takes an implementation of the abstract API and +//! turns it into an object file that implements the C API. //! //! # Native Rust drivers //! @@ -49,12 +48,18 @@ //! The [driver_manager] module allows loading drivers exposing the C API, //! either from an initialization function (link-time, either static or dynamic) //! or by dynamically finding such a function in a dynamic library (run-time). +//! # Driver Exporter +//! +//! The driver exporter allows exposing native Rust drivers as C drivers to be +//! used by other langages via their own driver manager. Once you have an +//! implementation of [Driver], provided that it also implements [Default], you +//! can build it as an object file implementing the C API with the +//! [export_driver] macro. // TODO(alexandreyc): uncomment these lines during follow-up PRs -// pub mod driver_exporter; // pub mod driver_manager; -// pub use ffi::FFI_AdbcDriverInitFunc as AdbcDriverInitFunc; - +#[doc(hidden)] +pub mod driver_exporter; pub mod error; pub mod ffi; pub mod options; From 13bfd29f85db8de9ef64bc2f76c035885aa72439 Mon Sep 17 00:00:00 2001 From: alexandreyc Date: Wed, 24 Apr 2024 16:11:55 +0200 Subject: [PATCH 02/33] Add driver exporter --- rust/core/src/driver_exporter.rs | 1581 ++++++++++++++++++++++++++++++ 1 file changed, 1581 insertions(+) create mode 100644 rust/core/src/driver_exporter.rs diff --git a/rust/core/src/driver_exporter.rs b/rust/core/src/driver_exporter.rs new file mode 100644 index 0000000000..25bf7655c8 --- /dev/null +++ b/rust/core/src/driver_exporter.rs @@ -0,0 +1,1581 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::{HashMap, HashSet}; +use std::ffi::{CStr, CString}; +use std::hash::Hash; +use std::os::raw::{c_char, c_int, c_void}; + +use arrow::array::StructArray; +use arrow::datatypes::DataType; +use arrow::ffi::{from_ffi, FFI_ArrowArray, FFI_ArrowSchema}; +use arrow::ffi_stream::{ArrowArrayStreamReader, FFI_ArrowArrayStream}; + +use crate::error::{Error, Result, Status}; +use crate::ffi::constants::ADBC_STATUS_OK; +use crate::ffi::{ + types::ErrorPrivateData, FFI_AdbcConnection, FFI_AdbcDatabase, FFI_AdbcDriver, FFI_AdbcError, + FFI_AdbcErrorDetail, FFI_AdbcPartitions, FFI_AdbcStatement, FFI_AdbcStatusCode, +}; +use crate::options::{InfoCode, ObjectDepth, OptionConnection, OptionDatabase, OptionValue}; +use crate::{Connection, Database, Driver, Optionable, Statement}; + +// Invariant: options.is_none() XOR database.is_none() +struct ExportedDatabase { + options: Option>, // Pre-init options + database: Option, +} + +// Invariant: options.is_none() XOR database.is_none() +struct ExportedConnection { + options: Option>, // Pre-init options + connection: Option<::ConnectionType>, +} + +struct ExportedStatement { + statement: + <::ConnectionType as Connection>::StatementType, +} + +pub fn make_ffi_driver() -> FFI_AdbcDriver { + FFI_AdbcDriver { + private_data: std::ptr::null_mut(), + private_manager: std::ptr::null(), + release: Some(release_ffi_driver), + DatabaseInit: Some(database_init::), + DatabaseNew: Some(database_new::), + DatabaseSetOption: Some(database_set_option::), + DatabaseRelease: Some(database_release::), + ConnectionCommit: Some(connection_commit::), + ConnectionGetInfo: Some(connection_get_info::), + ConnectionGetObjects: Some(connection_get_objects::), + ConnectionGetTableSchema: Some(connection_get_table_schema::), + ConnectionGetTableTypes: Some(connection_get_table_types::), + ConnectionInit: Some(connection_init::), + ConnectionNew: Some(connection_new::), + ConnectionSetOption: Some(connection_set_option::), + ConnectionReadPartition: Some(connection_read_partition::), + ConnectionRelease: Some(connection_release::), + ConnectionRollback: Some(connection_rollback::), + StatementBind: Some(statement_bind::), + StatementBindStream: Some(statement_bind_stream::), + StatementExecuteQuery: Some(statement_execute_query::), + StatementExecutePartitions: Some(statement_execute_partitions::), + StatementGetParameterSchema: Some(statement_get_parameter_schema::), + StatementNew: Some(statement_new::), + StatementPrepare: Some(statement_prepare::), + StatementRelease: Some(statement_release::), + StatementSetOption: Some(statement_set_option::), + StatementSetSqlQuery: Some(statement_set_sql_query::), + StatementSetSubstraitPlan: Some(statement_set_substrait_plan::), + ErrorGetDetailCount: Some(error_get_detail_count), + ErrorGetDetail: Some(error_get_detail), + ErrorFromArrayStream: None, // TODO(alexandreyc): what to do with this? + DatabaseGetOption: Some(database_get_option::), + DatabaseGetOptionBytes: Some(database_get_option_bytes::), + DatabaseGetOptionDouble: Some(database_get_option_double::), + DatabaseGetOptionInt: Some(database_get_option_int::), + DatabaseSetOptionBytes: Some(database_set_option_bytes::), + DatabaseSetOptionDouble: Some(database_set_option_double::), + DatabaseSetOptionInt: Some(database_set_option_int::), + ConnectionCancel: Some(connection_cancel::), + ConnectionGetOption: Some(connection_get_option::), + ConnectionGetOptionBytes: Some(connection_get_option_bytes::), + ConnectionGetOptionDouble: Some(connection_get_option_double::), + ConnectionGetOptionInt: Some(connection_get_option_int::), + ConnectionGetStatistics: Some(connection_get_statistics::), + ConnectionGetStatisticNames: Some(connection_get_statistic_names::), + ConnectionSetOptionBytes: Some(connection_set_option_bytes::), + ConnectionSetOptionDouble: Some(connection_set_option_double::), + ConnectionSetOptionInt: Some(connection_set_option_int::), + StatementCancel: Some(statement_cancel::), + StatementExecuteSchema: Some(statement_execute_schema::), + StatementGetOption: Some(statement_get_option::), + StatementGetOptionBytes: Some(statement_get_option_bytes::), + StatementGetOptionDouble: Some(statement_get_option_double::), + StatementGetOptionInt: Some(statement_get_option_int::), + StatementSetOptionBytes: Some(statement_set_option_bytes::), + StatementSetOptionDouble: Some(statement_set_option_double::), + StatementSetOptionInt: Some(statement_set_option_int::), + } +} + +/// Export a Rust driver to a C driver. +/// +/// The default name recommended is `AdbcDriverInit` or `DriverInit`. +/// +/// The driver type must implement [Driver] and [Default]. +#[macro_export] +macro_rules! export_driver { + ($func_name:ident, $driver_type:ty) => { + #[no_mangle] + pub unsafe extern "C" fn $func_name( + version: std::os::raw::c_int, + driver: *mut std::os::raw::c_void, + error: *mut $crate::ffi::FFI_AdbcError, + ) -> $crate::ffi::FFI_AdbcStatusCode { + if version != $crate::options::AdbcVersion::V110.into() { + let err = $crate::error::Error::with_message_and_status( + format!("Unsupported ADBC version: {version}"), + $crate::error::Status::NotImplemented, + ); + $crate::check_err!(Err(err), error); + } + + if driver.is_null() { + let err = $crate::error::Error::with_message_and_status( + "Passed null pointer to initialization function", + $crate::error::Status::NotImplemented, + ); + $crate::check_err!(Err(err), error); + } + + let ffi_driver = $crate::driver_exporter::make_ffi_driver::<$driver_type>(); + unsafe { + std::ptr::write_unaligned(driver as *mut $crate::ffi::FFI_AdbcDriver, ffi_driver); + } + $crate::ffi::constants::ADBC_STATUS_OK + } + }; +} + +/// Given a Result, either unwrap the value or handle the error in ADBC function. +/// +/// This macro is for use when implementing ADBC methods that have an out +/// parameter for [FFI_AdbcError] and return [FFI_AdbcStatusCode]. If the result is +/// `Ok`, the expression resolves to the value. Otherwise, it will return early, +/// setting the error and status code appropriately. In order for this to work, +/// the error must be convertible to [crate::error::Error]. +#[doc(hidden)] +#[macro_export] +macro_rules! check_err { + ($res:expr, $err_out:expr) => { + match $res { + Ok(x) => x, + Err(error) => { + let error = $crate::error::Error::from(error); + let status: $crate::ffi::FFI_AdbcStatusCode = error.status.into(); + if !$err_out.is_null() { + let mut ffi_error = $crate::ffi::FFI_AdbcError::try_from(error).unwrap(); + ffi_error.private_driver = (*$err_out).private_driver; + unsafe { std::ptr::write_unaligned($err_out, ffi_error) }; + } + return status; + } + } + }; +} + +/// Check that the given raw pointer is not null. +/// +/// If null, an error is returned from the enclosing function, otherwise this is +/// a no-op. +macro_rules! check_not_null { + ($ptr:ident, $err_out:expr) => { + let res = if $ptr.is_null() { + Err(Error::with_message_and_status( + format!("Passed null pointer for argument {:?}", stringify!($ptr)), + Status::InvalidArguments, + )) + } else { + Ok(()) + }; + check_err!(res, $err_out); + }; +} + +unsafe extern "C" fn release_ffi_driver( + driver: *mut FFI_AdbcDriver, + _error: *mut FFI_AdbcError, +) -> FFI_AdbcStatusCode { + if let Some(driver) = driver.as_mut() { + driver.release = None; + } + ADBC_STATUS_OK +} + +// Option helpers + +unsafe fn copy_string(src: &str, dst: *mut c_char, length: *mut usize) -> Result<()> { + let n = src.len() + 1; // +1 for nul terminator + let src = CString::new(src)?; + if n <= *length { + std::ptr::copy(src.as_ptr(), dst, n); + } + *length = n; + Ok::<(), Error>(()) +} + +unsafe fn copy_bytes(src: &[u8], dst: *mut u8, length: *mut usize) { + let n = src.len(); + if n <= *length { + std::ptr::copy(src.as_ptr(), dst, n); + } + *length = n; +} + +unsafe fn get_option_int<'a, OptionType, Object>( + object: Option<&Object>, + options: &mut Option>, + key: *const c_char, +) -> Result +where + OptionType: Hash + Eq + From<&'a str>, + Object: Optionable