Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor statistics data collect #34568

Merged
merged 5 commits into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.shardingsphere.sharding.metadata.data;

import com.cedarsoftware.util.CaseInsensitiveMap;
import org.apache.shardingsphere.infra.database.DatabaseTypeEngine;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
Expand All @@ -26,10 +27,7 @@
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
import org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
import org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector;
import org.apache.shardingsphere.infra.rule.attribute.datasource.aggregate.AggregatedDataSourceRuleAttribute;
import org.apache.shardingsphere.sharding.metadata.data.dialect.DialectShardingStatisticsTableCollector;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
Expand All @@ -39,58 +37,59 @@
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* Sharding table statistics collector.
*/
public final class ShardingTableStatisticsCollector implements TableStatisticsCollector {
public final class ShardingTableStatisticsCollector implements ShardingSphereTableStatisticsCollector {

private static final String SHARDING_TABLE_STATISTICS = "sharding_table_statistics";
private long currentId = 1;

@Override
public Optional<TableStatistics> collect(final String databaseName, final ShardingSphereTable table, final ShardingSphereMetaData metaData) throws SQLException {
TableStatistics result = new TableStatistics(SHARDING_TABLE_STATISTICS);
public Collection<Map<String, Object>> collect(final String databaseName, final String schemaName, final String tableName, final ShardingSphereMetaData metaData) throws SQLException {
Collection<Map<String, Object>> result = new LinkedList<>();
DatabaseType protocolType = metaData.getAllDatabases().iterator().next().getProtocolType();
DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(protocolType).getDialectDatabaseMetaData();
currentId = 1;
if (dialectDatabaseMetaData.getDefaultSchema().isPresent()) {
collectFromDatabase(metaData.getDatabase(databaseName), result);
} else {
for (ShardingSphereDatabase each : metaData.getAllDatabases()) {
collectFromDatabase(each, result);
}
}
return result.getRows().isEmpty() ? Optional.empty() : Optional.of(result);
return result;
}

private void collectFromDatabase(final ShardingSphereDatabase database, final TableStatistics tableStatistics) throws SQLException {
private void collectFromDatabase(final ShardingSphereDatabase database, final Collection<Map<String, Object>> rows) throws SQLException {
Optional<ShardingRule> rule = database.getRuleMetaData().findSingleRule(ShardingRule.class);
if (!rule.isPresent()) {
return;
}
collectForShardingStatisticTable(database, rule.get(), tableStatistics);
collectForShardingStatisticTable(database, rule.get(), rows);
}

private void collectForShardingStatisticTable(final ShardingSphereDatabase database, final ShardingRule rule, final TableStatistics tableStatistics) throws SQLException {
int count = 1;
private void collectForShardingStatisticTable(final ShardingSphereDatabase database, final ShardingRule rule, final Collection<Map<String, Object>> rows) throws SQLException {
for (ShardingTable each : rule.getShardingTables().values()) {
for (DataNode dataNode : each.getActualDataNodes()) {
List<Object> row = new LinkedList<>();
row.add(count++);
row.add(database.getName());
row.add(each.getLogicTable());
row.add(dataNode.getDataSourceName());
row.add(dataNode.getTableName());
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), dataNode, row, rule);
tableStatistics.getRows().add(new RowStatistics(row));
Map<String, Object> rowColumnValues = new CaseInsensitiveMap<>();
rowColumnValues.put("id", currentId++);
rowColumnValues.put("logic_database_name", database.getName());
rowColumnValues.put("logic_table_name", each.getLogicTable());
rowColumnValues.put("actual_database_name", dataNode.getDataSourceName());
rowColumnValues.put("actual_table_name", dataNode.getTableName());
addTableRowsAndDataLength(database.getResourceMetaData().getStorageUnits(), dataNode, rowColumnValues, rule);
rows.add(rowColumnValues);
}
}
}

private void addTableRowsAndDataLength(final Map<String, StorageUnit> storageUnits, final DataNode dataNode, final List<Object> row, final ShardingRule rule) throws SQLException {
private void addTableRowsAndDataLength(final Map<String, StorageUnit> storageUnits, final DataNode dataNode, final Map<String, Object> rowColumnValues,
final ShardingRule rule) throws SQLException {
DataSource dataSource;
DatabaseType databaseType;
StorageUnit storageUnit = storageUnits.get(dataNode.getDataSourceName());
Expand All @@ -103,26 +102,37 @@ private void addTableRowsAndDataLength(final Map<String, StorageUnit> storageUni
databaseType = null != dataSource ? DatabaseTypeEngine.getStorageType(dataSource) : null;
}
if (null != dataSource && null != databaseType) {
addTableRowsAndDataLength(databaseType, dataSource, dataNode, row);
addTableRowsAndDataLength(databaseType, dataSource, dataNode, rowColumnValues);
}
}

private void addTableRowsAndDataLength(final DatabaseType databaseType, final DataSource dataSource, final DataNode dataNode, final List<Object> row) throws SQLException {
private void addTableRowsAndDataLength(final DatabaseType databaseType, final DataSource dataSource, final DataNode dataNode,
final Map<String, Object> rowColumnValues) throws SQLException {
boolean isAppended = false;
Optional<DialectShardingStatisticsTableCollector> dialectCollector = DatabaseTypedSPILoader.findService(DialectShardingStatisticsTableCollector.class, databaseType);
if (dialectCollector.isPresent()) {
try (Connection connection = dataSource.getConnection()) {
isAppended = dialectCollector.get().appendRow(connection, dataNode, row);
isAppended = dialectCollector.get().appendRow(connection, dataNode, rowColumnValues);
}
}
if (!isAppended) {
row.add(BigDecimal.ZERO);
row.add(BigDecimal.ZERO);
rowColumnValues.put("row_count", BigDecimal.ZERO);
rowColumnValues.put("size", BigDecimal.ZERO);
}
}

/**
 * Get the logical schema name that this collector's statistics table belongs to.
 *
 * @return fixed schema name {@code shardingsphere}
 */
@Override
public String getSchemaName() {
return "shardingsphere";
}

/**
 * Get the name of the statistics table populated by this collector.
 *
 * @return fixed table name {@code sharding_table_statistics}
 */
@Override
public String getTableName() {
return "sharding_table_statistics";
}

@Override
public String getType() {
return SHARDING_TABLE_STATISTICS;
return "shardingsphere.sharding_table_statistics";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

/**
* Dialect sharding statistics table data collector.
Expand All @@ -40,9 +40,9 @@ public interface DialectShardingStatisticsTableCollector extends DatabaseTypedSP
*
* @param connection connection
* @param dataNode data node
* @param row row to be appended
* @param rowColumnValues row column values
* @return is appended or not
* @throws SQLException SQL exception
*/
boolean appendRow(Connection connection, DataNode dataNode, List<Object> row) throws SQLException;
boolean appendRow(Connection connection, DataNode dataNode, Map<String, Object> rowColumnValues) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

/**
* Sharding statistics table data collector of MySQL.
Expand All @@ -34,14 +34,14 @@ public final class MySQLShardingStatisticsTableCollector implements DialectShard
private static final String FETCH_TABLE_ROWS_AND_DATA_LENGTH_SQL = "SELECT TABLE_ROWS, DATA_LENGTH FROM information_schema.TABLES WHERE TABLE_SCHEMA = ? AND TABLE_NAME = ?";

@Override
public boolean appendRow(final Connection connection, final DataNode dataNode, final List<Object> row) throws SQLException {
public boolean appendRow(final Connection connection, final DataNode dataNode, final Map<String, Object> rowColumnValues) throws SQLException {
try (PreparedStatement preparedStatement = connection.prepareStatement(FETCH_TABLE_ROWS_AND_DATA_LENGTH_SQL)) {
preparedStatement.setString(1, connection.getCatalog());
preparedStatement.setString(2, dataNode.getTableName());
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
row.add(resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
row.add(resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
rowColumnValues.put("row_count", resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
rowColumnValues.put("size", resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;

/**
* Sharding statistics table data collector of openGauss.
Expand All @@ -34,7 +34,7 @@ public final class OpenGaussShardingStatisticsTableCollector implements DialectS
private static final String FETCH_TABLE_ROWS_AND_DATA_LENGTH_SQL = "SELECT RELTUPLES AS TABLE_ROWS, PG_TABLE_SIZE(?) AS DATA_LENGTH FROM PG_CLASS WHERE RELNAME = ?";

@Override
public boolean appendRow(final Connection connection, final DataNode dataNode, final List<Object> row) throws SQLException {
public boolean appendRow(final Connection connection, final DataNode dataNode, final Map<String, Object> rowColumnValues) throws SQLException {
if (!isTableExist(connection, dataNode.getTableName())) {
return false;
}
Expand All @@ -43,8 +43,8 @@ public boolean appendRow(final Connection connection, final DataNode dataNode, f
preparedStatement.setString(2, dataNode.getTableName());
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet.next()) {
row.add(resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
row.add(resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
rowColumnValues.put("row_count", resultSet.getBigDecimal(TABLE_ROWS_COLUMN_NAME));
rowColumnValues.put("size", resultSet.getBigDecimal(DATA_LENGTH_COLUMN_NAME));
return true;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -38,9 +38,9 @@ public final class PostgreSQLShardingStatisticsTableCollector implements Dialect
private static final String FETCH_TABLE_DATA_LENGTH_SQL = "SELECT PG_RELATION_SIZE(RELID) as DATA_LENGTH FROM PG_STAT_ALL_TABLES T WHERE SCHEMANAME= ? AND RELNAME = ?";

@Override
public boolean appendRow(final Connection connection, final DataNode dataNode, final List<Object> row) throws SQLException {
row.add(getRowValue(connection, dataNode, FETCH_TABLE_ROWS_LENGTH_SQL, TABLE_ROWS_COLUMN_NAME).orElse(BigDecimal.ZERO));
row.add(getRowValue(connection, dataNode, FETCH_TABLE_DATA_LENGTH_SQL, DATA_LENGTH_COLUMN_NAME).orElse(BigDecimal.ZERO));
public boolean appendRow(final Connection connection, final DataNode dataNode, final Map<String, Object> rowColumnValues) throws SQLException {
rowColumnValues.put("row_count", getRowValue(connection, dataNode, FETCH_TABLE_ROWS_LENGTH_SQL, TABLE_ROWS_COLUMN_NAME).orElse(BigDecimal.ZERO));
rowColumnValues.put("size", getRowValue(connection, dataNode, FETCH_TABLE_DATA_LENGTH_SQL, DATA_LENGTH_COLUMN_NAME).orElse(BigDecimal.ZERO));
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import org.apache.shardingsphere.infra.metadata.database.resource.ResourceMetaData;
import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
import org.apache.shardingsphere.infra.metadata.database.rule.RuleMetaData;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
import org.apache.shardingsphere.infra.metadata.statistics.RowStatistics;
import org.apache.shardingsphere.infra.metadata.statistics.TableStatistics;
import org.apache.shardingsphere.infra.metadata.statistics.collector.table.TableStatisticsCollector;
import org.apache.shardingsphere.infra.metadata.statistics.collector.shardingsphere.ShardingSphereTableStatisticsCollector;
import org.apache.shardingsphere.infra.metadata.statistics.collector.DialectTableStatisticsCollector;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.sharding.rule.ShardingRule;
import org.apache.shardingsphere.sharding.rule.ShardingTable;
Expand All @@ -36,13 +34,14 @@

import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.Map.Entry;
import java.util.Properties;

import static org.hamcrest.CoreMatchers.is;
Expand All @@ -57,11 +56,11 @@ class ShardingTableStatisticsCollectorTest {

private final DatabaseType databaseType = TypedSPILoader.getService(DatabaseType.class, "FIXTURE");

private TableStatisticsCollector statisticsCollector;
private DialectTableStatisticsCollector statisticsCollector;

@BeforeEach
void setUp() {
statisticsCollector = TypedSPILoader.getService(TableStatisticsCollector.class, "sharding_table_statistics");
statisticsCollector = TypedSPILoader.getService(ShardingSphereTableStatisticsCollector.class, "shardingsphere.sharding_table_statistics");
}

@Test
Expand All @@ -71,8 +70,8 @@ void assertCollectWithoutShardingRule() throws SQLException {
when(database.getProtocolType()).thenReturn(databaseType);
ShardingSphereMetaData metaData = new ShardingSphereMetaData(
Collections.singleton(database), mock(ResourceMetaData.class), mock(RuleMetaData.class), new ConfigurationProperties(new Properties()));
Optional<TableStatistics> actual = statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), metaData);
assertFalse(actual.isPresent());
Collection<Map<String, Object>> actualRows = statisticsCollector.collect("foo_db", "shardingsphere", "sharding_table_statistics", metaData);
assertTrue(actualRows.isEmpty());
}

@Test
Expand All @@ -85,12 +84,36 @@ void assertCollectWithShardingRule() throws SQLException {
ShardingSphereDatabase database = new ShardingSphereDatabase(
"foo_db", databaseType, new ResourceMetaData(Collections.emptyMap(), storageUnits), new RuleMetaData(Collections.singleton(rule)), Collections.emptyList());
ShardingSphereMetaData metaData = new ShardingSphereMetaData(Collections.singleton(database), mock(), mock(), new ConfigurationProperties(new Properties()));
Optional<TableStatistics> actual = statisticsCollector.collect("foo_db", mock(ShardingSphereTable.class), metaData);
assertTrue(actual.isPresent());
assertThat(actual.get().getName(), is("sharding_table_statistics"));
List<RowStatistics> actualRows = new ArrayList<>(actual.get().getRows());
assertThat(actualRows.size(), is(2));
assertThat(actualRows.get(0).getRows(), is(Arrays.asList(1, "foo_db", "foo_tbl", "ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
assertThat(actualRows.get(1).getRows(), is(Arrays.asList(2, "foo_db", "foo_tbl", "ds_1", "foo_tbl", new BigDecimal("0"), new BigDecimal("0"))));
Collection<Map<String, Object>> actualRows = statisticsCollector.collect("foo_db", "shardingsphere", "sharding_table_statistics", metaData);
assertFalse(actualRows.isEmpty());
Collection<Map<String, Object>> expectedRows = new LinkedList<>();
expectedRows.add(createRowColumnValues(1L, "foo_db", "foo_tbl", "ds_0", "foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
expectedRows.add(createRowColumnValues(2L, "foo_db", "foo_tbl", "ds_1", "foo_tbl", new BigDecimal("0"), new BigDecimal("0")));
assertRowsValue(expectedRows, actualRows);
}

/**
 * Build one expected statistics row keyed by the sharding_table_statistics column names.
 *
 * @param id row identifier
 * @param logicDatabaseName logic database name
 * @param logicTableName logic table name
 * @param actualDatabaseName actual database (storage unit) name
 * @param actualTableName actual table name
 * @param rowCount expected row count
 * @param size expected data length
 * @return column name to value map for the expected row
 */
private Map<String, Object> createRowColumnValues(final long id, final String logicDatabaseName, final String logicTableName, final String actualDatabaseName,
                                                  final String actualTableName, final BigDecimal rowCount, final BigDecimal size) {
    // Table-driven population: keys and values are kept in parallel arrays so the column order is visible at a glance.
    String[] columnNames = {"id", "logic_database_name", "logic_table_name", "actual_database_name", "actual_table_name", "row_count", "size"};
    Object[] columnValues = {id, logicDatabaseName, logicTableName, actualDatabaseName, actualTableName, rowCount, size};
    Map<String, Object> result = new HashMap<>();
    for (int i = 0; i < columnNames.length; i++) {
        result.put(columnNames[i], columnValues[i]);
    }
    return result;
}

/**
 * Assert that the actual rows contain every expected column value, pairing rows positionally.
 *
 * @param expectedRows expected rows in order
 * @param actualRows actual collected rows in order
 */
private void assertRowsValue(final Collection<Map<String, Object>> expectedRows, final Collection<Map<String, Object>> actualRows) {
    // Sizes must match before any pairwise comparison is meaningful.
    assertThat(actualRows.size(), is(expectedRows.size()));
    Iterator<Map<String, Object>> expectedRowsIterator = expectedRows.iterator();
    for (Map<String, Object> actualRow : actualRows) {
        Map<String, Object> expectedRow = expectedRowsIterator.next();
        // Every expected column must be present in the actual row with an equal value.
        for (Entry<String, Object> entry : expectedRow.entrySet()) {
            assertTrue(actualRow.containsKey(entry.getKey()));
            assertThat(actualRow.get(entry.getKey()), is(entry.getValue()));
        }
    }
}
}
Loading