Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINIFICPP-2501 Add processorStatuses C2 metric node to FlowInformation #1909

Closed
wants to merge 9 commits into from
Closed
400 changes: 385 additions & 15 deletions C2.md

Large diffs are not rendered by default.

34 changes: 25 additions & 9 deletions METRICS.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ RepositoryMetrics is a system level metric that reports metrics for the register
| rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
| rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |

| Label | Description |
|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| Label | Description |
|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
| repository_name | Name of the reported repository. There are three repositories present with the following names: `flowfile`, `content` and `provenance` |

### DeviceInfoNode
Expand All @@ -181,20 +181,31 @@ DeviceInfoNode is a system level metric that reports metrics about the system re

FlowInformation is a system level metric that reports component and queue related metrics.

| Metric name | Labels | Description |
|----------------------|----------------------------------|--------------------------------------------|
| queue_data_size | connection_uuid, connection_name | Current queue data size |
| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure |
| queue_size | connection_uuid, connection_name | Current queue size |
| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure |
| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
| Metric name | Labels | Description |
|----------------------|----------------------------------|----------------------------------------------------------------------------|
| queue_data_size | connection_uuid, connection_name | Current queue data size |
| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure |
| queue_size | connection_uuid, connection_name | Current queue size |
| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure |
| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
| bytes_read | processor_uuid, processor_name | Number of bytes read by the processor |
| bytes_written | processor_uuid, processor_name | Number of bytes written by the processor |
| flow_files_in | processor_uuid, processor_name | Number of flow files from the incoming queue processed by the processor |
| flow_files_out | processor_uuid, processor_name | Number of flow files transferred to outgoing relationship by the processor |
| bytes_in | processor_uuid, processor_name | Sum of data from the incoming queue processed by the processor |
| bytes_out | processor_uuid, processor_name | Sum of data transferred to outgoing relationship by the processor |
| invocations | processor_uuid, processor_name | Number of times the processor was triggered |
| processing_nanos | processor_uuid, processor_name | Sum of the runtime spent in the processor in nanoseconds |


| Label | Description |
|-----------------|--------------------------------------------------------------|
| connection_uuid | UUID of the connection defined in the flow configuration |
| connection_name | Name of the connection defined in the flow configuration |
| component_uuid | UUID of the component |
| component_name | Name of the component |
| processor_uuid | UUID of the processor |
| processor_name | Name of the processor |

### AgentStatus

Expand Down Expand Up @@ -251,6 +262,11 @@ There are general metrics that are available for all processors. Besides these m
| transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship |
| transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship |
| transferred_to_\<relationship\> | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship |
| incoming_flow_files | metric_class, processor_name, processor_uuid | Number of flow files from the incoming queue processed by the processor |
| incoming_bytes | metric_class, processor_name, processor_uuid | Sum of data from the incoming queue processed by the processor |
| bytes_read | metric_class, processor_name, processor_uuid | Number of bytes read by the processor |
| bytes_written | metric_class, processor_name, processor_uuid | Number of bytes written by the processor |
| processing_nanos | metric_class, processor_name, processor_uuid | Sum of the runtime spent in the processor in nanoseconds |

| Label | Description |
|----------------|------------------------------------------------------------------------|
Expand Down
11 changes: 8 additions & 3 deletions docker/test/integration/cluster/checkers/PrometheusChecker.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,21 @@ def verify_queue_metrics(self):
def verify_general_processor_metrics(self, metric_class, processor_name):
labels = {'processor_name': processor_name}
return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds',
'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels)
'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds',
'minifi_incoming_flow_files', 'minifi_incoming_bytes', 'minifi_bytes_read', 'minifi_bytes_written'], metric_class, labels) and \
self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success',
'minifi_transferred_bytes', 'minifi_processing_nanos'],
metric_class, labels)

def verify_getfile_metrics(self, metric_class, processor_name):
labels = {'processor_name': processor_name}
return self.verify_general_processor_metrics(metric_class, processor_name) and \
self.verify_metrics_exist(['minifi_input_bytes', 'minifi_accepted_files'], metric_class, labels)

def verify_flow_information_metrics(self):
return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'FlowInformation') and \
return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max',
'minifi_bytes_read', 'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out', 'minifi_bytes_in', 'minifi_bytes_out',
'minifi_invocations', 'minifi_processing_nanos'], 'FlowInformation') and \
self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'})

def verify_device_info_node_metrics(self):
Expand Down
10 changes: 10 additions & 0 deletions libminifi/include/core/Processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ class Processor : public Connectable, public ConfigurableComponent, public state
active_tasks_ = 0;
}

std::string getProcessGroupUUIDStr() const {
return process_group_uuid_;
}

void setProcessGroupUUIDStr(const std::string &uuid) {
process_group_uuid_ = uuid;
}

void yield() override;

void yield(std::chrono::steady_clock::duration delta_time);
Expand Down Expand Up @@ -256,6 +264,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state

// an outgoing connection allows us to reach these nodes
std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_;

