diff --git a/src/runtime/distributed/coordinator/kernels/Broadcast.h b/src/runtime/distributed/coordinator/kernels/Broadcast.h
index 129e9b781..ff1e03be9 100644
--- a/src/runtime/distributed/coordinator/kernels/Broadcast.h
+++ b/src/runtime/distributed/coordinator/kernels/Broadcast.h
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>
 #ifdef USE_MPI
 #include
@@ -75,41 +76,23 @@ struct Broadcast
         //auto ptr = (double*)(&mat);
         MPISerializer::serializeStructure<DT>(&dataToSend, mat, isScalar, &messageLength);
         std::vector<int> targetGroup; // We will not be able to take the advantage of broadcast if some mpi processes have the data
-        int worldSize = MPIHelper::getCommSize();
-        worldSize--; // to exclude coordinator
-        Range range;
-        range.r_start = 0;
-        range.c_start = 0;
-        range.r_len = mat->getNumRows();
-        range.c_len = mat->getNumCols();
-        for (int rank=0; rank<worldSize; rank++)
-        {
-            std::string address = std::to_string(rank+1);
-            DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
-            if (dp!=nullptr) {
-                mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
-                auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
-                data.ix = DistributedIndex(0, 0);
-                dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
-            }
-            else { // else create new dp entry
-                DistributedData data;
-                data.ix = DistributedIndex(0, 0);
-                AllocationDescriptorMPI allocationDescriptor (rank+1, /* to exclude coordinator*/
-                                                              dctx,
-                                                              data);
-                dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
-            }
+
+        LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::BROADCAST, mat, dctx);
+        while (partioner.HasNextChunk()){
+            auto dp = partioner.GetNextChunk();
+            auto rank = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getRank();
+            if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker) {
                 //std::cout<<"data is already placed at rank "<<rank<<std::endl;
                 auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
-                MPIHelper::sendObjectIdentifier(data.identifier, rank+1);
+                MPIHelper::sendObjectIdentifier(data.identifier, rank);
                 // std::cout<<"Identifier ( "<< data.identifier <<" ) has been sent"<<std::endl;
             }
             ProtoDataConverter<DenseMatrix<double>>::convertToProto(denseMat, protoMsg.mutable_matrix());
         }
+        LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::BROADCAST, mat, dctx);
-        Range range;
-        range.r_start = 0;
-        range.c_start = 0;
-        range.r_len = mat->getNumRows();
-        range.c_len = mat->getNumCols();
-        for (auto i=0ul; i < workers.size(); i++){
-            auto workerAddr = workers.at(i);
-
-            // If DataPlacement dp already exists simply
-            // update range (in case we have a different one) and distributed data
-            DataPlacement *dp;
-            if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) {
-                mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
-                auto data = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData();
-                data.ix = DistributedIndex(0, 0);
-                dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
-            }
-            else { // else create new dp entry
-                DistributedData data;
-                data.ix = DistributedIndex(0, 0);
-                AllocationDescriptorGRPC allocationDescriptor (dctx,
-                                                               workerAddr,
-                                                               data);
-                dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
-            }
+        while(partioner.HasNextChunk()){
+            auto dp = partioner.GetNextChunk();
             if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
                 continue;
             StoredInfo storedInfo({dp->dp_id});
-            caller.asyncStoreCall(workerAddr, storedInfo, protoMsg);
+            caller.asyncStoreCall(dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation(), storedInfo, protoMsg);
         }
         while (!caller.isQueueEmpty()){
diff --git a/src/runtime/distributed/coordinator/kernels/Distribute.h b/src/runtime/distributed/coordinator/kernels/Distribute.h
index 23b2b854f..17ea377f6 100644
--- a/src/runtime/distributed/coordinator/kernels/Distribute.h
+++ b/src/runtime/distributed/coordinator/kernels/Distribute.h
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>
 #include
 #include
@@ -64,70 +65,29 @@ void distribute(DT *mat, DCTX(dctx))
 template
 struct Distribute
 {
-    static void apply(DT *mat, DCTX(dctx)) {
-        int worldSize= MPIHelper::getCommSize()-1; // exclude coordinator
-        size_t startRow=0, rowCount=0, startCol=0, colCount=0, remainingRows=0;
-        auto partitionSize = mat->getNumRows()/worldSize;
-        //if(partitionSize==0){
-        //    throw std::runtime_error("number of workers is more than the work items, i.e., maximum number of workers is " + std::str(worldSize+1) + " \n");
-        //}
-
-        // this part is to handle the case when the number of worker is larger than the number of work items
-        if(partitionSize<1)
-        {
-            //std::cout<<"testing 1\n";
-            partitionSize=1;
-            worldSize=mat->getNumRows();
-        }
-        remainingRows=mat->getNumRows();
-        size_t messageLengths [worldSize];
+    static void apply(DT *mat, DCTX(dctx)) {
         void *dataToSend;
         std::vector<int> targetGroup;
-        for(int rank=0;rank<worldSize;rank++)
-        {
-            colCount=mat->getNumCols();
-            startCol=0;
-            Range range;
-            range.r_start = startRow;
-            range.r_len = rowCount;
-            range.c_start = startCol;
-            range.c_len = colCount;
-            std::string address=std::to_string(rank+1);
-            DataPlacement *dp = mat->getMetaDataObject()->getDataPlacementByLocation(address);
-            if (dp!=nullptr) {
-                mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
-                auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
-                data.ix = DistributedIndex(rank, 0);
-                dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
-            }
-            else {
-                DistributedData data;
-                AllocationDescriptorMPI allocationDescriptor(rank+1,/*exclude coordinator*/
-                                                             dctx,
-                                                             data);
-                data.ix = DistributedIndex(rank, 0);
-                dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
-            }
+
+        LoadPartitioningDistributed<DT, AllocationDescriptorMPI> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);
+
+        while (partioner.HasNextChunk()){
+            DataPlacement *dp = partioner.GetNextChunk();
+            auto rank = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getRank();
            //std::cout<<"rank "<< rank+1<< " will work on rows from " << startRow << " to " << startRow+rowCount<<std::endl;
             if (dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker) {
                 // std::cout<<"worker already has the data"<<std::endl;
                 auto data = dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).getDistributedData();
-                MPIHelper::sendObjectIdentifier(data.identifier, rank+1);
+                MPIHelper::sendObjectIdentifier(data.identifier, rank);
                 //std::cout<<"Identifier ( "<< data.identifier <<" ) has been sent"<<std::endl;
                 continue;
             }
-            MPISerializer::serializeStructure<DT>(&dataToSend, mat ,false, &messageLengths[rank], startRow, rowCount, startCol, colCount);
-            MPIHelper::distributeData(messageLengths[rank], dataToSend,rank+1);
-            targetGroup.push_back(rank+1);
+            size_t messageLength;
+            MPISerializer::serializeStructure<DT>(&dataToSend, mat ,false, &messageLength, dp->range->r_start, dp->range->r_len, dp->range->c_start, dp->range->c_len);
+            MPIHelper::distributeData(messageLength, dataToSend,rank);
+            targetGroup.push_back(rank);
             free(dataToSend);
         }
         for(size_t i=0;i<targetGroup.size();i++)
     };
     DistributedGRPCCaller caller;
-
-        auto ctx = DistributedContext::get(dctx);
-        auto workers = ctx->getWorkers();
-
+
         assert(mat != nullptr);
-
-        auto r = 0ul;
-        for (auto workerIx = 0ul; workerIx < workers.size() && r < mat->getNumRows(); workerIx++) {
-            auto workerAddr = workers.at(workerIx);
-
-            auto k = mat->getNumRows() / workers.size();
-            auto m = mat->getNumRows() % workers.size();
-
-            Range range;
-            range.r_start = (workerIx * k) + std::min(workerIx, m);
-            range.r_len = ((workerIx + 1) * k + std::min(workerIx + 1, m)) - range.r_start;
-            range.c_start = 0;
-            range.c_len = mat->getNumCols();
-
-            // If dp already exists simply
-            // update range (in case we have a different one) and distribute data
-            DataPlacement *dp;
-            if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) {
-                mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
-                auto data = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData();
-                // TODO Currently we do not support distributing/splitting
-                // by columns. When we do, this should be changed (e.g. Index(0, workerIx))
-                data.ix = DistributedIndex(workerIx, 0);
-                dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
-            }
-            else { // Else, create new object metadata entry
-                DistributedData data;
-                // TODO Currently we do not support distributing/splitting
-                // by columns. When we do, this should be changed (e.g. Index(0, workerIx))
-                data.ix = DistributedIndex(workerIx, 0);
-                AllocationDescriptorGRPC allocationDescriptor(
-                                            dctx,
-                                            workerAddr,
-                                            data);
-                dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
-            }
-            // keep track of processed rows
+
+        LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);
+
+        while (partioner.HasNextChunk()){
+            auto dp = partioner.GetNextChunk();
             // Skip if already placed at workers
             if (dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getDistributedData().isPlacedAtWorker)
                 continue;
@@ -224,14 +149,13 @@ struct Distribute
                 throw std::runtime_error("Distribute grpc only supports DenseMatrix for now");
             }
             ProtoDataConverter<DenseMatrix<double>>::convertToProto(denseMat, protoMsg.mutable_matrix(),
-                range.r_start,
-                range.r_start + range.r_len,
-                range.c_start,
-                range.c_start + range.c_len);
+                dp->range->r_start,
+                dp->range->r_start + dp->range->r_len,
+                dp->range->c_start,
+                dp->range->c_start + dp->range->c_len);
             StoredInfo storedInfo({dp->dp_id});
             caller.asyncStoreCall(dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).getLocation(), storedInfo, protoMsg);
-            r = (workerIx + 1) * k + std::min(workerIx + 1, m);
         }
diff --git a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h
index 43fd2ef01..dd3f37424 100644
--- a/src/runtime/distributed/coordinator/kernels/DistributedCompute.h
+++ b/src/runtime/distributed/coordinator/kernels/DistributedCompute.h
@@ -25,6 +25,7 @@
 #include
 #include
 #include
+#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>
 #ifdef USE_MPI
 #include
 #endif
@@ -75,66 +76,11 @@ struct DistributedCompute
                       VectorCombine *vectorCombine,
                       DCTX(dctx))
     {
-        int worldSize= MPIHelper::getCommSize()-1; // exclude coordinator
-        // Initialize Distributed index array, needed for results
-        for (size_t i = 0; i < numOutputs; i++)
-        {
-            size_t partitionSize=0, remainingSize=0, rowCount=0,colCount=0; // startRow=0, startCol=0;
-            auto combineType = vectorCombine[i];
-            remainingSize = (combineType==VectorCombine::ROWS)? (*res[i])->getNumRows(): (*res[i])->getNumCols();
-            partitionSize = (combineType==VectorCombine::ROWS)? (*res[i])->getNumRows()/worldSize: (*res[i])->getNumCols()/worldSize;
-            if(partitionSize<1)
-            {
-                partitionSize = 1;
-                worldSize= (combineType==VectorCombine::ROWS)? (*res[i])->getNumRows() : (*res[i])->getNumCols();
-            }
-            for(int rank=0; rank<worldSize; rank++)
-            {
-                DistributedData data;
-                Range range;
-                if (vectorCombine[i] == VectorCombine::ROWS) {
-                    data.ix = DistributedIndex(rank, 0);
-                    colCount= (*res[i])->getNumCols();
-                    range.r_start = data.ix.getRow() * partitionSize;
-                    range.r_len = rowCount;
-                    range.c_start = 0;
-                    range.c_len = colCount;
-                }
-                if (vectorCombine[i] == VectorCombine::COLS) {
-                    data.ix = DistributedIndex(0, rank);
-                    rowCount= (*res[i])->getNumRows();
-                    range.r_start = 0;
-                    range.r_len = rowCount;
-                    range.c_start = data.ix.getCol() * partitionSize;
-                    range.c_len = colCount;
-                }
-                // std::cout<<"rank "<< rank+1 <<" Range rows from "<< range.r_start <<" to " <<( range.r_len + range.r_start)<< " cols from " << range.c_start << " to " << (range.c_start + range.c_len) << std::endl;
-                std::string addr = std::to_string(rank+1);
-                if (auto dp = (*res[i])->getMetaDataObject()->getDataPlacementByLocation(addr)) {
-                    (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
-                    dynamic_cast<AllocationDescriptorMPI&>(*(dp->allocation)).updateDistributedData(data);
-                }
-                else { // else create new dp entry
-                    AllocationDescriptorMPI allocationDescriptor(
-                                                rank+1,
-                                                dctx,
-                                                data);
-                    ((*res[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
-                }
-            }
-        }
+        int worldSize = MPIHelper::getCommSize() - 1; // exclude coordinator
+
+        LoadPartitioningDistributed<DT, AllocationDescriptorMPI>::SetOutputsMetadata(res, numOutputs, vectorCombine, dctx);
+
+        std::vector taskBuffer;
         void *taskToSend;
         size_t messageLengths[worldSize];
         for (int rank=0;rank<worldSize;rank++)
         std::string addr;
     };
     DistributedGRPCCaller caller;
-
-    // Initialize Distributed index array, needed for results
-    std::vector<DistributedIndex> ix(numOutputs, DistributedIndex(0, 0));
+
+    // Set output meta data
+    LoadPartitioningDistributed<DT, AllocationDescriptorGRPC>::SetOutputsMetadata(res, numOutputs, vectorCombine, dctx);
     // Iterate over workers
     // Pass all the nessecary arguments for the pipeline
     for (auto addr : workers) {
-        // Set output meta data
-        for (size_t i = 0; i < numOutputs; i++){
-            // Get Result ranges
-            auto combineType = vectorCombine[i];
-            auto workersSize = workers.size();
-            size_t k = 0, m = 0;
-            if (combineType == VectorCombine::ROWS) {
-                k = (*res[i])->getNumRows() / workersSize;
-                m = (*res[i])->getNumRows() % workersSize;
-            }
-            else if (combineType == VectorCombine::COLS){
-                k = (*res[i])->getNumCols() / workersSize;
-                m = (*res[i])->getNumCols() % workersSize;
-            }
-            else
-                assert(!"Only Rows/Cols combineType supported atm");
-
-            DistributedData data;
-            data.ix = ix[i];
-            data.vectorCombine = vectorCombine[i];
-            data.isPlacedAtWorker = true;
-
-            // Update distributed index for next iteration
-            // and set ranges for objmetadata
-            Range range;
-            if (vectorCombine[i] == VectorCombine::ROWS) {
-                ix[i] = DistributedIndex(ix[i].getRow() + 1, ix[i].getCol());
-
-                range.r_start = data.ix.getRow() * k + std::min(data.ix.getRow(), m);
-                range.r_len = ((data.ix.getRow() + 1) * k + std::min((data.ix.getRow() + 1), m)) - range.r_start;
-                range.c_start = 0;
-                range.c_len = (*res[i])->getNumCols();
-            }
-            if (vectorCombine[i] == VectorCombine::COLS) {
-                ix[i] = DistributedIndex(ix[i].getRow(), ix[i].getCol() + 1);
-
-                range.r_start = 0;
-                range.r_len = (*res[i])->getNumRows();
-                range.c_start = data.ix.getCol() * k + std::min(data.ix.getCol(), m);
-                range.c_len = ((data.ix.getCol() + 1) * k + std::min((data.ix.getCol() + 1), m)) - range.c_start;
-            }
-
-            // If dp already exists for this worker, update the range and data
-            if (auto dp = (*res[i])->getMetaDataObject()->getDataPlacementByLocation(addr)) {
-                (*res[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
-                dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation)).updateDistributedData(data);
-            }
-            else { // else create new dp entry
-                AllocationDescriptorGRPC allocationDescriptor(
-                                            dctx,
-                                            addr,
-                                            data);
-                ((*res[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
-            }
-        }
         distributed::Task task;
         for (size_t i = 0; i < numInputs; i++){
diff --git a/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h
new file mode 100644
index 000000000..2c1654640
--- /dev/null
+++ b/src/runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h
@@ -0,0 +1,220 @@
+/*
+ * Copyright 2021 The DAPHNE Consortium
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+#include
+
+using mlir::daphne::VectorCombine;
+
+
+enum class DistributionSchema{
+    DISTRIBUTE = 1,
+    BROADCAST = 2
+};
+
+template<class DT, class ALLOCATOR>
+class LoadPartitioningDistributed {
+private:
+    DistributionSchema distrschema;
+    DT *mat;
+    std::vector<std::string> workerList;
+    size_t taskIndex = 0;
+    size_t totalTasks;
+    DaphneContext *dctx;
+
+public:
+
+    LoadPartitioningDistributed(DistributionSchema schema, DT *&mat, DCTX(dctx)) :
+        distrschema(schema),
+        mat(mat),
+        dctx(dctx)
+    {
+        auto ctx = DistributedContext::get(dctx);
+        workerList = ctx->getWorkers();
+        totalTasks = workerList.size();
+    };
+
+    bool HasNextChunk(){
+        return taskIndex < totalTasks;
+    };
+
+    // Each allocation descriptor might use a different constructor.
+    // Here we provide the different implementations.
+    // Another solution would be to make sure that every constructor is similar, so this would not be needed.
+    static ALLOCATOR CreateAllocatorDescriptor(DaphneContext* ctx, const std::string &addr, const DistributedData &data) {
+        if constexpr (std::is_same_v<ALLOCATOR, AllocationDescriptorMPI>)
+            return AllocationDescriptorMPI(std::stoi(addr), ctx, data);
+        else if constexpr (std::is_same_v<ALLOCATOR, AllocationDescriptorGRPC>)
+            return AllocationDescriptorGRPC(ctx, addr, data);
+        else
+            throw std::runtime_error("Unknown allocation type");
+    }
+
+    // Set ranges
+    Range CreateRange() {
+        switch (distrschema) {
+            case DistributionSchema::DISTRIBUTE: {
+                // Todo support for different distribution schemas
+                auto k = mat->getNumRows() / workerList.size();
+                auto m = mat->getNumRows() % workerList.size();
+                return Range(
+                    (taskIndex * k) + std::min(taskIndex, m),
+                    0,
+                    ((taskIndex + 1) * k + std::min(taskIndex + 1, m)) - ((taskIndex * k) + std::min(taskIndex, m)),
+                    mat->getNumCols()
+                );
+                break;
+            }
+            case DistributionSchema::BROADCAST:
+                return Range(
+                    0,
+                    0,
+                    mat->getNumRows(),
+                    mat->getNumCols()
+                );
+                break;
+            default:
+                throw std::runtime_error("Unknown distribution scheme");
+        }
+    }
+
+    // Update current distributed index object based on distribution schema
+    DistributedIndex GetDistributedIndex() {
+        switch (distrschema) {
+            case DistributionSchema::DISTRIBUTE:
+                // Todo support for different distribution schemas
+                return DistributedIndex(taskIndex, 0);
+            case DistributionSchema::BROADCAST:
+                return DistributedIndex(0, 0);
+            default:
+                throw std::runtime_error("Unknown distribution scheme");
+        }
+    }
+
+    DataPlacement * GetNextChunk(){
+
+        auto workerAddr = workerList.at(taskIndex);
+
+        auto range = CreateRange();
+
+        DataPlacement *dp;
+        if ((dp = mat->getMetaDataObject()->getDataPlacementByLocation(workerAddr))) {
+            auto data = dynamic_cast<ALLOCATOR&>(*(dp->allocation)).getDistributedData();
+
+            // Check if existing placement matches the same ranges we currently need
+            auto existingRange = dp->range.get();
+            if (*existingRange == range)
+                data.isPlacedAtWorker = true;
+            else {
+                mat->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
+                data.isPlacedAtWorker = false;
+            }
+            // TODO Currently we do not support distributing/splitting
+            // by columns. When we do, this should be changed (e.g. Index(0, taskIndex))
+            // This can be decided based on DistributionSchema
+            data.ix = GetDistributedIndex();
+            dynamic_cast<ALLOCATOR&>(*(dp->allocation)).updateDistributedData(data);
+        }
+        else { // Else, create new object metadata entry
+            DistributedData data;
+            // TODO Currently we do not support distributing/splitting
+            // by columns. When we do, this should be changed (e.g. Index(0, taskIndex))
+            data.ix = GetDistributedIndex();
+            auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data);
+            dp = mat->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
+        }
+        taskIndex++;
+        return dp;
+    }
+
+    static void SetOutputsMetadata(DT **&outputs, size_t numOutputs, VectorCombine *&vectorCombine, DCTX(dctx)) {
+        auto ctx = DistributedContext::get(dctx);
+        auto workers = ctx->getWorkers();
+        // Initialize Distributed index array, needed for results
+        std::vector<DistributedIndex> ix(numOutputs, DistributedIndex(0, 0));
+        for (auto workerAddr : workers) {
+            for (size_t i = 0; i < numOutputs; i++)
+            {
+                // Get Result ranges
+                // TODO Separate this into a different function and implement different strategies
+                auto combineType = vectorCombine[i];
+                auto workersSize = workers.size();
+                size_t k = 0, m = 0;
+                if (combineType == VectorCombine::ROWS)
+                {
+                    k = (*outputs[i])->getNumRows() / workersSize;
+                    m = (*outputs[i])->getNumRows() % workersSize;
+                }
+                else if (combineType == VectorCombine::COLS)
+                {
+                    k = (*outputs[i])->getNumCols() / workersSize;
+                    m = (*outputs[i])->getNumCols() % workersSize;
+                }
+                else
+                    assert(!"Only Rows/Cols combineType supported atm");
+
+                DistributedData data;
+                data.ix = ix[i];
+                data.vectorCombine = vectorCombine[i];
+                data.isPlacedAtWorker = true;
+
+                // Update distributed index for next iteration
+                // and set ranges for objmetadata
+                Range range;
+                if (vectorCombine[i] == VectorCombine::ROWS)
+                {
+                    ix[i] = DistributedIndex(ix[i].getRow() + 1, ix[i].getCol());
+
+                    range.r_start = data.ix.getRow() * k + std::min(data.ix.getRow(), m);
+                    range.r_len = ((data.ix.getRow() + 1) * k + std::min((data.ix.getRow() + 1), m)) - range.r_start;
+                    range.c_start = 0;
+                    range.c_len = (*outputs[i])->getNumCols();
+                }
+                if (vectorCombine[i] == VectorCombine::COLS)
+                {
+                    ix[i] = DistributedIndex(ix[i].getRow(), ix[i].getCol() + 1);
+
+                    range.r_start = 0;
+                    range.r_len = (*outputs[i])->getNumRows();
+                    range.c_start = data.ix.getCol() * k + std::min(data.ix.getCol(), m);
+                    range.c_len = ((data.ix.getCol() + 1) * k + std::min((data.ix.getCol() + 1), m)) - range.c_start;
+                }
+
+                // If dp already exists for this worker, update the range and data
+                if (auto dp = (*outputs[i])->getMetaDataObject()->getDataPlacementByLocation(workerAddr))
+                {
+                    (*outputs[i])->getMetaDataObject()->updateRangeDataPlacementByID(dp->dp_id, &range);
+                    dynamic_cast<ALLOCATOR&>(*(dp->allocation)).updateDistributedData(data);
+                }
+                else
+                { // else create new dp entry
+                    auto allocationDescriptor = CreateAllocatorDescriptor(dctx, workerAddr, data);
+                    ((*outputs[i]))->getMetaDataObject()->addDataPlacement(&allocationDescriptor, &range);
+                }
+            }
+        }
+    }
+};
\ No newline at end of file
diff --git a/src/runtime/distributed/worker/MPIHelper.h b/src/runtime/distributed/worker/MPIHelper.h
index 3b71ad41f..c72d744cd 100644
--- a/src/runtime/distributed/worker/MPIHelper.h
+++ b/src/runtime/distributed/worker/MPIHelper.h
@@ -4,6 +4,7 @@
 #include
 #include
 #include
+#include
 #include
 #include
 #include
diff --git a/src/runtime/local/context/DistributedContext.h b/src/runtime/local/context/DistributedContext.h
index d6c4bef17..268aa9cd7 100644
--- a/src/runtime/local/context/DistributedContext.h
+++ b/src/runtime/local/context/DistributedContext.h
@@ -17,7 +17,9 @@
 #pragma once

 #include
-
+#ifdef USE_MPI
+    #include <runtime/distributed/worker/MPIHelper.h>
+#endif
 #include
 #include
 #include
@@ -29,30 +31,40 @@ class DistributedContext final : public IContext {
 private:
     std::vector<std::string> workers;
 public:
-    DistributedContext() {
-
-        // TODO: Get the list of distributed workers from daphne user config/cli arguments and
-        //       keep environmental variables optional.
-        auto envVar = std::getenv("DISTRIBUTED_WORKERS");
+    DistributedContext(const DaphneUserConfig &cfg) {
-        if (envVar == nullptr) {
-            throw std::runtime_error("--distributed execution is set but EV DISTRIBUTED_WORKERS is empty");
-        }
+        if (cfg.distributedBackEndSetup == ALLOCATION_TYPE::DIST_GRPC) {
+            // TODO: Get the list of distributed workers from daphne user config/cli arguments and
+            //       keep environmental variables optional.
+            auto envVar = std::getenv("DISTRIBUTED_WORKERS");
+            if (envVar == nullptr) {
+                throw std::runtime_error("--distributed execution is set with gRPC but EV DISTRIBUTED_WORKERS is empty");
+            }
-        std::string workersStr(envVar);
-        std::string delimiter(",");
+            std::string workersStr(envVar);
+            std::string delimiter(",");
-        size_t pos;
-        while ((pos = workersStr.find(delimiter)) != std::string::npos) {
-            workers.push_back(workersStr.substr(0, pos));
-            workersStr.erase(0, pos + delimiter.size());
+            size_t pos;
+            while ((pos = workersStr.find(delimiter)) != std::string::npos) {
+                workers.push_back(workersStr.substr(0, pos));
+                workersStr.erase(0, pos + delimiter.size());
+            }
+            workers.push_back(workersStr);
+        } else if (cfg.distributedBackEndSetup == ALLOCATION_TYPE::DIST_MPI) {
+#ifdef USE_MPI
+            // Exclude Coordinator
+            size_t worldSize = MPIHelper::getCommSize();
+            // We use strings for the addresses for consistency with other frameworks (e.g. gRPC)
+            // Exclude coordinator
+            for (size_t i = 1; i < worldSize; i++)
+                workers.push_back(std::to_string(i));
+#endif
         }
-        workers.push_back(workersStr);
     }
     ~DistributedContext() = default;
-    static std::unique_ptr<DistributedContext> createDistributedContext() {
-        auto ctx = std::unique_ptr<DistributedContext>(new DistributedContext());
+    static std::unique_ptr<DistributedContext> createDistributedContext(const DaphneUserConfig &cfg) {
+        auto ctx = std::unique_ptr<DistributedContext>(new DistributedContext(cfg));
         return ctx;
     };
diff --git a/src/runtime/local/datastructures/AllocationDescriptorMPI.h b/src/runtime/local/datastructures/AllocationDescriptorMPI.h
index e5dfc5fba..a5ee9110b 100644
--- a/src/runtime/local/datastructures/AllocationDescriptorMPI.h
+++ b/src/runtime/local/datastructures/AllocationDescriptorMPI.h
@@ -67,6 +67,8 @@ class AllocationDescriptorMPI : public IAllocationDescriptor {
     { return distributedData; }
     void updateDistributedData(DistributedData data_)
     { distributedData = data_; }
+    int getRank()
+    { return processRankID; }
 };

 #endif //SRC_RUNTIME_LOCAL_DATASTRUCTURE_ALLOCATION_DESCRIPTORMPH_H
diff --git a/src/runtime/local/kernels/CreateDistributedContext.h b/src/runtime/local/kernels/CreateDistributedContext.h
index 82df55aaa..f07c80961 100644
--- a/src/runtime/local/kernels/CreateDistributedContext.h
+++ b/src/runtime/local/kernels/CreateDistributedContext.h
@@ -24,5 +24,5 @@ // ****************************************************************************
 static void createDistributedContext(DCTX(ctx)) {
-    ctx->distributed_context = DistributedContext::createDistributedContext();
+    ctx->distributed_context = DistributedContext::createDistributedContext(ctx->config);
 }
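
For orientation, the following is a minimal sketch (not part of the patch) of how a coordinator kernel is expected to drive the new LoadPartitioningDistributed class: request one DataPlacement per worker, skip placements that are already materialized at a worker, and transfer only the range recorded in the placement. The helper sendChunkToWorker is a hypothetical stand-in for the MPIHelper/DistributedGRPCCaller transfer calls used by the real kernels above; the include path follows the new header added by this patch.

// Illustrative sketch only -- mirrors the pattern introduced in Distribute.h above.
#include <runtime/distributed/coordinator/scheduling/LoadPartitioningDistributed.h>

// Hypothetical helper standing in for the actual MPI/gRPC data transfer.
template<class DT>
void sendChunkToWorker(const std::string &workerAddr, DT *mat, const Range &range);

template<class DT>
void distributeSketch(DT *mat, DCTX(dctx)) {
    // DISTRIBUTE yields one row range per worker; BROADCAST would yield the full matrix each time.
    LoadPartitioningDistributed<DT, AllocationDescriptorGRPC> partioner(DistributionSchema::DISTRIBUTE, mat, dctx);
    while (partioner.HasNextChunk()) {
        DataPlacement *dp = partioner.GetNextChunk(); // creates or updates the metadata entry
        auto &alloc = dynamic_cast<AllocationDescriptorGRPC&>(*(dp->allocation));
        if (alloc.getDistributedData().isPlacedAtWorker)
            continue; // the worker already holds exactly this range
        sendChunkToWorker(alloc.getLocation(), mat, *(dp->range)); // transfer only dp->range
    }
}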