[DO NOT MERGE] [NPU] Using global command queue #28745 (Closed)

Changes from all commits
@@ -40,6 +40,9 @@ struct Pipeline {
void closeCommandListIndex(size_t command_list_index);

protected:
void getCommandQueue();

std::shared_ptr<ZeroInitStructsHolder> _init_structs;
std::shared_ptr<IGraph> _graph;
const Config _config;
const uint32_t _id;
@@ -59,9 +62,11 @@ struct Pipeline {
std::vector<std::unique_ptr<Fence>> _fences;
std::shared_ptr<EventPool> _event_pool;
std::vector<std::shared_ptr<Event>> _events;
bool sync_output_with_fences_ = true;
bool _sync_output_with_fences = true;
std::shared_ptr<zeroProfiling::NpuInferProfiling> _npu_profiling;
Logger _logger;

std::mutex _mutex;
};

} // namespace intel_npu
106 changes: 80 additions & 26 deletions src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp
@@ -15,6 +15,21 @@
#include "intel_npu/utils/zero/zero_types.hpp"
#include "zero_remote_tensor.hpp"

namespace {

template <class t>
bool compare_shared_ptr(const std::shared_ptr<t>& a, const std::shared_ptr<t>& b) {
if (a == b) {
return true;
}
if (a && b) {
return a.get() == b.get();
}
return false;
}

} // namespace

namespace intel_npu {

Pipeline::Pipeline(const Config& config,
@@ -26,14 +41,11 @@ Pipeline::Pipeline(const Config& config,
const std::vector<std::vector<std::shared_ptr<ov::ITensor>>>& input_tensors,
const std::vector<std::shared_ptr<ov::ITensor>>& output_tensors,
uint32_t group_ordinal)
: _graph(graph),
: _init_structs(init_structs),
_graph(graph),
_config(config),
_id(_graph->get_unique_id()),
_number_of_command_lists(_graph->get_batch_size().has_value() ? *_graph->get_batch_size() : 1),
_event_pool{
std::make_shared<EventPool>(init_structs->getDevice(),
init_structs->getContext(),
_number_of_command_lists ? static_cast<uint32_t>(_number_of_command_lists) : 1)},
_npu_profiling(npu_profiling),
_logger("Pipeline", _config.get<LOG_LEVEL>()) {
OV_ITT_SCOPED_TASK(itt::domains::LevelZeroBackend, "Zero_infer_request::Pipeline::Pipeline");
@@ -43,22 +55,43 @@ Pipeline::Pipeline(const Config& config,
profiling_query.create(profiling_pool._handle);
}

OPENVINO_ASSERT(_sync_output_with_fences || !_config.get<RUN_INFERENCES_SEQUENTIALLY>(),
"In-order execution doesn't work in case synchronization of the inferences is done using events");

if (!_sync_output_with_fences || _config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
_event_pool =
std::make_shared<EventPool>(_init_structs->getDevice(),
_init_structs->getContext(),
_number_of_command_lists ? static_cast<uint32_t>(_number_of_command_lists) : 1);

_events.reserve(_number_of_command_lists);
for (size_t i = 0; i < _number_of_command_lists; i++) {
_events.emplace_back(std::make_shared<Event>(_event_pool, static_cast<uint32_t>(i)));
}
}

_command_lists.reserve(_number_of_command_lists);
_events.reserve(_number_of_command_lists);
_fences.reserve(_number_of_command_lists);
_logger.debug("Pipeline - emplace_back _event_pool and _command_queue");
for (size_t i = 0; i < _number_of_command_lists; i++) {
_command_lists.emplace_back(
std::make_unique<CommandList>(init_structs,
std::make_unique<CommandList>(_init_structs,
group_ordinal,
init_structs->getMutableCommandListVersion() ? true : false));
_events.emplace_back(std::make_shared<Event>(_event_pool, static_cast<uint32_t>(i)));
_fences.emplace_back(std::make_unique<Fence>(*_graph->get_command_queue()));
_init_structs->getMutableCommandListVersion() ? true : false));
}

_command_queue = _graph->get_command_queue();

if (_sync_output_with_fences) {
_fences.resize(_number_of_command_lists);

for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(_command_queue);
}
}

for (size_t i = 0; i < _number_of_command_lists; i++) {
size_t io_index = 0;
for (const auto& desc : graph->get_input_descriptors()) {
for (const auto& desc : _graph->get_input_descriptors()) {
if (input_tensors.at(io_index).size() > 1) {
void* data = nullptr;
auto remote_tensor = std::dynamic_pointer_cast<ZeroRemoteTensor>(input_tensors.at(io_index).at(i));
@@ -68,7 +101,7 @@ Pipeline::Pipeline(const Config& config,
data = remote_tensor->get_original_memory();
}

graph->set_argument_value(desc.idx, data);
_graph->set_argument_value(desc.idx, data);

++io_index;
continue;
@@ -82,7 +115,7 @@ Pipeline::Pipeline(const Config& config,
data = remote_tensor->get_original_memory();
}

graph->set_argument_value(
_graph->set_argument_value(
desc.idx,
static_cast<unsigned char*>(data) +
(i * input_tensors.at(io_index).at(0)->get_byte_size()) / _number_of_command_lists);
@@ -91,7 +124,7 @@ Pipeline::Pipeline(const Config& config,
}

io_index = 0;
for (const auto& desc : graph->get_output_descriptors()) {
for (const auto& desc : _graph->get_output_descriptors()) {
void* data = nullptr;
auto remote_tensor = std::dynamic_pointer_cast<ZeroRemoteTensor>(output_tensors.at(io_index));
if (remote_tensor == nullptr) {
@@ -100,7 +133,7 @@ Pipeline::Pipeline(const Config& config,
data = remote_tensor->get_original_memory();
}

graph->set_argument_value(
_graph->set_argument_value(
desc.idx,
static_cast<unsigned char*>(data) +
(i * output_tensors.at(io_index)->get_byte_size()) / _number_of_command_lists);
@@ -119,7 +152,7 @@ Pipeline::Pipeline(const Config& config,
_command_lists.at(i)->appendNpuTimestamp(reinterpret_cast<uint64_t*>(_npu_profiling->npu_ts_infer_start));
}

_command_lists.at(i)->appendGraphExecute(static_cast<ze_graph_handle_t>(graph->get_handle()),
_command_lists.at(i)->appendGraphExecute(static_cast<ze_graph_handle_t>(_graph->get_handle()),
profiling_query.getHandle());

/// append timestamp command if feature was activated
@@ -138,7 +171,7 @@ Pipeline::Pipeline(const Config& config,
}

// appendBarrier used in L0 as well
if (!sync_output_with_fences_) {
if (!_sync_output_with_fences) {
_command_lists.at(i)->appendBarrier();
_events.at(i)->AppendSignalEvent(*_command_lists.at(i));
}
@@ -147,9 +180,30 @@ Pipeline::Pipeline(const Config& config,
_logger.debug("Pipeline - initialize completed");
}

void Pipeline::getCommandQueue() {
Reviewer comment (Contributor): "Nit: a bit weird to call a function a 'getter' without returning anything."

_logger.debug("Pipeline - getCommandQueue() started");

std::lock_guard<std::mutex> lock(_mutex);

if (!compare_shared_ptr(_command_queue, _graph->get_command_queue())) {
_command_queue = _graph->get_command_queue();

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
_logger.debug("Pipeline - getCommandQueue() - create new fence");
_fences[i] = std::make_unique<Fence>(_command_queue);
}
}
}

_logger.debug("Pipeline - getCommandQueue() completed");
}
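
For context on the refresh pattern above: in Level Zero a fence is created against a specific command queue (zeFenceCreate takes the queue handle), so cached fences have to be rebuilt whenever the pipeline picks up a new queue from the graph. Below is a minimal standalone sketch of the same refresh idea, not the plugin's actual implementation; it uses hypothetical stand-in types (Queue, Fence, PipelineSketch) and returns a bool to address the "getter without a return value" nit raised above.

```cpp
#include <memory>
#include <mutex>
#include <vector>

// Hypothetical stand-ins for the plugin's CommandQueue / Fence types.
struct Queue {};

struct Fence {
    // A fence is bound to the queue it was created from.
    explicit Fence(std::shared_ptr<Queue> q) : queue(std::move(q)) {}
    std::shared_ptr<Queue> queue;
};

struct PipelineSketch {
    std::shared_ptr<Queue> cached_queue;
    std::vector<std::unique_ptr<Fence>> fences;
    size_t number_of_command_lists = 1;
    std::mutex mutex;

    // Refresh the cached queue from the graph's current one; returns true
    // when the queue changed and the fences were recreated against it.
    bool refresh_command_queue(const std::shared_ptr<Queue>& current) {
        std::lock_guard<std::mutex> lock(mutex);
        if (cached_queue == current) {
            return false;  // same queue, existing fences remain valid
        }
        cached_queue = current;
        fences.clear();
        for (size_t i = 0; i < number_of_command_lists; ++i) {
            fences.emplace_back(std::make_unique<Fence>(cached_queue));  // rebind to the new queue
        }
        return true;
    }
};
```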

void Pipeline::push() {
_logger.debug("Pipeline - push() started");

getCommandQueue();

if (_config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
if (_id) {
auto previousIndex = _graph->get_last_submitted_id();
@@ -164,10 +218,10 @@ void Pipeline::push() {

for (size_t i = 0; i < _command_lists.size(); ++i) {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
if (sync_output_with_fences_) {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i), *_fences.at(i));
if (_sync_output_with_fences) {
_command_queue->executeCommandList(*_command_lists.at(i), *_fences.at(i));
} else {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i));
_command_queue->executeCommandList(*_command_lists.at(i));
}
}

@@ -179,7 +233,7 @@ void Pipeline::pull() {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PULL, itt::domains::LevelZeroBackend, "Pipeline", "pull");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->hostSynchronize();
} else {
_events.at(i)->hostSynchronize();
@@ -194,17 +248,17 @@ void Pipeline::pull() {
};

void Pipeline::reset() const {
_logger.debug("Pipeline - rest() started");
_logger.debug("Pipeline - reset() started");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->reset();
} else {
_events.at(i)->reset();
}
}

_logger.debug("Pipeline - rest() completed");
_logger.debug("Pipeline - reset() completed");
};

void Pipeline::updateCommandList(uint32_t arg_index, const void* arg_data, size_t byte_size) {
@@ -42,9 +42,10 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

const std::vector<ArgumentDescriptor>& get_input_descriptors() const;
const std::vector<ArgumentDescriptor>& get_output_descriptors() const;

const std::shared_ptr<CommandQueue>& get_command_queue() const;

void set_workload_type(const ov::WorkloadType workloadType) const;
virtual void set_workload_type(const ov::WorkloadType workloadType) = 0;

std::mutex& get_mutex();

@@ -59,8 +60,8 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

protected:
/**
* @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used by
* the model will also be deduced and returned.
* @brief Determines if batching can be addressed inside the plugin. In the positive case, the batch size used
* by the model will also be deduced and returned.
* @details Batching can be handled by the plugin only if:
* - The batch axis is the first axis.
* - The batch size received by the compiler takes the default value of 1.
@@ -72,22 +73,23 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*
* @param metadata Metadata containing the shape values as seen by both the compiler and IR model. These will
* ultimately be used for determining the batch size.
* @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed inside
* the plugin.
* @returns The batch size deduced by the algorithm or the default value of 1 if batching cannot be performed
* inside the plugin.
*/
std::optional<size_t> get_batch_size(const NetworkMetadata& metadata);

virtual void create_new_command_queue() = 0;

ze_graph_handle_t _handle = nullptr;
NetworkMetadata _metadata;

std::vector<ArgumentDescriptor> _input_descriptors;
std::vector<ArgumentDescriptor> _output_descriptors;

std::shared_ptr<CommandQueue> _command_queue;
std::vector<std::shared_ptr<Event>> _last_submitted_event;

// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when the
// first inference starts running
// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when
// the first inference starts running
std::mutex _mutex;

std::unique_ptr<BlobContainer> _blobPtr;
@@ -101,6 +103,12 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*/
std::optional<std::size_t> _batch_size = std::nullopt;

std::shared_ptr<CommandQueue> _command_queue;
uint32_t _group_ordinal;
ze_command_queue_workload_type_t _ze_workload_type;
bool _turbo = false;
ze_command_queue_priority_t _ze_queue_priority;

Logger _logger;
};

20 changes: 0 additions & 20 deletions src/plugins/intel_npu/src/common/src/igraph.cpp
@@ -47,26 +47,6 @@ const std::shared_ptr<CommandQueue>& IGraph::get_command_queue() const {
return _command_queue;
}

void IGraph::set_workload_type(const ov::WorkloadType workloadType) const {
if (_command_queue == nullptr) {
return;
}

ze_command_queue_workload_type_t zeWorkloadType;
switch (workloadType) {
case ov::WorkloadType::DEFAULT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
break;
case ov::WorkloadType::EFFICIENT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
break;
default:
OPENVINO_THROW("Unknown value for WorkloadType!");
}

_command_queue->setWorkloadType(zeWorkloadType);
}

std::mutex& IGraph::get_mutex() {
return _mutex;
}
@@ -32,11 +32,15 @@ class DriverGraph final : public IGraph {

void initialize(const Config& config) override;

void set_workload_type(const ov::WorkloadType workloadType) override;

~DriverGraph() override;

private:
bool release_blob(const Config& config);

void create_new_command_queue() override;

std::shared_ptr<ZeGraphExtWrappers> _zeGraphExt;
std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;

@@ -35,9 +35,13 @@ class PluginGraph final : public IGraph {

void initialize(const Config& config) override;

void set_workload_type(const ov::WorkloadType workloadType) override;

~PluginGraph() override;

private:
void create_new_command_queue() override;

std::shared_ptr<ZeGraphExtWrappers> _zeGraphExt;
std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;

@@ -48,7 +48,7 @@ class ZeGraphExtWrappers {

void setGraphArgumentValue(ze_graph_handle_t graphHandle, uint32_t argi_, const void* argv) const;

void initializeGraph(ze_graph_handle_t graphHandle, const Config& config) const;
void initializeGraph(ze_graph_handle_t graphHandle) const;

private:
std::unordered_set<std::string> getQueryResultFromSupportedLayers(
Expand All @@ -60,7 +60,7 @@ class ZeGraphExtWrappers {
std::vector<IODescriptor>& inputs,
std::vector<IODescriptor>& outputs) const;

void initialize_graph_through_command_list(ze_graph_handle_t graphHandle, const Config& config) const;
void initialize_graph_through_command_list(ze_graph_handle_t graphHandle) const;

std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;
uint32_t _graphExtVersion;