MINIFICPP-2501 Add processorStatuses C2 metric node to FlowInformation
Closes #1909

Signed-off-by: Marton Szasz <[email protected]>
lordgamez authored and szaszm committed Jan 21, 2025
1 parent 634016b commit 07b9641
Showing 20 changed files with 756 additions and 124 deletions.
400 changes: 385 additions & 15 deletions C2.md

Large diffs are not rendered by default.

34 changes: 25 additions & 9 deletions METRICS.md
@@ -162,8 +162,8 @@ RepositoryMetrics is a system level metric that reports metrics for the register
 | rocksdb_table_readers_size_bytes | repository_name | RocksDB's estimated memory used for reading SST tables (only present if repository uses RocksDB) |
 | rocksdb_all_memory_tables_size_bytes | repository_name | RocksDB's approximate size of active and unflushed immutable memtables (only present if repository uses RocksDB) |

-| Label | Description |
-|--------------------------|---------------------------------------------------------------------------------------------------------------------------------------|
+| Label | Description |
+|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------|
 | repository_name | Name of the reported repository. There are three repositories present with the following names: `flowfile`, `content` and `provenance` |

 ### DeviceInfoNode
@@ -181,20 +181,31 @@ DeviceInfoNode is a system level metric that reports metrics about the system re

 FlowInformation is a system level metric that reports component and queue related metrics.

-| Metric name | Labels | Description |
-|----------------------|----------------------------------|--------------------------------------------|
-| queue_data_size | connection_uuid, connection_name | Current queue data size |
-| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure |
-| queue_size | connection_uuid, connection_name | Current queue size |
-| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure |
-| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
+| Metric name | Labels | Description |
+|----------------------|----------------------------------|----------------------------------------------------------------------------|
+| queue_data_size | connection_uuid, connection_name | Current queue data size |
+| queue_data_size_max | connection_uuid, connection_name | Max queue data size to apply back pressure |
+| queue_size | connection_uuid, connection_name | Current queue size |
+| queue_size_max | connection_uuid, connection_name | Max queue size to apply back pressure |
+| is_running | component_uuid, component_name | Check if the component is running (1 or 0) |
+| bytes_read | processor_uuid, processor_name | Number of bytes read by the processor |
+| bytes_written | processor_uuid, processor_name | Number of bytes written by the processor |
+| flow_files_in | processor_uuid, processor_name | Number of flow files from the incoming queue processed by the processor |
+| flow_files_out | processor_uuid, processor_name | Number of flow files transferred to outgoing relationship by the processor |
+| bytes_in | processor_uuid, processor_name | Sum of data from the incoming queue processed by the processor |
+| bytes_out | processor_uuid, processor_name | Sum of data transferred to outgoing relationship by the processor |
+| invocations | processor_uuid, processor_name | Number of times the processor was triggered |
+| processing_nanos | processor_uuid, processor_name | Sum of the runtime spent in the processor in nanoseconds |
+

 | Label | Description |
 |-----------------|--------------------------------------------------------------|
 | connection_uuid | UUID of the connection defined in the flow configuration |
 | connection_name | Name of the connection defined in the flow configuration |
 | component_uuid | UUID of the component |
 | component_name | Name of the component |
+| processor_uuid | UUID of the processor |
+| processor_name | Name of the processor |

 ### AgentStatus

@@ -251,6 +262,11 @@ There are general metrics that are available for all processors. Besides these m
 | transferred_flow_files | metric_class, processor_name, processor_uuid | Number of flow files transferred to a relationship |
 | transferred_bytes | metric_class, processor_name, processor_uuid | Number of bytes transferred to a relationship |
 | transferred_to_\<relationship\> | metric_class, processor_name, processor_uuid | Number of flow files transferred to a specific relationship |
+| incoming_flow_files | metric_class, processor_name, processor_uuid | Number of flow files from the incoming queue processed by the processor |
+| incoming_bytes | metric_class, processor_name, processor_uuid | Sum of data from the incoming queue processed by the processor |
+| bytes_read | metric_class, processor_name, processor_uuid | Number of bytes read by the processor |
+| bytes_written | metric_class, processor_name, processor_uuid | Number of bytes written by the processor |
+| processing_nanos | metric_class, processor_name, processor_uuid | Sum of the runtime spent in the processor in nanoseconds |

 | Label | Description |
 |----------------|------------------------------------------------------------------------|
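The `invocations` and `processing_nanos` rows in the FlowInformation table of METRICS.md are described as a count and a sum, so an average runtime per trigger can be derived from two samples of them. The following is a minimal C++ sketch of that arithmetic, not code from this commit: `ProcessorSample` and `averageRuntimePerInvocation` are hypothetical names, and the sketch assumes both values only grow between the two samples (for example, no agent restart in between).

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

// Hypothetical snapshot of two FlowInformation metrics for one processor.
struct ProcessorSample {
  uint64_t invocations = 0;       // value of the `invocations` metric
  uint64_t processing_nanos = 0;  // value of the `processing_nanos` metric
};

// Average runtime per trigger between two samples.
// Returns std::nullopt if the processor was not triggered in between.
inline std::optional<std::chrono::nanoseconds> averageRuntimePerInvocation(
    const ProcessorSample& earlier, const ProcessorSample& later) {
  // Assumption: both values are cumulative and were not reset between samples.
  assert(later.invocations >= earlier.invocations);
  assert(later.processing_nanos >= earlier.processing_nanos);

  const uint64_t invocation_delta = later.invocations - earlier.invocations;
  if (invocation_delta == 0) {
    return std::nullopt;
  }
  const uint64_t nanos_delta = later.processing_nanos - earlier.processing_nanos;
  return std::chrono::nanoseconds{
      static_cast<std::chrono::nanoseconds::rep>(nanos_delta / invocation_delta)};
}
```

Working on deltas rather than raw values keeps the result meaningful for a chosen time window instead of averaging over the whole lifetime of the processor.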
11 changes: 8 additions & 3 deletions docker/test/integration/cluster/checkers/PrometheusChecker.py
@@ -60,16 +60,21 @@ def verify_queue_metrics(self):
     def verify_general_processor_metrics(self, metric_class, processor_name):
         labels = {'processor_name': processor_name}
         return self.verify_metrics_exist(['minifi_average_onTrigger_runtime_milliseconds', 'minifi_last_onTrigger_runtime_milliseconds',
-                                          'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds'], metric_class, labels) and \
-            self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success', 'minifi_transferred_bytes'], metric_class, labels)
+                                          'minifi_average_session_commit_runtime_milliseconds', 'minifi_last_session_commit_runtime_milliseconds',
+                                          'minifi_incoming_flow_files', 'minifi_incoming_bytes', 'minifi_bytes_read', 'minifi_bytes_written'], metric_class, labels) and \
+            self.verify_metrics_larger_than_zero(['minifi_onTrigger_invocations', 'minifi_transferred_flow_files', 'minifi_transferred_to_success',
+                                                  'minifi_transferred_bytes', 'minifi_processing_nanos'],
+                                                 metric_class, labels)

     def verify_getfile_metrics(self, metric_class, processor_name):
         labels = {'processor_name': processor_name}
         return self.verify_general_processor_metrics(metric_class, processor_name) and \
             self.verify_metrics_exist(['minifi_input_bytes', 'minifi_accepted_files'], metric_class, labels)

     def verify_flow_information_metrics(self):
-        return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max'], 'FlowInformation') and \
+        return self.verify_metrics_exist(['minifi_queue_data_size', 'minifi_queue_data_size_max', 'minifi_queue_size', 'minifi_queue_size_max',
+                                          'minifi_bytes_read', 'minifi_bytes_written', 'minifi_flow_files_in', 'minifi_flow_files_out', 'minifi_bytes_in', 'minifi_bytes_out',
+                                          'minifi_invocations', 'minifi_processing_nanos'], 'FlowInformation') and \
             self.verify_metric_exists('minifi_is_running', 'FlowInformation', {'component_name': 'FlowController'})

     def verify_device_info_node_metrics(self):
10 changes: 10 additions & 0 deletions libminifi/include/core/Processor.h
@@ -165,6 +165,14 @@ class Processor : public Connectable, public ConfigurableComponent, public state
     active_tasks_ = 0;
   }

+  std::string getProcessGroupUUIDStr() const {
+    return process_group_uuid_;
+  }
+
+  void setProcessGroupUUIDStr(const std::string &uuid) {
+    process_group_uuid_ = uuid;
+  }
+
   void yield() override;

   void yield(std::chrono::steady_clock::duration delta_time);
@@ -256,6 +264,8 @@ class Processor : public Connectable, public ConfigurableComponent, public state

   // an outgoing connection allows us to reach these nodes
   std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_;
+
+  std::string process_group_uuid_;
 };

 } // namespace core
10 changes: 8 additions & 2 deletions libminifi/include/core/ProcessorMetrics.h
@@ -52,10 +52,16 @@ class ProcessorMetrics : public state::response::ResponseNode {
   std::chrono::milliseconds getAverageSessionCommitRuntime() const;
   std::chrono::milliseconds getLastSessionCommitRuntime() const;
   void addLastSessionCommitRuntime(std::chrono::milliseconds runtime);
+  std::optional<size_t> getTransferredFlowFilesToRelationshipCount(const std::string& relationship) const;

-  std::atomic<size_t> iterations{0};
+  std::atomic<size_t> invocations{0};
+  std::atomic<size_t> incoming_flow_files{0};
   std::atomic<size_t> transferred_flow_files{0};
+  std::atomic<uint64_t> incoming_bytes{0};
   std::atomic<uint64_t> transferred_bytes{0};
+  std::atomic<uint64_t> bytes_read{0};
+  std::atomic<uint64_t> bytes_written{0};
+  std::atomic<uint64_t> processing_nanos{0};

  protected:
   template<typename ValueType>
@@ -80,7 +86,7 @@ class ProcessorMetrics : public state::response::ResponseNode {
   [[nodiscard]] std::unordered_map<std::string, std::string> getCommonLabels() const;
   static constexpr uint8_t STORED_ON_TRIGGER_RUNTIME_COUNT = 10;

-  std::mutex transferred_relationships_mutex_;
+  mutable std::mutex transferred_relationships_mutex_;
   std::unordered_map<std::string, size_t> transferred_relationships_;
   const Processor& source_processor_;
   Averager<std::chrono::milliseconds> on_trigger_runtime_averager_;
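The change from `std::mutex` to `mutable std::mutex` in ProcessorMetrics.h appears to go together with the new `const` getter `getTransferredFlowFilesToRelationshipCount`: a `const` member function can only lock a mutex member if that member is declared `mutable`. A minimal sketch of the pattern follows, using a simplified stand-in class. `RelationshipCounters` and its method names are hypothetical and are not the actual `ProcessorMetrics` implementation, which this diff does not show.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical stand-in for per-relationship counters guarded by a mutex.
class RelationshipCounters {
 public:
  void increase(const std::string& relationship, std::size_t count) {
    std::lock_guard<std::mutex> lock(mutex_);
    counts_[relationship] += count;
  }

  // const getter: locking works here only because mutex_ is mutable.
  std::optional<std::size_t> get(const std::string& relationship) const {
    std::lock_guard<std::mutex> lock(mutex_);
    const auto it = counts_.find(relationship);
    if (it == counts_.end()) {
      return std::nullopt;  // nothing was transferred to this relationship yet
    }
    return it->second;
  }

 private:
  mutable std::mutex mutex_;
  std::unordered_map<std::string, std::size_t> counts_;
};

// Usage example: reading through a const reference still takes the lock.
inline void relationshipCountersExample() {
  RelationshipCounters counters;
  counters.increase("success", 2);
  const RelationshipCounters& read_only = counters;
  assert(read_only.get("success").value() == 2);
  assert(!read_only.get("failure").has_value());
}
```

Returning `std::optional` lets a caller tell "relationship never seen" apart from "relationship seen with a count of zero", which a plain `size_t` return value could not express.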
42 changes: 17 additions & 25 deletions libminifi/include/core/state/nodes/FlowInformation.h
@@ -26,6 +26,7 @@
 #include "core/state/nodes/StateMonitor.h"
 #include "Connection.h"
 #include "core/state/ConnectionStore.h"
+#include "core/Processor.h"

 namespace org::apache::nifi::minifi::state::response {

@@ -92,16 +93,22 @@ class FlowVersion : public DeviceInformation {
   std::shared_ptr<FlowIdentifier> identifier;
 };

-class FlowMonitor : public StateMonitorNode {
+class FlowInformation : public StateMonitorNode {
  public:
-  FlowMonitor(std::string_view name, const utils::Identifier &uuid)
+  FlowInformation(std::string_view name, const utils::Identifier &uuid)
       : StateMonitorNode(name, uuid) {
   }

-  explicit FlowMonitor(std::string_view name)
+  explicit FlowInformation(std::string_view name)
       : StateMonitorNode(name) {
   }

+  MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";
+
+  std::string getName() const override {
+    return "flowInfo";
+  }
+
   void setFlowVersion(std::shared_ptr<state::response::FlowVersion> flow_version) {
     flow_version_ = std::move(flow_version);
   }
@@ -110,32 +117,17 @@ class FlowMonitor : public StateMonitorNode {
     connection_store_.updateConnection(connection);
   }

- protected:
-  std::shared_ptr<state::response::FlowVersion> flow_version_;
-  ConnectionStore connection_store_;
-};
-
-/**
- * Justification and Purpose: Provides flow version Information
- */
-class FlowInformation : public FlowMonitor {
- public:
-  FlowInformation(std::string_view name, const utils::Identifier &uuid)
-      : FlowMonitor(name, uuid) {
-  }
-
-  explicit FlowInformation(std::string_view name)
-      : FlowMonitor(name) {
-  }
-
-  MINIFIAPI static constexpr const char* Description = "Metric node that defines the flow ID and flow URL deployed to this agent";
-
-  std::string getName() const override {
-    return "flowInfo";
+  void setProcessors(std::vector<core::Processor*> processors) {
+    processors_ = std::move(processors);
   }

   std::vector<SerializedResponseNode> serialize() override;
   std::vector<PublishedMetric> calculateMetrics() override;
+
+ private:
+  std::shared_ptr<state::response::FlowVersion> flow_version_;
+  ConnectionStore connection_store_;
+  std::vector<core::Processor*> processors_;
 };

 } // namespace org::apache::nifi::minifi::state::response
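FlowInformation now receives the list of processors through `setProcessors`, and METRICS.md documents per-processor metrics such as `bytes_read` and `bytes_written` carrying the `processor_uuid` and `processor_name` labels. The body of `calculateMetrics()` is not part of the rendered diff, so the following is only a rough sketch of how a list of processors could be turned into labeled metrics. `SketchMetric`, `SketchProcessor` and `processorMetrics` are hypothetical names and do not mirror the real `PublishedMetric` or `core::Processor` types.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical types for illustration only; they are not the PublishedMetric
// or core::Processor types used by the commit.
struct SketchMetric {
  std::string name;
  double value;
  std::unordered_map<std::string, std::string> labels;
};

struct SketchProcessor {
  std::string uuid;
  std::string name;
  uint64_t bytes_read;
  uint64_t bytes_written;
};

// Emits two labeled metrics per processor, skipping null pointers.
inline std::vector<SketchMetric> processorMetrics(const std::vector<const SketchProcessor*>& processors) {
  std::vector<SketchMetric> metrics;
  for (const SketchProcessor* processor : processors) {
    if (processor == nullptr) {
      continue;
    }
    // Label names follow the FlowInformation label table in METRICS.md.
    const std::unordered_map<std::string, std::string> labels{
        {"processor_uuid", processor->uuid}, {"processor_name", processor->name}};
    metrics.push_back(SketchMetric{"bytes_read", static_cast<double>(processor->bytes_read), labels});
    metrics.push_back(SketchMetric{"bytes_written", static_cast<double>(processor->bytes_written), labels});
  }
  return metrics;
}

// Usage example: one processor yields one metric per counter.
inline void processorMetricsExample() {
  const SketchProcessor get_file{"uuid-1", "GetFile", 10, 20};
  const auto metrics = processorMetrics({&get_file});
  assert(metrics.size() == 2);
  assert(metrics[0].labels.at("processor_name") == "GetFile");
}
```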
2 changes: 1 addition & 1 deletion libminifi/include/core/state/nodes/ResponseNodeLoader.h
@@ -62,7 +62,7 @@ class ResponseNodeLoader {
   void initializeAgentNode(const SharedResponseNode& response_node) const;
   void initializeAgentStatus(const SharedResponseNode& response_node) const;
   void initializeConfigurationChecksums(const SharedResponseNode& response_node) const;
-  void initializeFlowMonitor(const SharedResponseNode& response_node) const;
+  void initializeFlowInformation(const SharedResponseNode& response_node) const;
   void initializeAssetInformation(const SharedResponseNode& response_node) const;
   std::vector<SharedResponseNode> getMatchingComponentMetricsNodes(const std::string& regex_str) const;

8 changes: 7 additions & 1 deletion libminifi/include/io/StreamCallback.h
@@ -18,16 +18,22 @@

 #include <functional>
 #include <memory>
+#include <optional>

 namespace org::apache::nifi::minifi::io {

 class InputStream;
 class OutputStream;

+struct ReadWriteResult {
+  int64_t bytes_written = 0;
+  int64_t bytes_read = 0;
+};
+
 // FlowFile IO Callback functions for input and output
 // throw exception for error
 using InputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream)>;
 using OutputStreamCallback = std::function<int64_t(const std::shared_ptr<OutputStream>& output_stream)>;
-using InputOutputStreamCallback = std::function<int64_t(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;
+using InputOutputStreamCallback = std::function<std::optional<ReadWriteResult>(const std::shared_ptr<InputStream>& input_stream, const std::shared_ptr<OutputStream>& output_stream)>;

 } // namespace org::apache::nifi::minifi::io
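With this change an `InputOutputStreamCallback` reports the bytes it read and the bytes it wrote separately, instead of returning a single `int64_t`. The sketch below illustrates why the two values can differ and how a caller might add them to running counters. It is a simplification under stated assumptions: `std::string` stands in for the `io::InputStream` and `io::OutputStream` arguments, `SketchCallback`, `ByteCounters`, `appendNewline` and `runAndAccount` are hypothetical names, and treating an empty optional as "nothing to count" is an assumption, since the diff does not state what `std::nullopt` means.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <string>

// Same shape as the struct added to StreamCallback.h.
struct ReadWriteResult {
  int64_t bytes_written = 0;
  int64_t bytes_read = 0;
};

// Hypothetical simplification: strings replace the stream arguments.
using SketchCallback = std::function<std::optional<ReadWriteResult>(const std::string& input, std::string& output)>;

// Hypothetical running totals, similar in spirit to bytes_read / bytes_written.
struct ByteCounters {
  uint64_t bytes_read = 0;
  uint64_t bytes_written = 0;
};

// Example callback: writes one byte more than it reads.
inline std::optional<ReadWriteResult> appendNewline(const std::string& input, std::string& output) {
  output = input + "\n";
  ReadWriteResult result;
  result.bytes_read = static_cast<int64_t>(input.size());
  result.bytes_written = static_cast<int64_t>(output.size());
  return result;
}

// Runs the callback and adds its result to the counters.
inline bool runAndAccount(const SketchCallback& callback, const std::string& input,
                          std::string& output, ByteCounters& counters) {
  const std::optional<ReadWriteResult> result = callback(input, output);
  if (!result) {
    return false;  // assumption: an empty optional means nothing should be counted
  }
  assert(result->bytes_read >= 0 && result->bytes_written >= 0);
  counters.bytes_read += static_cast<uint64_t>(result->bytes_read);
  counters.bytes_written += static_cast<uint64_t>(result->bytes_written);
  return true;
}
```

A single return value could carry only one of the two counts, which would not be enough to feed both the `bytes_read` and `bytes_written` metrics from a callback that transforms its input.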
@@ -33,7 +33,7 @@ class LineByLineInputOutputStreamCallback {
  public:
   using CallbackType = std::function<std::string(const std::string& input_line, bool is_first_line, bool is_last_line)>;
   explicit LineByLineInputOutputStreamCallback(CallbackType callback);
-  int64_t operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);
+  std::optional<io::ReadWriteResult> operator()(const std::shared_ptr<io::InputStream>& input, const std::shared_ptr<io::OutputStream>& output);

  private:
   int64_t readInput(io::InputStream& stream);
1 change: 1 addition & 0 deletions libminifi/src/core/ProcessGroup.cpp
@@ -86,6 +86,7 @@ std::tuple<Processor*, bool> ProcessGroup::addProcessor(std::unique_ptr<Processo
   gsl_Expects(processor);
   const auto name = processor->getName();
   std::lock_guard<std::recursive_mutex> lock(mutex_);
+  processor->setProcessGroupUUIDStr(getUUIDStr());
   const auto [iter, inserted] = processors_.insert(std::move(processor));
   if (inserted) {
     logger_->log_debug("Add processor {} into process group {}", name, name_);