From e527e2506e7d35c6a3584f21b0460a7c84bcb3dd Mon Sep 17 00:00:00 2001
From: James Duong
Date: Thu, 15 Feb 2024 06:23:07 -0800
Subject: [PATCH] Support Depth.TABLES and Depth.ALL

---
 java/driver/flight-sql/pom.xml                     |  11 +-
 .../flightsql/GetObjectsMetadataReaders.java       | 685 +++++++++++++--------
 2 files changed, 445 insertions(+), 251 deletions(-)

diff --git a/java/driver/flight-sql/pom.xml b/java/driver/flight-sql/pom.xml
index e410c2a14e..13aa44f5c0 100644
--- a/java/driver/flight-sql/pom.xml
+++ b/java/driver/flight-sql/pom.xml
@@ -67,6 +67,12 @@
       <artifactId>adbc-sql</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>flight-sql-jdbc-core</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.checkerframework</groupId>
@@ -96,11 +102,6 @@
       <version>4.13.1</version>
       <scope>test</scope>
     </dependency>
-    <dependency>
-      <groupId>org.apache.arrow</groupId>
-      <artifactId>flight-sql-jdbc-core</artifactId>
-      <scope>test</scope>
-    </dependency>
     <dependency>
       <groupId>org.apache.arrow</groupId>
       <artifactId>flight-sql-jdbc-core</artifactId>
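Note: flight-sql-jdbc-core moves from a test-only dependency to compile scope because the metadata readers below now call its SqlTypes utilities at runtime. A minimal sketch of the call this enables (the Utf8 value is an illustrative assumption):

    import org.apache.arrow.driver.jdbc.utils.SqlTypes;
    import org.apache.arrow.vector.types.pojo.ArrowType;

    // Maps an Arrow type to a java.sql.Types id/name; Utf8 maps to VARCHAR.
    int jdbcTypeId = SqlTypes.getSqlTypeIdFromArrowType(new ArrowType.Utf8());
    String jdbcTypeName = SqlTypes.getSqlTypeNameFromArrowType(new ArrowType.Utf8());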
diff --git a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/GetObjectsMetadataReaders.java b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/GetObjectsMetadataReaders.java
index f18e4da1ab..e7400ae744 100644
--- a/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/GetObjectsMetadataReaders.java
+++ b/java/driver/flight-sql/src/main/java/org/apache/arrow/adbc/driver/flightsql/GetObjectsMetadataReaders.java
@@ -17,23 +17,29 @@
 package org.apache.arrow.adbc.driver.flightsql;
 
 import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.google.common.primitives.Shorts;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
+import java.nio.channels.Channels;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import org.apache.arrow.adbc.core.AdbcConnection;
 import org.apache.arrow.adbc.core.AdbcException;
 import org.apache.arrow.adbc.core.StandardSchemas;
+import org.apache.arrow.driver.jdbc.utils.SqlTypes;
 import org.apache.arrow.flight.FlightEndpoint;
 import org.apache.arrow.flight.Location;
+import org.apache.arrow.flight.sql.FlightSqlColumnMetadata;
 import org.apache.arrow.memory.BufferAllocator;
 import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
 import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
@@ -41,21 +47,52 @@
 import org.apache.arrow.vector.complex.writer.BaseWriter;
 import org.apache.arrow.vector.complex.writer.VarCharWriter;
 import org.apache.arrow.vector.ipc.ArrowReader;
+import org.apache.arrow.vector.ipc.ReadChannel;
+import org.apache.arrow.vector.ipc.message.MessageSerializer;
+import org.apache.arrow.vector.types.pojo.ArrowType;
+import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.Schema;
 import org.apache.arrow.vector.util.Text;
 
 final class GetObjectsMetadataReaders {
+  private static final String JAVA_REGEX_SPECIALS = "[]()|^-+*?{}$\\.";
+  private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
+
+  static final int NO_DECIMAL_DIGITS = 0;
+  static final int COLUMN_SIZE_BYTE = (int) Math.ceil((Byte.SIZE - 1) * Math.log(2) / Math.log(10));
+  static final int COLUMN_SIZE_SHORT =
+      (int) Math.ceil((Short.SIZE - 1) * Math.log(2) / Math.log(10));
+  static final int COLUMN_SIZE_INT =
+      (int) Math.ceil((Integer.SIZE - 1) * Math.log(2) / Math.log(10));
+  static final int COLUMN_SIZE_LONG = (int) Math.ceil((Long.SIZE - 1) * Math.log(2) / Math.log(10));
+  static final int COLUMN_SIZE_VARCHAR_AND_BINARY = 65536;
+  static final int COLUMN_SIZE_DATE = "YYYY-MM-DD".length();
+  static final int COLUMN_SIZE_TIME = "HH:MM:ss".length();
+  static final int COLUMN_SIZE_TIME_MILLISECONDS = "HH:MM:ss.SSS".length();
+  static final int COLUMN_SIZE_TIME_MICROSECONDS = "HH:MM:ss.SSSSSS".length();
+  static final int COLUMN_SIZE_TIME_NANOSECONDS = "HH:MM:ss.SSSSSSSSS".length();
+  static final int COLUMN_SIZE_TIMESTAMP_SECONDS = COLUMN_SIZE_DATE + 1 + COLUMN_SIZE_TIME;
+  static final int COLUMN_SIZE_TIMESTAMP_MILLISECONDS =
+      COLUMN_SIZE_DATE + 1 + COLUMN_SIZE_TIME_MILLISECONDS;
+  static final int COLUMN_SIZE_TIMESTAMP_MICROSECONDS =
+      COLUMN_SIZE_DATE + 1 + COLUMN_SIZE_TIME_MICROSECONDS;
+  static final int COLUMN_SIZE_TIMESTAMP_NANOSECONDS =
+      COLUMN_SIZE_DATE + 1 + COLUMN_SIZE_TIME_NANOSECONDS;
+  static final int DECIMAL_DIGITS_TIME_MILLISECONDS = 3;
+  static final int DECIMAL_DIGITS_TIME_MICROSECONDS = 6;
+  static final int DECIMAL_DIGITS_TIME_NANOSECONDS = 9;
+
   static ArrowReader CreateGetObjectsReader(
       BufferAllocator allocator,
       FlightSqlClientWithCallOptions client,
       LoadingCache<Location, FlightSqlClientWithCallOptions> clientCache,
-      final AdbcConnection.GetObjectsDepth depth,
-      final String catalogPattern,
-      final String dbSchemaPattern,
-      final String tableNamePattern,
-      final String[] tableTypes,
-      final String columnNamePattern)
+      AdbcConnection.GetObjectsDepth depth,
+      String catalogPattern,
+      String dbSchemaPattern,
+      String tableNamePattern,
+      String[] tableTypes,
+      String columnNamePattern)
       throws AdbcException {
     switch (depth) {
       case CATALOGS:
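The COLUMN_SIZE_* integer constants above encode the maximum decimal-digit count of each signed integer type: an N-bit signed integer holds at most 2^(N-1) - 1, which has ceil((N - 1) * log10(2)) digits. Worked out (illustration only):

    // ceil((bits - 1) * log10(2)) digits for a signed integer type:
    //   byte  (8 bits):  ceil(7  * 0.30103) = 3   (max 127)
    //   short (16 bits): ceil(15 * 0.30103) = 5   (max 32767)
    //   int   (32 bits): ceil(31 * 0.30103) = 10  (max 2147483647)
    //   long  (64 bits): ceil(63 * 0.30103) = 19  (max 9223372036854775807)
    int columnSizeInt = (int) Math.ceil((Integer.SIZE - 1) * Math.log(2) / Math.log(10)); // 10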
@@ -140,12 +177,6 @@ protected void finish() throws AdbcException, IOException {}
     protected VectorSchemaRoot getAggregateRoot() {
       return aggregateRoot;
     }
-
-    protected static Pattern toPattern(String filterPattern) {
-      return filterPattern != null
-          ? Pattern.compile(Pattern.quote(filterPattern).replace("_", ".").replace("%", ".*"))
-          : null;
-    }
   }
 
   private static class GetCatalogsMetadataReader extends GetObjectMetadataReader {
@@ -158,7 +189,7 @@ protected GetCatalogsMetadataReader(
         String catalog)
         throws AdbcException {
       super(allocator, client, clientCache, () -> doRequest(client));
-      catalogPattern = toPattern(catalog);
+      catalogPattern = Pattern.compile(sqlToRegexLike(catalog));
     }
 
     @Override
@@ -180,7 +211,7 @@ private static List<FlightEndpoint> doRequest(FlightSqlClientWithCallOptions client) {
 
   private static class GetDbSchemasMetadataReader extends GetObjectMetadataReader {
     private final String catalog;
-    private Map<String, List<String>> catalogToSchemaMap = new LinkedHashMap<>();
+    private final Map<String, List<String>> catalogToSchemaMap = new LinkedHashMap<>();
 
     protected GetDbSchemasMetadataReader(
         BufferAllocator allocator,
@@ -231,10 +262,6 @@ protected void finish() throws AdbcException, IOException {
         catalogColumn.makeTransferPair(outputCatalogColumn).transfer();
       }
 
-      // Track visited catalogs to see if there were any encountered during processRoots that didn't
-      // get reported by the GetCatalogs() RPC call.
-      Set<String> visitedCatalogs = new LinkedHashSet<>();
-
       // Now map catalog names to schema lists.
       UnionListWriter adbcCatalogDbSchemasWriter =
           ((ListVector) getAggregateRoot().getVector(1)).getWriter();
@@ -255,23 +282,6 @@ protected void finish() throws AdbcException, IOException {
           }
           adbcCatalogDbSchemasWriter.endList();
         }
-        visitedCatalogs.add(catalog);
-      }
-
-      // If there were any catalogs that had schemas, but didn't show up in getCatalogs(), add them
-      // to the vector.
-      catalogToSchemaMap.keySet().removeAll(visitedCatalogs);
-      int i = getAggregateRoot().getRowCount();
-      getAggregateRoot().setRowCount(i + catalogToSchemaMap.size());
-      for (Map.Entry<String, List<String>> entry : catalogToSchemaMap.entrySet()) {
-        outputCatalogColumn.setSafe(i, entry.getKey().getBytes(StandardCharsets.UTF_8));
-        adbcCatalogDbSchemasWriter.startList();
-        for (String schema : entry.getValue()) {
-          adbcCatalogDbSchemasStructWriter.start();
-          adbcCatalogDbSchemaNameWriter.writeVarChar(schema);
-          adbcCatalogDbSchemasStructWriter.end();
-        }
-        adbcCatalogDbSchemasWriter.endList();
-      }
     }
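The finish() implementations rely on Arrow's list/struct writer bracketing: every list entry is framed by startList()/endList() and every struct element by start()/end(). A self-contained sketch of the pattern, with illustrative names, assuming the same Arrow Java writer APIs the patch uses:

    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.complex.ListVector;
    import org.apache.arrow.vector.complex.impl.UnionListWriter;
    import org.apache.arrow.vector.complex.writer.BaseWriter;

    try (RootAllocator allocator = new RootAllocator();
        ListVector schemas = ListVector.empty("catalog_db_schemas", allocator)) {
      UnionListWriter listWriter = schemas.getWriter();
      listWriter.startList(); // one list entry, e.g. per catalog row
      BaseWriter.StructWriter structWriter = listWriter.struct();
      structWriter.start(); // one struct element inside the list
      structWriter.varChar("db_schema_name").writeVarChar("main");
      structWriter.end();
      listWriter.endList();
      schemas.setValueCount(1);
    }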
@@ -282,8 +292,32 @@ private static List<FlightEndpoint> doRequest(
       FlightSqlClientWithCallOptions client, String catalog) {
     }
   }
 
   private static class GetTablesMetadataReader extends GetObjectMetadataReader {
-    private final Pattern columnNamePattern;
+    private static class ColumnDefinition {
+      Field field;
+      FlightSqlColumnMetadata metadata;
+      int ordinal;
+
+      static ColumnDefinition from(Field field, int ordinal) {
+        ColumnDefinition columnDefinition = new ColumnDefinition();
+        columnDefinition.field = field;
+        columnDefinition.metadata = new FlightSqlColumnMetadata(field.getMetadata());
+        columnDefinition.ordinal = ordinal;
+        return columnDefinition;
+      }
+    }
+
+    private static class TableDefinition {
+      String tableType;
+
+      List<ColumnDefinition> columnDefinitions;
+    }
+
+    private final String catalogPattern;
+    private final String dbSchemaPattern;
+    private final Pattern compiledColumnNamePattern;
     private final boolean shouldGetColumns;
+    private final Map<String, Map<String, Map<String, TableDefinition>>> tablePathToColumnsMap =
+        new LinkedHashMap<>();
 
     protected GetTablesMetadataReader(
         BufferAllocator allocator,
@@ -299,7 +333,9 @@ protected GetTablesMetadataReader(
           client,
           clientCache,
           () -> doRequest(client, catalogPattern, schemaPattern, tablePattern, tableTypes, false));
-      columnNamePattern = null;
+      this.catalogPattern = catalogPattern;
+      this.dbSchemaPattern = schemaPattern;
+      compiledColumnNamePattern = null;
       shouldGetColumns = false;
     }
 
@@ -318,12 +354,251 @@ protected GetTablesMetadataReader(
           client,
           clientCache,
           () -> doRequest(client, catalogPattern, schemaPattern, tablePattern, tableTypes, true));
-      columnNamePattern = toPattern(columnPattern);
+      this.catalogPattern = catalogPattern;
+      this.dbSchemaPattern = schemaPattern;
+      compiledColumnNamePattern = Pattern.compile(sqlToRegexLike(columnPattern));
       shouldGetColumns = true;
     }
 
     @Override
-    protected void processRootFromStream(VectorSchemaRoot root) {}
+    protected void processRootFromStream(VectorSchemaRoot root) {
+      VarCharVector catalogVector = (VarCharVector) root.getVector(0);
+      VarCharVector schemaVector = (VarCharVector) root.getVector(1);
+      VarCharVector tableVector = (VarCharVector) root.getVector(2);
+      VarCharVector tableTypeVector = (VarCharVector) root.getVector(3);
+      VarBinaryVector tableSchemaVector =
+          shouldGetColumns ? (VarBinaryVector) root.getVector(4) : null;
+
+      for (int i = 0; i < root.getRowCount(); ++i) {
+        List<ColumnDefinition> columns = getColumnDefinitions(tableSchemaVector, i);
+        final String catalog;
+        if (catalogVector.isNull(i)) {
+          catalog = "";
+        } else {
+          catalogVector.read(i, buffer);
+          catalog = buffer.toString();
+        }
+
+        final String schema;
+        if (schemaVector.isNull(i)) {
+          schema = "";
+        } else {
+          schemaVector.read(i, buffer);
+          schema = buffer.toString();
+        }
+
+        final String tableType;
+        if (tableTypeVector.isNull(i)) {
+          tableType = null;
+        } else {
+          // Read from the table-type vector, not the table-name vector.
+          tableTypeVector.read(i, buffer);
+          tableType = buffer.toString();
+        }
+
+        tableVector.read(i, buffer);
+        String table = buffer.toString();
+        tablePathToColumnsMap.compute(
+            // Build the outer-most map (catalog-level).
+            catalog,
+            (catalogEntryKey, catalogEntryValue) -> {
+              if (catalogEntryValue == null) {
+                catalogEntryValue = new HashMap<>();
+              }
+              catalogEntryValue.compute(
+                  // Build the mid-level map (schema-level).
+                  schema,
+                  (schemaEntryKey, schemaEntryValue) -> {
+                    // Build the inner-most map (table-level).
+                    if (schemaEntryValue == null) {
+                      schemaEntryValue = new LinkedHashMap<>();
+                    }
+                    TableDefinition tableDefinition = new TableDefinition();
+                    tableDefinition.columnDefinitions = columns;
+                    tableDefinition.tableType = tableType;
+                    schemaEntryValue.put(table, tableDefinition);
+                    return schemaEntryValue;
+                  });
+              return catalogEntryValue;
+            });
+      }
+    }
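The three-level compute() chain above builds a catalog -> schema -> table index. Assuming values are never explicitly null, computeIfAbsent expresses the same nesting more directly; this sketch is only to clarify the structure (index and the loop variables stand in for the patch's fields):

    Map<String, Map<String, Map<String, TableDefinition>>> index = new LinkedHashMap<>();
    // catalog, schema, table, and tableDefinition are the per-row values from the loop above.
    index
        .computeIfAbsent(catalog, k -> new HashMap<>())      // catalog level
        .computeIfAbsent(schema, k -> new LinkedHashMap<>()) // schema level
        .put(table, tableDefinition);                        // table level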
+
+    @Override
+    protected void finish() throws AdbcException, IOException {
+      // Create a schema-only reader to get the catalog->schema hierarchy, including empty catalogs
+      // and schemas. Then transfer its contents to the current reader's root.
+      VarCharVector outputCatalogColumn = (VarCharVector) getAggregateRoot().getVector(0);
+      ListVector outputSchemaStructList = (ListVector) getAggregateRoot().getVector(1);
+      try (GetDbSchemasMetadataReader schemaReader =
+          new GetDbSchemasMetadataReader(
+              allocator, client, clientCache, catalogPattern, dbSchemaPattern)) {
+        getAggregateRoot().setRowCount(schemaReader.getAggregateRoot().getRowCount());
+        VarCharVector catalogColumn = (VarCharVector) schemaReader.getAggregateRoot().getVector(0);
+        catalogColumn.makeTransferPair(outputCatalogColumn).transfer();
+      }
+
+      // Iterate over catalogs and schemas reported by the GetDbSchemasMetadataReader.
+      for (int i = 0; i < getAggregateRoot().getRowCount(); ++i) {
+        outputCatalogColumn.read(i, buffer);
+        final String catalog = buffer.toString();
+        final UnionListWriter schemaListWriter = outputSchemaStructList.getWriter();
+        for (Object schemaStructObj : outputSchemaStructList.getObject(i)) {
+          final Map<String, ?> schemaStructAsMap = (Map<String, ?>) schemaStructObj;
+          String schemaName = (String) schemaStructAsMap.get("db_schema_name");
+
+          // If either the catalog or the schema was not reported by the GetTables RPC call during
+          // processRootFromStream(), it means that this was an empty (table-less) catalog or
+          // schema pair and should be skipped.
+          final Map<String, Map<String, TableDefinition>> schemaToTableMap =
+              tablePathToColumnsMap.get(catalog);
+          if (schemaToTableMap == null) {
+            continue;
+          }
+
+          final Map<String, TableDefinition> tables = schemaToTableMap.get(schemaName);
+          if (tables == null) {
+            continue;
+          }
+
+          // Set up the schema list writer to write at the current position.
+          schemaListWriter.setPosition(i);
+          BaseWriter.StructWriter schemaStructWriter = schemaListWriter.struct();
+          schemaStructWriter.start();
+          schemaStructWriter.varChar("db_schema_name").writeVarChar(schemaName);
+          BaseWriter.ListWriter tableWriter = schemaStructWriter.list("db_schema_tables");
+          // Open the table list once, then process each table as one struct element.
+          tableWriter.startList();
+          for (Map.Entry<String, TableDefinition> table : tables.entrySet()) {
+            BaseWriter.StructWriter tableStructWriter = tableWriter.struct();
+            tableStructWriter.start();
+            tableStructWriter.varChar("table_name").writeVarChar(table.getKey());
+            tableStructWriter.varChar("table_type").writeVarChar(table.getValue().tableType);
+
+            // Process each column if columns are requested.
+            if (shouldGetColumns) {
+              BaseWriter.ListWriter columnListWriter = tableStructWriter.list("table_columns");
+              columnListWriter.startList();
+              for (ColumnDefinition columnDefinition : table.getValue().columnDefinitions) {
+                BaseWriter.StructWriter columnDefinitionWriter = columnListWriter.struct();
+                writeColumnDefinition(columnDefinition, columnDefinitionWriter);
+              }
+              columnListWriter.endList();
+            }
+            tableStructWriter.end();
+          }
+          tableWriter.endList();
+          schemaStructWriter.end();
+        }
+      }
+    }
+
+    /**
+     * If columns are not needed, return an empty list. If columns are needed, and all columns fail
+     * the column pattern filter, return an empty list. If columns are needed, and the column name
+     * passes the column pattern filter, return the ColumnDefinition list.
+     */
+    private List<ColumnDefinition> getColumnDefinitions(
+        VarBinaryVector tableSchemaVector, int index) {
+      if (tableSchemaVector == null) {
+        return Collections.emptyList();
+      }
+
+      tableSchemaVector.read(index, buffer);
+      try {
+        final List<ColumnDefinition> result = new ArrayList<>();
+        final Schema tableSchema =
+            MessageSerializer.deserializeSchema(
+                new ReadChannel(
+                    Channels.newChannel(
+                        new ByteArrayInputStream(
+                            buffer.getBytes(), 0, (int) buffer.getLength()))));
+
+        final List<Field> fields = tableSchema.getFields();
+        for (int fieldIndex = 0; fieldIndex < fields.size(); fieldIndex++) {
+          final Field field = fields.get(fieldIndex);
+          if (compiledColumnNamePattern == null
+              || compiledColumnNamePattern.matcher(field.getName()).matches()) {
+            result.add(ColumnDefinition.from(field, fieldIndex + 1));
+          }
+        }
+        return result;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
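GetTables with includeSchema=true returns each table's Arrow schema as IPC-serialized bytes, which getColumnDefinitions() decodes via MessageSerializer. A round-trip sketch of that encoding (the schema contents are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.nio.channels.Channels;
    import java.util.Collections;
    import org.apache.arrow.vector.ipc.ReadChannel;
    import org.apache.arrow.vector.ipc.WriteChannel;
    import org.apache.arrow.vector.ipc.message.MessageSerializer;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.Schema;

    Schema schema =
        new Schema(Collections.singletonList(Field.nullable("id", new ArrowType.Int(32, true))));
    // Serialize the schema as an IPC message, then read it back.
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema);
    Schema roundTripped =
        MessageSerializer.deserializeSchema(
            new ReadChannel(Channels.newChannel(new ByteArrayInputStream(out.toByteArray()))));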
+
+    private void writeColumnDefinition(
+        ColumnDefinition columnDefinition, BaseWriter.StructWriter columnDefinitionWriter) {
+      // This code is based on the implementation of getColumns() in the Flight JDBC driver.
+      // Bracket the struct element; field writes require an open struct.
+      columnDefinitionWriter.start();
+      columnDefinitionWriter.varChar("column_name").writeVarChar(columnDefinition.field.getName());
+      columnDefinitionWriter.integer("ordinal_position").writeInt(columnDefinition.ordinal);
+      // columnDefinitionWriter.varChar("remarks").writeVarChar();
+      columnDefinitionWriter
+          .smallInt("xdbc_data_type")
+          .writeSmallInt(
+              Shorts.saturatedCast(
+                  SqlTypes.getSqlTypeIdFromArrowType(columnDefinition.field.getType())));
+
+      final ArrowType fieldType = columnDefinition.field.getType();
+      String typeName = columnDefinition.metadata.getTypeName();
+      if (typeName == null) {
+        typeName = SqlTypes.getSqlTypeNameFromArrowType(fieldType);
+      }
+      if (typeName != null) {
+        columnDefinitionWriter.varChar("xdbc_type_name").writeVarChar(typeName);
+      }
+
+      Integer columnSize = columnDefinition.metadata.getPrecision();
+      if (columnSize == null) {
+        columnSize = getColumnSize(fieldType);
+      }
+      if (columnSize != null) {
+        columnDefinitionWriter.integer("xdbc_column_size").writeInt(columnSize);
+      }
+
+      Integer decimalDigits = columnDefinition.metadata.getScale();
+      if (decimalDigits == null) {
+        decimalDigits = getDecimalDigits(fieldType);
+      }
+      if (decimalDigits != null) {
+        // Write the resolved value, not the (possibly null) metadata scale.
+        columnDefinitionWriter
+            .smallInt("xdbc_decimal_digits")
+            .writeSmallInt(Shorts.saturatedCast(decimalDigits));
+      }
+
+      // This is taken from the JDBC driver, but seems wrong that all three branches write the same
+      // value. Float should probably be 2.
+      if (fieldType instanceof ArrowType.Decimal) {
+        columnDefinitionWriter.smallInt("xdbc_num_prec_radix").writeSmallInt((short) 10);
+      } else if (fieldType instanceof ArrowType.Int) {
+        columnDefinitionWriter.smallInt("xdbc_num_prec_radix").writeSmallInt((short) 10);
+      } else if (fieldType instanceof ArrowType.FloatingPoint) {
+        columnDefinitionWriter.smallInt("xdbc_num_prec_radix").writeSmallInt((short) 10);
+      }
+
+      columnDefinitionWriter
+          .smallInt("xdbc_nullable")
+          .writeSmallInt(columnDefinition.field.isNullable() ? (short) 1 : (short) 0);
+      // columnDefinitionWriter.varChar("xdbc_column_def").writeVarChar();
+      columnDefinitionWriter
+          .smallInt("xdbc_sql_data_type")
+          .writeSmallInt((short) SqlTypes.getSqlTypeIdFromArrowType(fieldType));
+      // columnDefinitionWriter.smallInt("xdbc_datetime_sub").writeSmallInt();
+      // columnDefinitionWriter.integer("xdbc_char_octet_length").writeInt();
+      columnDefinitionWriter
+          .varChar("xdbc_is_nullable")
+          .writeVarChar(columnDefinition.field.isNullable() ? "YES" : "NO");
+      // columnDefinitionWriter.varChar("xdbc_scope_catalog").writeVarChar();
+      // columnDefinitionWriter.varChar("xdbc_scope_schema").writeVarChar();
+      // columnDefinitionWriter.varChar("xdbc_scope_table").writeVarChar();
+      columnDefinitionWriter
+          .bit("xdbc_auto_increment")
+          .writeBit(columnDefinition.metadata.isAutoIncrement() ? 1 : 0);
+      // columnDefinitionWriter.bit("xdbc_is_generatedcolumn").writeBit();
+      columnDefinitionWriter.end();
+    }
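xdbc_data_type and xdbc_decimal_digits are 16-bit fields while SqlTypes returns an int, hence the saturating casts; unlike a plain (short) cast, Guava clamps out-of-range values instead of letting them wrap (illustrative values):

    import com.google.common.primitives.Shorts;

    short inRange = Shorts.saturatedCast(40);        // 40: in range, unchanged
    short clampedHigh = Shorts.saturatedCast(70000); // 32767 (Short.MAX_VALUE)
    short clampedLow = Shorts.saturatedCast(-70000); // -32768 (Short.MIN_VALUE)
    // A plain narrowing cast would wrap instead: (short) 70000 == 4464.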
 
     private static List<FlightEndpoint> doRequest(
         FlightSqlClientWithCallOptions client,
         String catalogPattern,
         String schemaPattern,
         String tablePattern,
         String[] tableTypes,
         boolean includeSchema) {
@@ -338,208 +613,126 @@ private static List<FlightEndpoint> doRequest(
     }
   }
 
-  //  private final FlightSqlClientWithCallOptions client;
-  //  private final VectorSchemaRoot root;
-  //  private final VarCharVector adbcCatalogNames;
-  //  private final UnionListWriter adbcCatalogDbSchemasWriter;
-  //  private final BaseWriter.StructWriter adbcCatalogDbSchemasStructWriter;
-  //  private final BaseWriter.ListWriter adbcCatalogDbSchemaTablesWriter;
-  //  private final VarCharWriter adbcCatalogDbSchemaNameWriter;
-  //  private final BaseWriter.StructWriter adbcTablesStructWriter;
-  //  private final VarCharWriter adbcTableNameWriter;
-  //  private final VarCharWriter adbcTableTypeWriter;
-  //  private final BaseWriter.ListWriter adbcTableColumnsWriter;
-  //  private final BufferAllocator allocator;
-  //  private final AdbcConnection.GetObjectsDepth depth;
-  //  private final Pattern precompiledColumnNamePattern;
-  //
-  //  GetObjectsMetadataReaders(
-  //      BufferAllocator allocator,
-  //      FlightSqlClientWithCallOptions client,
-  //      LoadingCache<Location, FlightSqlClientWithCallOptions> clientCache,
-  //      final AdbcConnection.GetObjectsDepth depth,
-  //      final String catalogPattern,
-  //      final String dbSchemaPattern,
-  //      final String tableNamePattern,
-  //      final String[] tableTypes,
-  //      final String columnNamePattern) {
-  //    super(allocator, client, clientCache, () -> doRequest(client, depth, catalogPattern,
-  // dbSchemaPattern, tableNamePattern, tableTypes));
-  //    this.allocator = allocator;
-  //    this.client = client;
-  //    this.depth = depth;
-  //    this.precompiledColumnNamePattern =
-  //        columnNamePattern != null && this.depth == AdbcConnection.GetObjectsDepth.ALL
-  //            ? Pattern.compile(Pattern.quote(columnNamePattern).replace("_", ".").replace("%",
-  // ".*"))
-  //            : null;
-  //    this.root = VectorSchemaRoot.create(StandardSchemas.GET_OBJECTS_SCHEMA, allocator);
-  //    this.adbcCatalogNames = (VarCharVector) root.getVector(0);
-  //    this.adbcCatalogDbSchemasWriter = ((ListVector) root.getVector(1)).getWriter();
-  //    this.adbcCatalogDbSchemasStructWriter = adbcCatalogDbSchemasWriter.struct();
-  //    this.adbcCatalogDbSchemaTablesWriter =
-  //        adbcCatalogDbSchemasStructWriter.list("db_schema_tables");
-  //    this.adbcCatalogDbSchemaNameWriter =
-  //        adbcCatalogDbSchemasStructWriter.varChar("db_schema_name");
-  //    this.adbcTablesStructWriter = adbcCatalogDbSchemaTablesWriter.struct();
-  //    this.adbcTableNameWriter = adbcTablesStructWriter.varChar("table_name");
-  //    this.adbcTableTypeWriter = adbcTablesStructWriter.varChar("table_type");
-  //    this.adbcTableColumnsWriter = adbcTablesStructWriter.list("table_columns");
-  //  }
-  //
-  //  private void writeVarChar(VarCharWriter writer, NullableVarCharHolder value) {
-  //    writer.writeVarChar(0, value.length, tempBuf);
-  //  }
-  //
-  //  @Override
-  //  protected void processRootFromStream(VectorSchemaRoot root) {
-  //
-  //  }
-  //
-  //  /** The caller must close the returned root. */
-  //  VectorSchemaRoot build() throws AdbcException {
-  //    // TODO Catalogs and schemas that don't contain tables are being left out
-  //    byte[] lastCatalogAdded = null;
-  //    byte[] lastDbSchemaAdded = null;
-  //    int catalogIndex = 0;
-  //
-  //    for (FlightEndpoint endpoint : info.getEndpoints()) {
-  //      try (FlightStream stream = client.getStream(endpoint.getTicket())) {
-  //        while (stream.next()) {
-  //          try (VectorSchemaRoot res = stream.getRoot()) {
-  //            VarCharVector catalogVector = (VarCharVector) res.getVector(0);
-  //
-  //            for (int i = 0; i < res.getRowCount(); i++) {
-  //              byte[] catalog = catalogVector.get(i);
-  //
-  //              if (i == 0 || lastCatalogAdded != catalog) {
-  //                if (catalog == null) {
-  //                  adbcCatalogNames.setNull(catalogIndex);
-  //                } else {
-  //                  adbcCatalogNames.setSafe(catalogIndex, catalog);
-  //                }
-  //                if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) {
-  //                  adbcCatalogDbSchemasWriter.writeNull();
-  //                } else {
-  //                  if (catalogIndex != 0) {
-  //                    adbcCatalogDbSchemasWriter.endList();
-  //                  }
-  //                  adbcCatalogDbSchemasWriter.startList();
-  //                  lastDbSchemaAdded = null;
-  //                }
-  //                catalogIndex++;
-  //                lastCatalogAdded = catalog;
-  //              }
-  //
-  //              if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) {
-  //                VarCharVector dbSchemaVector = (VarCharVector) res.getVector(1);
-  //                byte[] dbSchema = dbSchemaVector.get(i);
-  //
-  //                if (!Arrays.equals(lastDbSchemaAdded, dbSchema)) {
-  //                  if (i != 0) {
-  //                    adbcCatalogDbSchemaTablesWriter.endList();
-  //                    adbcCatalogDbSchemasStructWriter.end();
-  //                  }
-  //                  adbcCatalogDbSchemasStructWriter.start();
-  //                  writeVarChar(
-  //                      adbcCatalogDbSchemaNameWriter, new String(dbSchema,
-  // StandardCharsets.UTF_8));
-  //                  if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
-  //                    adbcCatalogDbSchemaTablesWriter.writeNull();
-  //                  } else {
-  //                    adbcCatalogDbSchemaTablesWriter.startList();
-  //                  }
-  //
-  //                  lastDbSchemaAdded = dbSchema;
-  //                }
-  //              }
-  //
-  //              if (depth != AdbcConnection.GetObjectsDepth.CATALOGS
-  //                  && depth != AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
-  //                VarCharVector tableNameVector = (VarCharVector) res.getVector(2);
-  //                VarCharVector tableTypeVector = (VarCharVector) res.getVector(3);
-  //
-  //                adbcTablesStructWriter.start();
-  //                writeVarChar(
-  //                    adbcTableNameWriter,
-  //                    new String(tableNameVector.get(i), StandardCharsets.UTF_8));
-  //                writeVarChar(
-  //                    adbcTableTypeWriter,
-  //                    new String(tableTypeVector.get(i), StandardCharsets.UTF_8));
-  //
-  //                if (depth == AdbcConnection.GetObjectsDepth.ALL) {
-  //                  VarBinaryVector tableSchemaVector = (VarBinaryVector) res.getVector(4);
-  //                  Schema schema;
-  //
-  //                  try {
-  //                    schema =
-  //                        MessageSerializer.deserializeSchema(
-  //                            new ReadChannel(
-  //                                Channels.newChannel(
-  //                                    new ByteArrayInputStream(tableSchemaVector.get(i)))));
-  //                  } catch (IOException e) {
-  //                    throw new RuntimeException(e);
-  //                  }
-  //
-  //                  adbcTableColumnsWriter.startList();
-  //
-  //                  for (int y = 0; y < schema.getFields().size(); y++) {
-  //                    Field field = schema.getFields().get(y);
-  //                    if (precompiledColumnNamePattern == null
-  //                        || precompiledColumnNamePattern.matcher(field.getName()).matches()) {
-  //                      adbcTableColumnsWriter.struct().start();
-  //                      writeVarChar(
-  //                          adbcTableColumnsWriter.struct().varChar("column_name"),
-  //                          field.getName());
-  //                      adbcTableColumnsWriter.struct().integer("ordinal_position").writeInt(y +
-  // 1);
-  //                      adbcTableColumnsWriter.struct().end();
-  //                    }
-  //                  }
-  //                  adbcTableColumnsWriter.endList();
-  //                }
-  //
-  //                adbcTablesStructWriter.end();
-  //              }
-  //            }
-  //
-  //            if (depth != AdbcConnection.GetObjectsDepth.CATALOGS) {
-  //              adbcCatalogDbSchemaTablesWriter.endList();
-  //              adbcCatalogDbSchemasStructWriter.end();
-  //              adbcCatalogDbSchemasWriter.endList();
-  //            }
-  //          }
-  //        }
-  //      } catch (Exception e) {
-  //        throw new RuntimeException(e);
-  //      }
-  //    }
-  //
-  //    root.setRowCount(catalogIndex);
-  //    return root;
-  //  }
-  //
-  //  private static List<FlightEndpoint> doRequest(FlightSqlClientWithCallOptions client,
-  //                                                AdbcConnection.GetObjectsDepth depth, String
-  // catalogPattern,
-  //                                                String dbSchemaPattern, String tableNamePattern,
-  //                                                String[] tableTypes) {
-  //
-  //    final FlightInfo info;
-  //    if (depth == AdbcConnection.GetObjectsDepth.CATALOGS) {
-  //      info = client.getCatalogs();
-  //    } else if (depth == AdbcConnection.GetObjectsDepth.DB_SCHEMAS) {
-  //      info = client.getSchemas(catalogPattern, dbSchemaPattern);
-  //    } else {
-  //      info =
-  //          client.getTables(
-  //              catalogPattern,
-  //              dbSchemaPattern,
-  //              tableNamePattern,
-  //              tableTypes == null ? null : Arrays.asList(tableTypes),
-  //              depth == AdbcConnection.GetObjectsDepth.ALL);
-  //    }
-  //
-  //    return info.getEndpoints();
-  //  }
+  static Integer getDecimalDigits(final ArrowType fieldType) {
+    // We aren't setting DECIMAL_DIGITS for Float/Double as their precision and scale are variable.
+    if (fieldType instanceof ArrowType.Decimal) {
+      final ArrowType.Decimal thisDecimal = (ArrowType.Decimal) fieldType;
+      return thisDecimal.getScale();
+    } else if (fieldType instanceof ArrowType.Int) {
+      return NO_DECIMAL_DIGITS;
+    } else if (fieldType instanceof ArrowType.Timestamp) {
+      switch (((ArrowType.Timestamp) fieldType).getUnit()) {
+        case SECOND:
+          return NO_DECIMAL_DIGITS;
+        case MILLISECOND:
+          return DECIMAL_DIGITS_TIME_MILLISECONDS;
+        case MICROSECOND:
+          return DECIMAL_DIGITS_TIME_MICROSECONDS;
+        case NANOSECOND:
+          return DECIMAL_DIGITS_TIME_NANOSECONDS;
+        default:
+          break;
+      }
+    } else if (fieldType instanceof ArrowType.Time) {
+      switch (((ArrowType.Time) fieldType).getUnit()) {
+        case SECOND:
+          return NO_DECIMAL_DIGITS;
+        case MILLISECOND:
+          return DECIMAL_DIGITS_TIME_MILLISECONDS;
+        case MICROSECOND:
+          return DECIMAL_DIGITS_TIME_MICROSECONDS;
+        case NANOSECOND:
+          return DECIMAL_DIGITS_TIME_NANOSECONDS;
+        default:
+          break;
+      }
+    } else if (fieldType instanceof ArrowType.Date) {
+      return NO_DECIMAL_DIGITS;
+    }
+
+    return null;
+  }
+
+  static Integer getColumnSize(final ArrowType fieldType) {
+    // We aren't setting COLUMN_SIZE for ROWID SQL Types, as there's no such Arrow type.
+    // We aren't setting COLUMN_SIZE nor DECIMAL_DIGITS for Float/Double as their precision and
+    // scale are variable.
+    if (fieldType instanceof ArrowType.Decimal) {
+      final ArrowType.Decimal thisDecimal = (ArrowType.Decimal) fieldType;
+      return thisDecimal.getPrecision();
+    } else if (fieldType instanceof ArrowType.Int) {
+      final ArrowType.Int thisInt = (ArrowType.Int) fieldType;
+      switch (thisInt.getBitWidth()) {
+        case Byte.SIZE:
+          return COLUMN_SIZE_BYTE;
+        case Short.SIZE:
+          return COLUMN_SIZE_SHORT;
+        case Integer.SIZE:
+          return COLUMN_SIZE_INT;
+        case Long.SIZE:
+          return COLUMN_SIZE_LONG;
+        default:
+          break;
+      }
+    } else if (fieldType instanceof ArrowType.Utf8 || fieldType instanceof ArrowType.Binary) {
+      return COLUMN_SIZE_VARCHAR_AND_BINARY;
+    } else if (fieldType instanceof ArrowType.Timestamp) {
+      switch (((ArrowType.Timestamp) fieldType).getUnit()) {
+        case SECOND:
+          return COLUMN_SIZE_TIMESTAMP_SECONDS;
+        case MILLISECOND:
+          return COLUMN_SIZE_TIMESTAMP_MILLISECONDS;
+        case MICROSECOND:
+          return COLUMN_SIZE_TIMESTAMP_MICROSECONDS;
+        case NANOSECOND:
+          return COLUMN_SIZE_TIMESTAMP_NANOSECONDS;
+        default:
+          break;
+      }
+    } else if (fieldType instanceof ArrowType.Time) {
+      switch (((ArrowType.Time) fieldType).getUnit()) {
+        case SECOND:
+          return COLUMN_SIZE_TIME;
+        case MILLISECOND:
+          return COLUMN_SIZE_TIME_MILLISECONDS;
+        case MICROSECOND:
+          return COLUMN_SIZE_TIME_MICROSECONDS;
+        case NANOSECOND:
+          return COLUMN_SIZE_TIME_NANOSECONDS;
+        default:
+          break;
+      }
+    } else if (fieldType instanceof ArrowType.Date) {
+      return COLUMN_SIZE_DATE;
+    }
+
+    return null;
+  }
+
+  static String sqlToRegexLike(final String sqlPattern) {
+    final int len = sqlPattern.length();
+    final StringBuilder javaPattern = new StringBuilder(len + len);
+
+    for (int i = 0; i < len; i++) {
+      final char currentChar = sqlPattern.charAt(i);
+
+      if (JAVA_REGEX_SPECIALS.indexOf(currentChar) >= 0) {
+        javaPattern.append('\\');
+      }
+
+      switch (currentChar) {
+        case '_':
+          javaPattern.append('.');
+          break;
+        case '%':
+          javaPattern.append(".");
+          javaPattern.append('*');
+          break;
+        default:
+          javaPattern.append(currentChar);
+          break;
+      }
+    }
+    return javaPattern.toString();
+  }
 }
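sqlToRegexLike() escapes regex metacharacters one character at a time before substituting the SQL wildcards, which is why it replaces the removed toPattern(): that helper ran replace() on an already Pattern.quote()d string, so its "." and ".*" substitutions landed inside the \Q...\E block and matched literally. Some example conversions (illustration only):

    // SQL LIKE pattern      regex produced by sqlToRegexLike
    // "foo_bar"         ->  foo.bar     ('_' matches exactly one character)
    // "%.csv"           ->  .*\.csv     ('%' matches any run; '.' is escaped)
    // "TAB%"            ->  TAB.*
    boolean matches =
        Pattern.compile(sqlToRegexLike("%.csv")).matcher("points.csv").matches(); // true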