Added load partioning for distributed runtime. #489

Merged 1 commit on Apr 25, 2023
67 changes: 14 additions & 53 deletions src/runtime/distributed/coordinator/kernels/Broadcast.h
@@ -26,6 +26,7 @@
#include <runtime/local/datastructures/Range.h>
#include <runtime/distributed/worker/WorkerImpl.h>
#include <runtime/distributed/proto/DistributedGRPCCaller.h>
#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>

#ifdef USE_MPI
#include <runtime/distributed/worker/MPIHelper.h>
@@ -75,41 +76,23 @@ struct Broadcast<ALLOCATION_TYPE::DIST_MPI, DT>
//auto ptr = (double*)(&mat);
MPISerializer::serializeStructure<DT>(&dataToSend, mat, isScalar, &messageLength);
std::vector<int> targetGroup; // We will not be able to take the advantage of broadcast if some mpi processes have the data
int worldSize = MPIHelper::getCommSize();
worldSize--; // to exclude coordinator
Range range;
range.r_start = 0;
range.c_start = 0;
range.r_len = mat->getNumRows();
range.c_len = mat->getNumCols();
for (int rank=0;rank<worldSize;rank++){ // we currently exclude the coordinator
std::string address=std::to_string(rank+1); // to skip coordinator
DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
if (dp!=nullptr) {
mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
data.ix = DistributedIndex(0, 0);
dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
}
else { // else create new dp entry
DistributedData data;
data.ix = DistributedIndex(0, 0);
AllocationDescriptorMPI allocationDescriptor (rank+1, /* to exclude coordinator*/
dctx,
data);
dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
}

LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::BROADCAST, mat, dctx);
while (partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
auto rank = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getRank();

if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
{
//std::cout<<"data is already placed at rank "<<rank<<std::endl;
auto data =dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
MPIHelper::sendObjectIdentifier(data.identifier, rank+1);
MPIHelper::sendObjectIdentifier(data.identifier, rank);
// std::cout<<"Identifier ( "<<data.identifier<< " ) has been send to " <<(rank+1)<<std::endl;
continue;
}
targetGroup.push_back(rank+1);
targetGroup.push_back(rank);
}
if((int)targetGroup.size()==worldSize){
if((int)targetGroup.size()==MPIHelper::getCommSize() - 1){ // exclude coordinator
MPIHelper::sendData(messageLength, dataToSend);
// std::cout<<"data has been send to all "<<std::endl;
}
@@ -182,37 +165,15 @@ struct Broadcast<ALLOCATION_TYPE::DIST_GRPC, DT>
}
ProtoDataConverter<DenseMatrix<double>>::convertToProto(denseMat, protoMsg.mutable_matrix());
}
LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::BROADCAST, mat, dctx);

Range range;
range.r_start = 0;
range.c_start = 0;
range.r_len = mat->getNumRows();
range.c_len = mat->getNumCols();
for (auto i=0ul; i < workers.size(); i++){
auto workerAddr = workers.at(i);

// If DataPlacement dp already exists simply
// update range (in case we have a different one) and distributed data
DataPlacement *dp;
if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) {
mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
auto data = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData();
data.ix = DistributedIndex(0, 0);
dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
}
else { // else create new dp entry
DistributedData data;
data.ix = DistributedIndex(0, 0);
AllocationDescriptorGRPC allocationDescriptor (dctx,
workerAddr,
data);
dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
}
while(partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
continue;

StoredInfo storedInfo({dp->dp_id});
caller.asyncStoreCall(workerAddr, storedInfo, protoMsg);
caller.asyncStoreCall(dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation(), storedInfo, protoMsg);
}

while (!caller.isQueueEmpty()){
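Both broadcast paths above now drive the send loop through the new LoadPartitioningDistributed helper (HasNextChunk/GetNextChunk) instead of building Range and DataPlacement entries by hand. The following is a minimal standalone sketch of that chunk-iterator pattern only; the class, struct, and member names are simplified stand-ins, not the actual DAPHNE types, and the BROADCAST scheme is modeled here as handing every worker the full row/column range of the matrix.

// Hypothetical, simplified model of a broadcast partitioner; NOT the DAPHNE implementation.
#include <cstddef>
#include <iostream>

struct Range { std::size_t r_start, c_start, r_len, c_len; };
struct Chunk { int rank; Range range; };

// In BROADCAST mode every worker receives the whole matrix, so each chunk
// covers the full range; only the target rank differs.
class BroadcastPartitioner {
    std::size_t numRows, numCols;
    int numWorkers, next = 0;
public:
    BroadcastPartitioner(std::size_t rows, std::size_t cols, int workers)
        : numRows(rows), numCols(cols), numWorkers(workers) {}
    bool HasNextChunk() const { return next < numWorkers; }
    Chunk GetNextChunk() {
        // Rank 0 is assumed to be the coordinator, so workers start at rank 1.
        Chunk c{next + 1, {0, 0, numRows, numCols}};
        ++next;
        return c;
    }
};

int main() {
    BroadcastPartitioner partitioner(1000, 64, 3); // 1000x64 matrix, 3 workers
    while (partitioner.HasNextChunk()) {
        Chunk c = partitioner.GetNextChunk();
        std::cout << "rank " << c.rank << " gets rows [" << c.range.r_start
                  << ", " << c.range.r_start + c.range.r_len << ")\n";
    }
    return 0;
}

The consumer-side loop in Broadcast.h keeps the same shape for MPI and gRPC; only the allocation descriptor type and the actual send call differ.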
124 changes: 24 additions & 100 deletions src/runtime/distributed/coordinator/kernels/Distribute.h
@@ -19,6 +19,7 @@
#include <runtime/local/context/DistributedContext.h>
#include <runtime/local/datastructures/DataObjectFactory.h>
#include <runtime/local/datastructures/DenseMatrix.h>
#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>

#include <runtime/local/datastructures/AllocationDescriptorGRPC.h>
#include <runtime/distributed/proto/ProtoDataConverter.h>
@@ -64,70 +65,29 @@ void distribute(DT *mat, DCTX(dctx))
template<class DT>
struct Distribute<ALLOCATION_TYPE::DIST_MPI, DT>
{
static void apply(DT *mat, DCTX(dctx)) {
int worldSize= MPIHelper::getCommSize()-1; // exclude coordinator
size_t startRow=0, rowCount=0, startCol=0, colCount=0, remainingRows=0;
auto partitionSize = mat->getNumRows()/worldSize;
//if(partitionSize==0){
// throw std::runtime_error("number of workers is more than the work items, i.e., maximum number of workers is " + std::str(worldSize+1) + " \n");
//}

// this part is to handle the case when the number of worker is larger than the number of work items
if(partitionSize<1)
{
//std::cout<<"testing 1\n";
partitionSize=1;
worldSize=mat->getNumRows();
}
remainingRows=mat->getNumRows();
size_t messageLengths [worldSize];
static void apply(DT *mat, DCTX(dctx)) {
void *dataToSend;
std::vector<int> targetGroup;
for(int rank=0;rank<worldSize;rank++) //we currently exclude the coordinator
{
startRow= (rank * partitionSize);
if(rank==worldSize-1){
rowCount= remainingRows;
}
else{
rowCount = partitionSize;
}
remainingRows-=partitionSize;
colCount= mat->getNumCols();
startCol=0;
Range range;
range.r_start = startRow;
range.r_len = rowCount;
range.c_start = startCol;
range.c_len = colCount;
std::string address=std::to_string(rank+1);
DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
if (dp!=nullptr) {
mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
data.ix = DistributedIndex(rank, 0);
dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
}
else {
DistributedData data;
AllocationDescriptorMPI allocationDescriptor(rank+1,/*exclude coordinator*/
dctx,
data);
data.ix = DistributedIndex(rank, 0);
dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
}

LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);

while (partioner.HasNextChunk()){
DataPlacement *dp = partioner.GetNextChunk();
auto rank = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getRank();

//std::cout<<"rank "<< rank+1<< " will work on rows from " << startRow << " to " << startRow+rowCount<<std::endl;
if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
{
// std::cout<<"worker already has the data"<<std::endl;
auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
MPIHelper::sendObjectIdentifier(data.identifier, rank+1);
MPIHelper::sendObjectIdentifier(data.identifier, rank);
//std::cout<<"Identifier ( "<<data.identifier<< " ) has been send to " <<(rank+1)<<std::endl;
continue;
}
MPISerializer::serializeStructure<DT>(&dataToSend, mat ,false, &messageLengths[rank], startRow, rowCount, startCol, colCount);
MPIHelper::distributeData(messageLengths[rank], dataToSend,rank+1);
targetGroup.push_back(rank+1);
size_t messageLength;
MPISerializer::serializeStructure<DT>(&dataToSend, mat ,false, &messageLength, dp->range->r_start, dp->range->r_len, dp->range->c_start, dp->range->c_len);
MPIHelper::distributeData(messageLength, dataToSend,rank);
targetGroup.push_back(rank);
free(dataToSend);
}
for(size_t i=0;i<targetGroup.size();i++)
@@ -169,48 +129,13 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC, DT>
};

DistributedGRPCCaller<StoredInfo, distributed::Data, distributed::StoredData> caller;

auto ctx = DistributedContext::get(dctx);
auto workers = ctx->getWorkers();


assert(mat != nullptr);

auto r = 0ul;
for (auto workerIx = 0ul; workerIx < workers.size() && r < mat->getNumRows(); workerIx++) {
auto workerAddr = workers.at(workerIx);

auto k = mat->getNumRows() / workers.size();
auto m = mat->getNumRows() % workers.size();

Range range;
range.r_start = (workerIx * k) + std::min(workerIx, m);
range.r_len = ((workerIx + 1) * k + std::min(workerIx + 1, m)) - range.r_start;
range.c_start = 0;
range.c_len = mat->getNumCols();

// If dp already exists simply
// update range (in case we have a different one) and distribute data
DataPlacement *dp;
if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) {
mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
auto data = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData();
// TODO Currently we do not support distributing/splitting
// by columns. When we do, this should be changed (e.g. Index(0, workerIx))
data.ix = DistributedIndex(workerIx, 0);
dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
}
else { // Else, create new object metadata entry
DistributedData data;
// TODO Currently we do not support distributing/splitting
// by columns. When we do, this should be changed (e.g. Index(0, workerIx))
data.ix = DistributedIndex(workerIx, 0);
AllocationDescriptorGRPC allocationDescriptor(
dctx,
workerAddr,
data);
dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
}
// keep track of processed rows

LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);

while (partioner.HasNextChunk()){
auto dp = partioner.GetNextChunk();
// Skip if already placed at workers
if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
continue;
@@ -224,14 +149,13 @@ struct Distribute<ALLOCATION_TYPE::DIST_GRPC, DT>
throw std::runtime_error("Distribute grpc only supports DenseMatrix<double> for now");
}
ProtoDataConverter<DenseMatrix<double>>::convertToProto(denseMat, protoMsg.mutable_matrix(),
range.r_start,
range.r_start + range.r_len,
range.c_start,
range.c_start + range.c_len);
dp->range->r_start,
dp->range->r_start + dp->range->r_len,
dp->range->c_start,
dp->range->c_start + dp->range->c_len);

StoredInfo storedInfo({dp->dp_id});
caller.asyncStoreCall(dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation(), storedInfo, protoMsg);
r = (workerIx + 1) * k + std::min(workerIx + 1, m);
}


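For the DISTRIBUTE scheme, the deleted gRPC code split rows with k = numRows / numWorkers and m = numRows % numWorkers, giving the first m workers one extra row; the new partitioner now owns that computation and the kernels simply read the result from dp->range. Below is a small self-contained sketch of that row-splitting arithmetic, under the assumption (not verified here) that LoadPartitioningDistributed produces equivalent ranges; the function name and types are illustrative only.

// Sketch of the row-range split the old gRPC Distribute code computed inline.
#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

struct Range { std::size_t r_start, c_start, r_len, c_len; };

std::vector<Range> splitRows(std::size_t numRows, std::size_t numCols, std::size_t numWorkers) {
    std::vector<Range> ranges;
    std::size_t k = numRows / numWorkers;  // base rows per worker
    std::size_t m = numRows % numWorkers;  // first m workers get one extra row
    for (std::size_t w = 0; w < numWorkers && w < numRows; ++w) {
        std::size_t start = w * k + std::min(w, m);
        std::size_t end   = (w + 1) * k + std::min(w + 1, m);
        ranges.push_back({start, 0, end - start, numCols});
    }
    return ranges;
}

int main() {
    // 10x4 matrix over 3 workers -> 4/3/3 rows
    for (const Range &r : splitRows(10, 4, 3))
        std::cout << "rows [" << r.r_start << ", " << r.r_start + r.r_len << ")\n";
    return 0;
}

Centralizing this arithmetic in the partitioner is what lets Broadcast.h and Distribute.h drop their duplicated range bookkeeping for both the MPI and gRPC backends.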