std::string process_group_uuid_;
};

} // namespace core
Expand Down
10 changes: 8 additions & 2 deletions libminifi/include/core/ProcessorMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,16 @@ class ProcessorMetrics : public state::response::ResponseNode {
std::chrono::milliseconds getAverageSessionCommitRuntime() const;
std::chrono::milliseconds getLastSessionCommitRuntime() const;
void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const;

std::atomic<size_t> iterations{0};
std::atomic<size_t> invocations{0};
std::atomic<size_t> incoming_flow_files{0};
std::atomic<size_t> transferred_flow_files{0};
std::atomic<uint64_t> incoming_bytes{0};
std::atomic<uint64_t> transferred_bytes{0};
std::atomic<uint64_t> bytes_read{0};
std::atomic<uint64_t> bytes_written{0};
std::atomic<uint64_t> processing_nanos{0};

protected:
template<typename ValueType>
Expand All @@ -80,7 +86,7 @@ class ProcessorMetrics : public state::response::ResponseNode {
[[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const;
static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;

std::mutex transferred_relationships_mutex_;
mutable std::mutex transferred_relationships_mutex_;
std::unordered_map<std::string, size_t> transferred_relationships_;
const Processor& source_processor_;
Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
Expand Down
42 changes: 17 additions & 25 deletions libminifi/include/core/state/nodes/FlowInformation.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "core/state/nodes/StateMonitor.h"
#include "Connection.h"
#include "core/state/ConnectionStore.h"
#include "core/Processor.h"

namespace org::apache::nifi::minifi::state::response {

Expand Down Expand Up @@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation {
std::shared_ptr<FlowIdentifier> identifier;
};

class FlowMonitor : public StateMonitorNode {
class FlowInformation : public StateMonitorNode {
public:
FlowMonitor(std::string_view name, const utils::Identifier &uuid)
FlowInformation(std::string_view name, const utils::Identifier &uuid)
: StateMonitorNode(name, uuid) {
}

explicit FlowMonitor(std::string_view name)
explicit FlowInformation(std::string_view name)
: StateMonitorNode(name) {
}

MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";

std::string getName() const override {
return "flowInfo";
}

void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) {
flow_version_ = std::move(flow_version);
}
Expand All @@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode {
connection_store_.updateConnection(connection);
}

protected:
std::shared_ptr<state::response::FlowVersion> flow_version_;
ConnectionStore connection_store_;
};

/**
* Justification and Purpose: Provides flow version Information
*/
class FlowInformation : public FlowMonitor {
public:
FlowInformation(std::string_view name, const utils::Identifier &uuid)
: FlowMonitor(name, uuid) {
}

explicit FlowInformation(std::string_view name)
: FlowMonitor(name) {
}

MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";

std::string getName() const override {
return "flowInfo";
void setProcessors(std::vector<core::Processor*> processors) {
processors_ = std::move(processors);
}

std::vector<SerializedResponseNode> serialize() override;
std::vector<PublishedMetric> calculateMetrics() override;

private:
std::shared_ptr<state::response::FlowVersion> flow_version_;
ConnectionStore connection_store_;
std::vector<core::Processor*> processors_;
};

} // namespace org::apache::nifi::minifi::state::response
2 changes: 1 addition & 1 deletion libminifi/include/core/state/nodes/ResponseNodeLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class ResponseNodeLoader {
void initializeAgentNode(const SharedResponseNode& response_node) const;
void initializeAgentStatus(const SharedResponseNode& response_node) const;
void initializeConfigurationChecksums(const SharedResponseNode& response_node) const;
void initializeFlowMonitor(const SharedResponseNode& response_node) const;
void initializeFlowInformation(const SharedResponseNode& response_node) const;
void initializeAssetInformation(const SharedResponseNode& response_node) const;
std::vector<SharedResponseNode> getMatchingComponentMetricsNodes(const std::string& regex_str) const;

Expand Down
8 changes: 7 additions & 1 deletion libminifi/include/io/StreamCallback.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@

#include <functional>
#include <memory>
#include <optional>

namespace org::apache::nifi::minifi::io {

class InputStream;
class OutputStream;

struct ReadWriteResult {
int64_t bytes_written = 0;
int64_t bytes_read = 0;
};

// FlowFile IO Callback functions for input and output
// throw exception for error
using InputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream)>;
using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<OutputStream>& output_stream)>;
using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;
using InputOutputStreamCallback = std::function<std::optional<ReadWriteResult>(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;

} // namespace org::apache::nifi::minifi::io
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback {
public:
using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
explicit LineByLineInputOutputStreamCallback(CallbackType callback);
int64_t operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);
std::optional<io::ReadWriteResult> operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);

private:
int64_t readInput(io::InputStream& stream);
Expand Down
1 change: 1 addition & 0 deletions libminifi/src/core/ProcessGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ std::tuple<Processor*, bool> ProcessGroup::addProcessor(std::unique_ptr<Processo
gsl_Expects(processor);
const auto name = processor->getName();
std::lock_guard<std::recursive_mutex> lock(mutex_);
processor->setProcessGroupUUIDStr(getUUIDStr());
const auto [iter, inserted] = processors_.insert(std::move(processor));
if (inserted) {
logger_->log_debug("Add processor {} into process group {}", name, name_);
Expand Down
Loading