Skip to content

Commit afa3aa9

Browse files
authored
Add some pre-calculated metrics (#1079)
1. max io util of disks 2. max network send/receive bytes rate of all network devices 3. base/cumulative compaction request counter and failure counter
1 parent b2a022b commit afa3aa9

File tree

11 files changed

+156
-16
lines changed

11 files changed

+156
-16
lines changed

be/src/common/daemon.cpp

+28
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
#include "exprs/hll_hash_function.h"
5252
#include "olap/options.h"
5353
#include "util/time.h"
54+
#include "util/system_metrics.h"
5455

5556
namespace doris {
5657

@@ -111,17 +112,28 @@ void* memory_maintenance_thread(void* dummy) {
111112
* this thread will calculate some metrics at a fixed interval (15 sec)
112113
* 1. push bytes per second
113114
* 2. scan bytes per second
115+
* 3. max io util of all disks
116+
* 4. max network send bytes rate
117+
* 5. max network receive bytes rate
114118
*/
115119
void* calculate_metrics(void* dummy) {
116120
int64_t last_ts = -1L;
117121
int64_t lst_push_bytes = -1;
118122
int64_t lst_query_bytes = -1;
119123

124+
std::map<std::string, int64_t> lst_disks_io_time;
125+
std::map<std::string, int64_t> lst_net_send_bytes;
126+
std::map<std::string, int64_t> lst_net_receive_bytes;
127+
120128
while (true) {
129+
DorisMetrics::metrics()->trigger_hook();
130+
121131
if (last_ts == -1L) {
122132
last_ts = GetCurrentTimeMicros() / 1000;
123133
lst_push_bytes = DorisMetrics::push_request_write_bytes.value();
124134
lst_query_bytes = DorisMetrics::query_scan_bytes.value();
135+
DorisMetrics::system_metrics()->get_disks_io_time(&lst_disks_io_time);
136+
DorisMetrics::system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes);
125137
} else {
126138
int64_t current_ts = GetCurrentTimeMicros() / 1000;
127139
long interval = (current_ts - last_ts) / 1000;
@@ -140,6 +152,22 @@ void* calculate_metrics(void* dummy) {
140152
DorisMetrics::query_scan_bytes_per_second.set_value(
141153
qps < 0 ? 0 : qps);
142154
lst_query_bytes = current_query_bytes;
155+
156+
// 3. max disk io util
157+
DorisMetrics::max_disk_io_util_percent.set_value(
158+
DorisMetrics::system_metrics()->get_max_io_util(lst_disks_io_time, 15));
159+
// update lst map
160+
DorisMetrics::system_metrics()->get_disks_io_time(&lst_disks_io_time);
161+
162+
// 4. max network traffic
163+
int64_t max_send = 0;
164+
int64_t max_receive = 0;
165+
DorisMetrics::system_metrics()->get_max_net_traffic(
166+
lst_net_send_bytes, lst_net_receive_bytes, 15, &max_send, &max_receive);
167+
DorisMetrics::max_network_send_bytes_rate.set_value(max_send);
168+
DorisMetrics::max_network_receive_bytes_rate.set_value(max_receive);
169+
// update lst map
170+
DorisMetrics::system_metrics()->get_network_traffic(&lst_net_send_bytes, &lst_net_receive_bytes);
143171
}
144172

145173
sleep(15); // 15 seconds

be/src/exec/olap_table_info.h

+1-7
Original file line numberDiff line numberDiff line change
@@ -116,13 +116,7 @@ class OlapTablePartKeyComparator {
116116

117117
bool lhs_null = lhs->is_null(_slot_desc->null_indicator_offset());
118118
bool rhs_null = rhs->is_null(_slot_desc->null_indicator_offset());
119-
if (lhs_null && rhs_null) {
120-
return false;
121-
} else if (lhs_null) {
122-
return true;
123-
} else if (rhs_null) {
124-
return false;
125-
}
119+
if (lhs_null || rhs_null) { return !rhs_null; }
126120

127121
auto lhs_value = lhs->get_slot(_slot_desc->tuple_offset());
128122
auto rhs_value = rhs->get_slot(_slot_desc->tuple_offset());

be/src/http/action/metrics_action.cpp

+11-2
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,18 @@ class SimpleCoreMetricsVisitor : public MetricsVisitor {
5757
static const std::string PROCESS_THREAD_NUM;
5858
static const std::string PUSH_REQUEST_WRITE_BYTES_PER_SECOND;
5959
static const std::string QUERY_SCAN_BYTES_PER_SECOND;
60+
static const std::string MAX_DISK_IO_UTIL_PERCENT;
61+
static const std::string MAX_NETWORK_SEND_BYTES_RATE;
62+
static const std::string MAX_NETWORK_RECEIVE_BYTES_RATE;
6063
};
6164

6265
const std::string SimpleCoreMetricsVisitor::PROCESS_FD_NUM_USED = "process_fd_num_used";
6366
const std::string SimpleCoreMetricsVisitor::PROCESS_THREAD_NUM = "process_thread_num";
6467
const std::string SimpleCoreMetricsVisitor::PUSH_REQUEST_WRITE_BYTES_PER_SECOND = "push_request_write_bytes_per_second";
6568
const std::string SimpleCoreMetricsVisitor::QUERY_SCAN_BYTES_PER_SECOND = "query_scan_bytes_per_second";
69+
const std::string SimpleCoreMetricsVisitor::MAX_DISK_IO_UTIL_PERCENT = "max_disk_io_util_percent";
70+
const std::string SimpleCoreMetricsVisitor::MAX_NETWORK_SEND_BYTES_RATE= "max_network_send_bytes_rate";
71+
const std::string SimpleCoreMetricsVisitor::MAX_NETWORK_RECEIVE_BYTES_RATE= "max_network_receive_bytes_rate";
6672

6773
void PrometheusMetricsVisitor::visit(const std::string& prefix,
6874
const std::string& name,
@@ -82,7 +88,7 @@ void PrometheusMetricsVisitor::visit(const std::string& prefix,
8288
case MetricType::COUNTER:
8389
case MetricType::GAUGE:
8490
for (auto& it : collector->metrics()) {
85-
_visit_simple_metric(metric_name, it.first, (SimpleMetric*)it.second);
91+
_visit_simple_metric(metric_name, it.first, (SimpleMetric*) it.second);
8692
}
8793
break;
8894
default:
@@ -117,7 +123,10 @@ void SimpleCoreMetricsVisitor::visit(const std::string& prefix,
117123

118124
if (name != PROCESS_FD_NUM_USED && name != PROCESS_THREAD_NUM
119125
&& name != PUSH_REQUEST_WRITE_BYTES_PER_SECOND
120-
&& name != QUERY_SCAN_BYTES_PER_SECOND) {
126+
&& name != QUERY_SCAN_BYTES_PER_SECOND
127+
&& name != MAX_DISK_IO_UTIL_PERCENT
128+
&& name != MAX_NETWORK_SEND_BYTES_RATE
129+
&& name != MAX_NETWORK_RECEIVE_BYTES_RATE) {
121130
return;
122131
}
123132

be/src/olap/olap_engine.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -1828,16 +1828,19 @@ void OLAPEngine::perform_cumulative_compaction() {
18281828
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::CUMULATIVE_COMPACTION);
18291829
if (best_table == nullptr) { return; }
18301830

1831+
DorisMetrics::cumulative_compaction_request_total.increment(1);
18311832
CumulativeCompaction cumulative_compaction;
18321833
OLAPStatus res = cumulative_compaction.init(best_table);
18331834
if (res != OLAP_SUCCESS) {
1835+
DorisMetrics::cumulative_compaction_request_failed.increment(1);
18341836
LOG(WARNING) << "failed to init cumulative compaction."
18351837
<< "table=" << best_table->full_name();
18361838
return;
18371839
}
18381840

18391841
res = cumulative_compaction.run();
18401842
if (res != OLAP_SUCCESS) {
1843+
DorisMetrics::cumulative_compaction_request_failed.increment(1);
18411844
LOG(WARNING) << "failed to do cumulative compaction."
18421845
<< "table=" << best_table->full_name();
18431846
return;
@@ -1848,16 +1851,19 @@ void OLAPEngine::perform_base_compaction() {
18481851
OLAPTablePtr best_table = _find_best_tablet_to_compaction(CompactionType::BASE_COMPACTION);
18491852
if (best_table == nullptr) { return; }
18501853

1854+
DorisMetrics::base_compaction_request_total.increment(1);
18511855
BaseCompaction base_compaction;
18521856
OLAPStatus res = base_compaction.init(best_table);
18531857
if (res != OLAP_SUCCESS) {
1858+
DorisMetrics::base_compaction_request_failed.increment(1);
18541859
LOG(WARNING) << "failed to init base compaction."
18551860
<< "table=" << best_table->full_name();
18561861
return;
18571862
}
18581863

18591864
res = base_compaction.run();
18601865
if (res != OLAP_SUCCESS) {
1866+
DorisMetrics::base_compaction_request_failed.increment(1);
18611867
LOG(WARNING) << "failed to init base compaction."
18621868
<< "table=" << best_table->full_name();
18631869
return;

be/src/util/doris_metrics.cpp

+6
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,9 @@ IntGaugeMetricsMap DorisMetrics::disks_state;
9898

9999
IntGauge DorisMetrics::push_request_write_bytes_per_second;
100100
IntGauge DorisMetrics::query_scan_bytes_per_second;
101+
IntGauge DorisMetrics::max_disk_io_util_percent;
102+
IntGauge DorisMetrics::max_network_send_bytes_rate;
103+
IntGauge DorisMetrics::max_network_receive_bytes_rate;
101104

102105
DorisMetrics::DorisMetrics() : _metrics(nullptr), _system_metrics(nullptr) {
103106
}
@@ -220,6 +223,9 @@ void DorisMetrics::initialize(
220223

221224
REGISTER_DORIS_METRIC(push_request_write_bytes_per_second);
222225
REGISTER_DORIS_METRIC(query_scan_bytes_per_second);
226+
REGISTER_DORIS_METRIC(max_disk_io_util_percent);
227+
REGISTER_DORIS_METRIC(max_network_send_bytes_rate);
228+
REGISTER_DORIS_METRIC(max_network_receive_bytes_rate);
223229

224230
_metrics->register_hook(_s_hook_name, std::bind(&DorisMetrics::update, this));
225231

be/src/util/doris_metrics.h

+4
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,9 @@ class DorisMetrics {
123123
// by metric calculator
124124
static IntGauge push_request_write_bytes_per_second;
125125
static IntGauge query_scan_bytes_per_second;
126+
static IntGauge max_disk_io_util_percent;
127+
static IntGauge max_network_send_bytes_rate;
128+
static IntGauge max_network_receive_bytes_rate;
126129

127130
~DorisMetrics();
128131
// call before calling metrics
@@ -135,6 +138,7 @@ class DorisMetrics {
135138

136139
static DorisMetrics* instance() { return &_s_doris_metrics; }
137140
static MetricRegistry* metrics() { return _s_doris_metrics._metrics; }
141+
static SystemMetrics* system_metrics() { return _s_doris_metrics._system_metrics; }
138142
private:
139143
// Don't allow constructor
140144
DorisMetrics();

be/src/util/metrics.h

+17-3
Original file line numberDiff line numberDiff line change
@@ -302,14 +302,28 @@ class MetricRegistry {
302302

303303
void collect(MetricsVisitor* visitor) {
304304
std::lock_guard<SpinLock> l(_lock);
305-
// Before we collect, need to call hooks
306-
for (auto& it : _hooks) {
307-
it.second();
305+
if (!config::enable_metric_calculator) {
306+
// Before we collect, need to call hooks
307+
unprotected_trigger_hook();
308308
}
309+
309310
for (auto& it : _collectors) {
310311
it.second->collect(_name, it.first, visitor);
311312
}
312313
}
314+
315+
void trigger_hook() {
316+
std::lock_guard<SpinLock> l(_lock);
317+
unprotected_trigger_hook();
318+
}
319+
320+
private:
321+
void unprotected_trigger_hook() {
322+
for (auto& it : _hooks) {
323+
it.second();
324+
}
325+
}
326+
313327
private:
314328
void _deregister_locked(Metric* metric);
315329

be/src/util/system_metrics.cpp

+60
Original file line numberDiff line numberDiff line change
@@ -435,4 +435,64 @@ void SystemMetrics::_update_fd_metrics() {
435435
fclose(fp);
436436
}
437437

438+
int64_t SystemMetrics::get_max_io_util(
439+
const std::map<std::string, int64_t>& lst_value, int64_t interval_sec) {
440+
int64_t max = 0;
441+
for (auto& it : _disk_metrics) {
442+
int64_t cur = it.second->io_time_ms.value();
443+
const auto find = lst_value.find(it.first);
444+
if (find == lst_value.end()) {
445+
continue;
446+
}
447+
int64_t incr = cur - find->second;
448+
if (incr > max) max = incr;
449+
}
450+
return max / interval_sec / 10;
451+
}
452+
453+
void SystemMetrics::get_disks_io_time(std::map<std::string, int64_t>* map) {
454+
map->clear();
455+
for (auto& it : _disk_metrics) {
456+
map->emplace(it.first, it.second->io_time_ms.value());
457+
}
458+
}
459+
460+
void SystemMetrics::get_network_traffic(
461+
std::map<std::string, int64_t>* send_map,
462+
std::map<std::string, int64_t>* rcv_map) {
463+
send_map->clear();
464+
rcv_map->clear();
465+
for (auto& it : _net_metrics) {
466+
if (it.first == "lo") { continue; }
467+
send_map->emplace(it.first, it.second->send_bytes.value());
468+
rcv_map->emplace(it.first, it.second->receive_bytes.value());
469+
}
470+
}
471+
472+
void SystemMetrics::get_max_net_traffic(
473+
const std::map<std::string, int64_t>& lst_send_map,
474+
const std::map<std::string, int64_t>& lst_rcv_map,
475+
int64_t interval_sec,
476+
int64_t* send_rate, int64_t* rcv_rate) {
477+
int64_t max_send = 0;
478+
int64_t max_rcv = 0;
479+
for (auto& it : _net_metrics) {
480+
int64_t cur_send = it.second->send_bytes.value();
481+
int64_t cur_rcv = it.second->receive_bytes.value();
482+
483+
const auto find_send = lst_send_map.find(it.first);
484+
if (find_send != lst_send_map.end()) {
485+
int64_t incr = cur_send - find_send->second;
486+
if (incr > max_send) max_send = incr;
487+
}
488+
const auto find_rcv= lst_rcv_map.find(it.first);
489+
if (find_rcv != lst_rcv_map.end()) {
490+
int64_t incr = cur_rcv - find_rcv->second;
491+
if (incr > max_rcv) max_rcv = incr;
492+
}
493+
}
494+
495+
*send_rate = max_send / interval_sec;
496+
*rcv_rate = max_rcv / interval_sec;
438497
}
498+
} // end namespace

be/src/util/system_metrics.h

+14
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
#include "util/metrics.h"
1919

20+
#include <map>
2021
#include <memory>
2122

2223
namespace doris {
@@ -40,6 +41,19 @@ class SystemMetrics {
4041
// update metrics
4142
void update();
4243

44+
void get_disks_io_time(std::map<std::string, int64_t>* map);
45+
int64_t get_max_io_util(
46+
const std::map<std::string, int64_t>& lst_value, int64_t interval_sec);
47+
48+
void get_network_traffic(
49+
std::map<std::string, int64_t>* send_map,
50+
std::map<std::string, int64_t>* rcv_map);
51+
void get_max_net_traffic(
52+
const std::map<std::string, int64_t>& lst_send_map,
53+
const std::map<std::string, int64_t>& lst_rcv_map,
54+
int64_t interval_sec,
55+
int64_t* send_rate, int64_t* rcv_rate);
56+
4357
private:
4458
void _install_cpu_metrics(MetricRegistry*);
4559
// On Intel(R) Xeon(R) CPU E5-2450 0 @ 2.10GHz;

fe/src/main/java/org/apache/doris/metric/MetricRepo.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ public final class MetricRepo {
6161
public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES;
6262
public static LongCounterMetric COUNTER_IMAGE_WRITE;
6363
public static LongCounterMetric COUNTER_IMAGE_PUSH;
64+
public static LongCounterMetric COUNTER_TXN_REJECT;
6465
public static LongCounterMetric COUNTER_TXN_BEGIN;
6566
public static LongCounterMetric COUNTER_TXN_FAILED;
6667
public static LongCounterMetric COUNTER_TXN_SUCCESS;
@@ -208,9 +209,10 @@ public Long getValue() {
208209
"counter of image succeeded in pushing to other frontends");
209210
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_IMAGE_PUSH);
210211

212+
COUNTER_TXN_REJECT = new LongCounterMetric("txn_reject", "counter of rejected transactions");
213+
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_REJECT);
211214
COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", "counter of begining transactions");
212215
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_BEGIN);
213-
COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", "counter of failed transactions");
214216
COUNTER_TXN_SUCCESS = new LongCounterMetric("txn_success", "counter of success transactions");
215217
PALO_METRIC_REGISTER.addPaloMetrics(COUNTER_TXN_SUCCESS);
216218
COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", "counter of failed transactions");

fe/src/main/java/org/apache/doris/transaction/GlobalTransactionMgr.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import org.apache.doris.common.LabelAlreadyUsedException;
3636
import org.apache.doris.common.MetaNotFoundException;
3737
import org.apache.doris.common.UserException;
38-
import org.apache.doris.common.util.LogBuilder;
39-
import org.apache.doris.common.util.LogKey;
4038
import org.apache.doris.common.util.TimeUtils;
4139
import org.apache.doris.common.util.Util;
4240
import org.apache.doris.load.Load;
@@ -168,10 +166,15 @@ public long beginTransaction(long dbId, String label, long timestamp,
168166
unprotectUpsertTransactionState(transactionState);
169167

170168
if (MetricRepo.isInit.get()) {
171-
MetricRepo.COUNTER_TXN_SUCCESS.increase(1L);
169+
MetricRepo.COUNTER_TXN_BEGIN.increase(1L);
172170
}
173171

174172
return tid;
173+
} catch (Exception e) {
174+
if (MetricRepo.isInit.get()) {
175+
MetricRepo.COUNTER_TXN_REJECT.increase(1L);
176+
}
177+
throw e;
175178
} finally {
176179
writeUnlock();
177180
}

0 commit comments

Comments
 (0)