Skip to content

Commit

Permalink
[feature](selectdb-cloud) Fix unbalanced tablet distribution (apache#…
Browse files Browse the repository at this point in the history
…1121) (apache#1164)

* Fix the bug of unbalanced tablet distribution
* Use replica index hash to BE
  • Loading branch information
luwei16 authored Nov 28, 2022
1 parent 380864b commit b2f297e
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class CloudReplica extends Replica {
private static final Logger LOG = LogManager.getLogger(CloudReplica.class);

// In the future, a replica may be mapped to multiple BEs in a cluster,
// so this value is be list
private Map<String, List<Long>> clusterToBackends = new HashMap<String, List<Long>>();
private Map<String, List<Long>> clusterToBackends = new ConcurrentHashMap<String, List<Long>>();
@SerializedName(value = "dbId")
private long dbId = -1;
@SerializedName(value = "tableId")
Expand All @@ -67,7 +67,7 @@ public CloudReplica(long replicaId, List<Long> backendIds, ReplicaState state, l
}

private boolean isColocated() {
return idx != -1;
return Env.getCurrentColocateIndex().isColocateTable(tableId);
}

private long getColocatedBeId(String cluster) {
Expand Down Expand Up @@ -161,6 +161,10 @@ public long getBackendId() {
}
}

return hashReplicaToBe(cluster);
}

private long hashReplicaToBe(String cluster) {
// TODO(luwei) list shoule be sorted
List<Backend> clusterBes = Env.getCurrentSystemInfo().getBackendsByClusterName(cluster);
// use alive be to exec sql
Expand All @@ -175,9 +179,15 @@ public long getBackendId() {
return -1;
}
LOG.debug("availableBes={}", availableBes);
long index = getId() % availableBes.size();
long index = -1;
if (idx == -1) {
index = getId() % availableBes.size();
} else {
index = (partitionId + idx) % availableBes.size();
}
long pickedBeId = availableBes.get((int) index).getId();
LOG.debug("picked backendId={}", pickedBeId);
LOG.info("picked be Id {}, replica id {}, partition id {}, alive be num {}, replica idx {}, picked Index {}",
pickedBeId, getId(), partitionId, availableBes.size(), idx, index);

// save to clusterToBackends map
List<Long> bes = new ArrayList<Long>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3514,19 +3514,17 @@ public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlEx
private void createCloudTablets(String clusterName, MaterializedIndex index, ReplicaState replicaState,
DistributionInfo distributionInfo, long version, ReplicaAllocation replicaAlloc, TabletMeta tabletMeta,
Set<Long> tabletIdSet) throws DdlException {
ColocateTableIndex colocateIndex = Env.getCurrentColocateIndex();
for (int i = 0; i < distributionInfo.getBucketNum(); ++i) {
Tablet tablet = new Tablet(Env.getCurrentEnv().getNextId());

// add tablet to inverted index first
index.addTablet(tablet, tabletMeta);
tabletIdSet.add(tablet.getId());

long idx = colocateIndex.isColocateTable(tabletMeta.getTableId()) ? i : -1;
long replicaId = Env.getCurrentEnv().getNextId();
Replica replica = new CloudReplica(replicaId, null, replicaState, version,
tabletMeta.getOldSchemaHash(), tabletMeta.getDbId(), tabletMeta.getTableId(),
tabletMeta.getPartitionId(), tabletMeta.getIndexId(), idx);
tabletMeta.getPartitionId(), tabletMeta.getIndexId(), i);
tablet.addReplica(replica);
}
}
Expand Down

0 comments on commit b2f297e

Please sign in to comment.