diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java index 514eb12a3086..d95a5f52dc83 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/RequestStatistics.java @@ -45,6 +45,61 @@ public class RequestStatistics { private long _numSegmentsMatched; private long _offlineThreadCpuTimeNs; private long _realtimeThreadCpuTimeNs; + private long _offlineSystemActivitiesCpuTimeNs; + private long _realtimeSystemActivitiesCpuTimeNs; + private long _offlineResponseSerializationCpuTimeNs; + private long _realtimeResponseSerializationCpuTimeNs; + private long _offlineTotalCpuTimeNs; + private long _realtimeTotalCpuTimeNs; + + public long getOfflineSystemActivitiesCpuTimeNs() { + return _offlineSystemActivitiesCpuTimeNs; + } + + public void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs) { + _offlineSystemActivitiesCpuTimeNs = offlineSystemActivitiesCpuTimeNs; + } + + public long getRealtimeSystemActivitiesCpuTimeNs() { + return _realtimeSystemActivitiesCpuTimeNs; + } + + public void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs) { + _realtimeSystemActivitiesCpuTimeNs = realtimeSystemActivitiesCpuTimeNs; + } + + public long getOfflineResponseSerializationCpuTimeNs() { + return _offlineResponseSerializationCpuTimeNs; + } + + public void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs) { + _offlineResponseSerializationCpuTimeNs = offlineResponseSerializationCpuTimeNs; + } + + public long getOfflineTotalCpuTimeNs() { + return _offlineTotalCpuTimeNs; + } + + public void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs) { + _offlineTotalCpuTimeNs = _offlineTotalCpuTimeNs; + } + + public long getRealtimeResponseSerializationCpuTimeNs() { + return _realtimeResponseSerializationCpuTimeNs; + } + + public void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs) { + _realtimeResponseSerializationCpuTimeNs = realtimeResponseSerializationCpuTimeNs; + } + + public long getRealtimeTotalCpuTimeNs() { + return _realtimeTotalCpuTimeNs; + } + + public void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs) { + _realtimeTotalCpuTimeNs = realtimeTotalCpuTimeNs; + } + private int _numServersQueried; private int _numServersResponded; private boolean _isNumGroupsLimitReached; @@ -123,6 +178,12 @@ public void setStatistics(BrokerResponse brokerResponse) { _numExceptions = brokerResponse.getExceptionsSize(); _offlineThreadCpuTimeNs = brokerResponse.getOfflineThreadCpuTimeNs(); _realtimeThreadCpuTimeNs = brokerResponse.getRealtimeThreadCpuTimeNs(); + _offlineSystemActivitiesCpuTimeNs = brokerResponse.getOfflineSystemActivitiesCpuTimeNs(); + _realtimeSystemActivitiesCpuTimeNs = brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(); + _offlineResponseSerializationCpuTimeNs = brokerResponse.getOfflineResponseSerializationCpuTimeNs(); + _realtimeResponseSerializationCpuTimeNs = brokerResponse.getRealtimeResponseSerializationCpuTimeNs(); + _offlineTotalCpuTimeNs = brokerResponse.getOfflineTotalCpuTimeNs(); + _realtimeTotalCpuTimeNs = brokerResponse.getRealtimeTotalCpuTimeNs(); _numRowsResultSet = brokerResponse.getNumRowsResultSet(); } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 547c89517190..f9ae5c2cd084 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -542,7 +542,8 @@ private void logBrokerResponse(long requestId, String query, RequestStatistics r LOGGER.info("requestId={},table={},timeMs={},docs={}/{},entries={}/{}," + "segments(queried/processed/matched/consuming/unavailable):{}/{}/{}/{}/{},consumingFreshnessTimeMs={}," + "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={}," - + "offlineThreadCpuTimeNs={},realtimeThreadCpuTimeNs={},query={}", requestId, + + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{}," + + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{}," + "query={}", requestId, brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(), brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(), @@ -551,8 +552,11 @@ private void logBrokerResponse(long requestId, String query, RequestStatistics r brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(), requestStatistics.getReduceTimeMillis(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), - brokerResponse.getOfflineThreadCpuTimeNs(), brokerResponse.getRealtimeThreadCpuTimeNs(), - StringUtils.substring(query, 0, _queryLogLength)); + brokerResponse.getOfflineTotalCpuTimeNs(), brokerResponse.getOfflineThreadCpuTimeNs(), + brokerResponse.getOfflineSystemActivitiesCpuTimeNs(), + brokerResponse.getOfflineResponseSerializationCpuTimeNs(), brokerResponse.getRealtimeTotalCpuTimeNs(), + brokerResponse.getRealtimeThreadCpuTimeNs(), brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(), + brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), StringUtils.substring(query, 0, _queryLogLength)); // Limit the dropping log message at most once per second. if (_numDroppedLogRateLimiter.tryAcquire()) { @@ -836,8 +840,9 @@ private BrokerResponseNative handlePQLRequest(long requestId, String query, Json // Table name might have been changed (with suffix _OFFLINE/_REALTIME appended) LOGGER.info("requestId={},table={},timeMs={},docs={}/{},entries={}/{}," + "segments(queried/processed/matched/consuming/unavailable):{}/{}/{}/{}/{},consumingFreshnessTimeMs={}," - + "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={},query={}," - + "offlineThreadCpuTimeNs={},realtimeThreadCpuTimeNs={}", requestId, + + "servers={}/{},groupLimitReached={},brokerReduceTimeMs={},exceptions={},serverStats={}," + + "offlineThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{}," + + "realtimeThreadCpuTimeNs(total/thread/sysActivity/resSer):{}/{}/{}/{}," + "query={}", requestId, brokerRequest.getQuerySource().getTableName(), totalTimeMs, brokerResponse.getNumDocsScanned(), brokerResponse.getTotalDocs(), brokerResponse.getNumEntriesScannedInFilter(), brokerResponse.getNumEntriesScannedPostFilter(), brokerResponse.getNumSegmentsQueried(), @@ -846,8 +851,11 @@ private BrokerResponseNative handlePQLRequest(long requestId, String query, Json brokerResponse.getMinConsumingFreshnessTimeMs(), brokerResponse.getNumServersResponded(), brokerResponse.getNumServersQueried(), brokerResponse.isNumGroupsLimitReached(), requestStatistics.getReduceTimeMillis(), brokerResponse.getExceptionsSize(), serverStats.getServerStats(), - StringUtils.substring(query, 0, _queryLogLength), brokerResponse.getOfflineThreadCpuTimeNs(), - brokerResponse.getRealtimeThreadCpuTimeNs()); + brokerResponse.getOfflineTotalCpuTimeNs(), brokerResponse.getOfflineThreadCpuTimeNs(), + brokerResponse.getOfflineSystemActivitiesCpuTimeNs(), + brokerResponse.getOfflineResponseSerializationCpuTimeNs(), brokerResponse.getRealtimeTotalCpuTimeNs(), + brokerResponse.getRealtimeThreadCpuTimeNs(), brokerResponse.getRealtimeSystemActivitiesCpuTimeNs(), + brokerResponse.getRealtimeResponseSerializationCpuTimeNs(), StringUtils.substring(query, 0, _queryLogLength)); // Limit the dropping log message at most once per second. if (_numDroppedLogRateLimiter.tryAcquire()) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java index 8bc8410a2dfb..661e42ae69dc 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerTimer.java @@ -33,10 +33,24 @@ public enum BrokerTimer implements AbstractMetrics.Timer { // The latency of sending the request from broker to server NETTY_CONNECTION_SEND_REQUEST_LATENCY(false), - // aggregated query processing cost (thread cpu time in nanoseconds) from offline servers - OFFLINE_THREAD_CPU_TIME_NS( - false), // aggregated query processing cost (thread cpu time in nanoseconds) from realtime servers - REALTIME_THREAD_CPU_TIME_NS(false); + // aggregated thread cpu time in nanoseconds for query processing from offline servers + OFFLINE_THREAD_CPU_TIME_NS(false), + // aggregated thread cpu time in nanoseconds for query processing from realtime servers + REALTIME_THREAD_CPU_TIME_NS(false), + // aggregated system activities cpu time in nanoseconds for query processing from offline servers + OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS(false), + // aggregated system activities cpu time in nanoseconds for query processing from realtime servers + REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS(false), + // aggregated response serialization cpu time in nanoseconds for query processing from offline servers + OFFLINE_RESPONSE_SER_CPU_TIME_NS(false), + // aggregated response serialization cpu time in nanoseconds for query processing from realtime servers + REALTIME_RESPONSE_SER_CPU_TIME_NS(false), + // aggregated total cpu time(thread + system activities + response serialization) in nanoseconds for query + // processing from offline servers + OFFLINE_TOTAL_CPU_TIME_NS(false), + // aggregated total cpu time(thread + system activities + response serialization) in nanoseconds for query + // processing from realtime servers + REALTIME_TOTAL_CPU_TIME_NS(false); private final String _timerName; private final boolean _global; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java index c1e1615b95a8..b6d83bcf6c7a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java @@ -32,8 +32,18 @@ public enum ServerTimer implements AbstractMetrics.Timer { // The latency of sending the response from server to broker NETTY_CONNECTION_SEND_RESPONSE_LATENCY("nettyConnection", false), - // Query cost (thread cpu time) for query processing on server - EXECUTION_THREAD_CPU_TIME_NS("nanoseconds", false); + // Query cost (execution thread cpu time) for query processing on server + EXECUTION_THREAD_CPU_TIME_NS("nanoseconds", false), + + // Query cost (system activities cpu time) for query processing on server + SYSTEM_ACTIVITIES_CPU_TIME_NS("nanoseconds", false), + + // Query cost (response serialization cpu time) for query processing on server + RESPONSE_SER_CPU_TIME_NS("nanoseconds", false), + + // Total query cost (thread cpu time + system activities cpu time + response serialization cpu time) for query + // processing on server + TOTAL_CPU_TIME_NS("nanoseconds", false); private final String _timerName; private final boolean _global; diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java index a1966685f63b..5119f02cd35e 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/BrokerResponse.java @@ -51,16 +51,6 @@ public interface BrokerResponse { */ void setTimeUsedMs(long timeUsedMs); - /** - * Set the total thread cpu time used against realtime table in request handling, into the broker response. - */ - void setRealtimeThreadCpuTimeNs(long realtimeThreadCpuTimeNs); - - /** - * Set the total thread cpu time used against offline table in request handling, into the broker response. - */ - void setOfflineThreadCpuTimeNs(long offlineThreadCpuTimeNs); - /** * Set the total number of rows in result set */ @@ -143,17 +133,91 @@ String toJsonString() List getProcessingExceptions(); /** - * Get the total thread cpu time used against realtime table in request handling, into the broker response. + * Get the total number of rows in result set */ - long getRealtimeThreadCpuTimeNs(); + int getNumRowsResultSet(); /** - * Get the total thread cpu time used against offline table in request handling, into the broker response. + * Set the total thread cpu time used against offline table in request handling, into the broker response. + */ + void setOfflineThreadCpuTimeNs(long offlineThreadCpuTimeNs); + + /** + * Get the thread cpu time used against offline table in request handling, from the broker response. */ long getOfflineThreadCpuTimeNs(); /** - * Get the total number of rows in result set + * Get the thread cpu time used against realtime table in request handling, from the broker response. */ - int getNumRowsResultSet(); + long getRealtimeThreadCpuTimeNs(); + + /** + * Set the total thread cpu time used against realtime table in request handling, into the broker response. + */ + void setRealtimeThreadCpuTimeNs(long realtimeThreadCpuTimeNs); + + /** + * Get the system activities cpu time used against offline table in request handling, from the broker response. + */ + long getOfflineSystemActivitiesCpuTimeNs(); + + /** + * Set the system activities cpu time used against offline table in request handling, into the broker response. + */ + void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs); + + /** + * Get the system activities cpu time used against realtime table in request handling, from the broker response. + */ + long getRealtimeSystemActivitiesCpuTimeNs(); + + /** + * Set the system activities cpu time used against realtime table in request handling, into the broker response. + */ + void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs); + + /** + * Get the response serialization cpu time used against offline table in request handling, from the broker response. + */ + long getOfflineResponseSerializationCpuTimeNs(); + + /** + * Set the response serialization cpu time used against offline table in request handling, into the broker response. + */ + void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs); + + /** + * Get the response serialization cpu time used against realtime table in request handling, from the broker response. + */ + long getRealtimeResponseSerializationCpuTimeNs(); + + /** + * Set the response serialization cpu time used against realtime table in request handling, into the broker response. + */ + void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs); + + /** + * Get the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used + * against offline table in request handling, from the broker response. + */ + long getOfflineTotalCpuTimeNs(); + + /** + * Set the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used + * against offline table in request handling, into the broker response. + */ + void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs); + + /** + * Get the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used + * against realtime table in request handling, from the broker response. + */ + long getRealtimeTotalCpuTimeNs(); + + /** + * Set the total cpu time(thread cpu time + system activities cpu time + response serialization cpu time) used + * against realtime table in request handling, into the broker response. + */ + void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java index c6fc91ca35ef..665fb29339f5 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java @@ -43,7 +43,10 @@ "selectionResults", "aggregationResults", "resultTable", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", - "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "segmentStatistics", "traceInfo" + "offlineThreadCpuTimeNs", "realtimeThreadCpuTimeNs", "offlineSystemActivitiesCpuTimeNs", + "realtimeSystemActivitiesCpuTimeNs", "offlineResponseSerializationCpuTimeNs", + "realtimeResponseSerializationCpuTimeNs", "offlineTotalCpuTimeNs", "realtimeTotalCpuTimeNs", "segmentStatistics", + "traceInfo" }) public class BrokerResponseNative implements BrokerResponse { public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty(); @@ -70,12 +73,16 @@ public class BrokerResponseNative implements BrokerResponse { private long _timeUsedMs = 0L; private long _offlineThreadCpuTimeNs = 0L; private long _realtimeThreadCpuTimeNs = 0L; + private long _offlineSystemActivitiesCpuTimeNs = 0L; + private long _realtimeSystemActivitiesCpuTimeNs = 0L; + private long _offlineResponseSerializationCpuTimeNs = 0L; + private long _realtimeResponseSerializationCpuTimeNs = 0L; + private long _offlineTotalCpuTimeNs = 0L; + private long _realtimeTotalCpuTimeNs = 0L; private int _numRowsResultSet = 0; - private SelectionResults _selectionResults; private List _aggregationResults; private ResultTable _resultTable; - private Map _traceInfo = new HashMap<>(); private List _processingExceptions = new ArrayList<>(); private List _segmentStatistics = new ArrayList<>(); @@ -100,6 +107,107 @@ public static BrokerResponseNative empty() { return new BrokerResponseNative(); } + public static BrokerResponseNative fromJsonString(String jsonString) + throws IOException { + return JsonUtils.stringToObject(jsonString, BrokerResponseNative.class); + } + + @JsonProperty("offlineSystemActivitiesCpuTimeNs") + @Override + public long getOfflineSystemActivitiesCpuTimeNs() { + return _offlineSystemActivitiesCpuTimeNs; + } + + @JsonProperty("offlineSystemActivitiesCpuTimeNs") + @Override + public void setOfflineSystemActivitiesCpuTimeNs(long offlineSystemActivitiesCpuTimeNs) { + _offlineSystemActivitiesCpuTimeNs = offlineSystemActivitiesCpuTimeNs; + } + + @JsonProperty("realtimeSystemActivitiesCpuTimeNs") + @Override + public long getRealtimeSystemActivitiesCpuTimeNs() { + return _realtimeSystemActivitiesCpuTimeNs; + } + + @JsonProperty("realtimeSystemActivitiesCpuTimeNs") + @Override + public void setRealtimeSystemActivitiesCpuTimeNs(long realtimeSystemActivitiesCpuTimeNs) { + _realtimeSystemActivitiesCpuTimeNs = realtimeSystemActivitiesCpuTimeNs; + } + + @JsonProperty("offlineThreadCpuTimeNs") + @Override + public long getOfflineThreadCpuTimeNs() { + return _offlineThreadCpuTimeNs; + } + + @JsonProperty("offlineThreadCpuTimeNs") + @Override + public void setOfflineThreadCpuTimeNs(long timeUsedMs) { + _offlineThreadCpuTimeNs = timeUsedMs; + } + + @JsonProperty("realtimeThreadCpuTimeNs") + @Override + public long getRealtimeThreadCpuTimeNs() { + return _realtimeThreadCpuTimeNs; + } + + @JsonProperty("realtimeThreadCpuTimeNs") + @Override + public void setRealtimeThreadCpuTimeNs(long timeUsedMs) { + _realtimeThreadCpuTimeNs = timeUsedMs; + } + + @JsonProperty("offlineResponseSerializationCpuTimeNs") + @Override + public long getOfflineResponseSerializationCpuTimeNs() { + return _offlineResponseSerializationCpuTimeNs; + } + + @JsonProperty("offlineResponseSerializationCpuTimeNs") + @Override + public void setOfflineResponseSerializationCpuTimeNs(long offlineResponseSerializationCpuTimeNs) { + _offlineResponseSerializationCpuTimeNs = offlineResponseSerializationCpuTimeNs; + } + + @JsonProperty("realtimeResponseSerializationCpuTimeNs") + @Override + public long getRealtimeResponseSerializationCpuTimeNs() { + return _realtimeResponseSerializationCpuTimeNs; + } + + @JsonProperty("realtimeResponseSerializationCpuTimeNs") + @Override + public void setRealtimeResponseSerializationCpuTimeNs(long realtimeResponseSerializationCpuTimeNs) { + _realtimeResponseSerializationCpuTimeNs = realtimeResponseSerializationCpuTimeNs; + } + + @JsonProperty("offlineTotalCpuTimeNs") + @Override + public long getOfflineTotalCpuTimeNs() { + return _offlineTotalCpuTimeNs; + } + + @JsonProperty("offlineTotalCpuTimeNs") + @Override + public void setOfflineTotalCpuTimeNs(long offlineTotalCpuTimeNs) { + _offlineTotalCpuTimeNs = offlineTotalCpuTimeNs; + } + + @JsonProperty("realtimeTotalCpuTimeNs") + @Override + public long getRealtimeTotalCpuTimeNs() { + return _realtimeTotalCpuTimeNs; + } + + @JsonProperty("realtimeTotalCpuTimeNs") + @Override + public void setRealtimeTotalCpuTimeNs(long realtimeTotalCpuTimeNs) { + _realtimeTotalCpuTimeNs = realtimeTotalCpuTimeNs; + } + @JsonProperty("selectionResults") @JsonInclude(JsonInclude.Include.NON_NULL) public SelectionResults getSelectionResults() { @@ -288,42 +396,18 @@ public void setTimeUsedMs(long timeUsedMs) { _timeUsedMs = timeUsedMs; } - @JsonProperty("offlineThreadCpuTimeNs") - @Override - public long getOfflineThreadCpuTimeNs() { - return _offlineThreadCpuTimeNs; - } - @JsonProperty("numRowsResultSet") @Override public int getNumRowsResultSet() { return _numRowsResultSet; } - @JsonProperty("offlineThreadCpuTimeNs") - @Override - public void setOfflineThreadCpuTimeNs(long timeUsedMs) { - _offlineThreadCpuTimeNs = timeUsedMs; - } - @JsonProperty("numRowsResultSet") @Override public void setNumRowsResultSet(int numRowsResultSet) { _numRowsResultSet = numRowsResultSet; } - @JsonProperty("realtimeThreadCpuTimeNs") - @Override - public long getRealtimeThreadCpuTimeNs() { - return _realtimeThreadCpuTimeNs; - } - - @JsonProperty("realtimeThreadCpuTimeNs") - @Override - public void setRealtimeThreadCpuTimeNs(long timeUsedMs) { - _realtimeThreadCpuTimeNs = timeUsedMs; - } - @JsonProperty("segmentStatistics") public List getSegmentStatistics() { return _segmentStatistics; @@ -350,11 +434,6 @@ public String toJsonString() return JsonUtils.objectToString(this); } - public static BrokerResponseNative fromJsonString(String jsonString) - throws IOException { - return JsonUtils.stringToObject(jsonString, BrokerResponseNative.class); - } - @JsonIgnore @Override public void setExceptions(List exceptions) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java index 8e4abd8aba08..9bea554c652d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java @@ -99,7 +99,9 @@ enum MetadataKey { REQUEST_ID("requestId", MetadataValueType.LONG), NUM_RESIZES("numResizes", MetadataValueType.INT), RESIZE_TIME_MS("resizeTimeMs", MetadataValueType.LONG), - THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG); + THREAD_CPU_TIME_NS("threadCpuTimeNs", MetadataValueType.LONG), + SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs", MetadataValueType.LONG), + RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs", MetadataValueType.LONG); private static final Map NAME_TO_ENUM_KEY_MAP = new HashMap<>(); private final String _name; diff --git a/pinot-controller/src/main/resources/app/interfaces/types.d.ts b/pinot-controller/src/main/resources/app/interfaces/types.d.ts index f79cc243022f..a7a446a9b462 100644 --- a/pinot-controller/src/main/resources/app/interfaces/types.d.ts +++ b/pinot-controller/src/main/resources/app/interfaces/types.d.ts @@ -121,6 +121,12 @@ declare module 'Models' { minConsumingFreshnessTimeMs: number offlineThreadCpuTimeNs: number realtimeThreadCpuTimeNs: number + offlineSystemActivitiesCpuTimeNs: number + realtimeSystemActivitiesCpuTimeNs: number + offlineResponseSerializationCpuTimeNs: number + realtimeResponseSerializationCpuTimeNs: number + offlineTotalCpuTimeNs: number + realtimeTotalCpuTimeNs: number }; export type ClusterName = { diff --git a/pinot-controller/src/main/resources/app/pages/Query.tsx b/pinot-controller/src/main/resources/app/pages/Query.tsx index 77f658ee00f2..b5fd74793068 100644 --- a/pinot-controller/src/main/resources/app/pages/Query.tsx +++ b/pinot-controller/src/main/resources/app/pages/Query.tsx @@ -138,7 +138,13 @@ const responseStatCols = [ 'partialResponse', 'minConsumingFreshnessTimeMs', 'offlineThreadCpuTimeNs', - 'realtimeThreadCpuTimeNs' + 'realtimeThreadCpuTimeNs', + 'offlineSystemActivitiesCpuTimeNs', + 'realtimeSystemActivitiesCpuTimeNs', + 'offlineResponseSerializationCpuTimeNs', + 'realtimeResponseSerializationCpuTimeNs', + 'offlineTotalCpuTimeNs', + 'realtimeTotalCpuTimeNs' ]; const QueryPage = () => { @@ -189,7 +195,7 @@ const QueryPage = () => { const handleOutputDataChange = (editor, data, value) => { setInputQuery(value); }; - + const handleQueryInterfaceKeyDown = (editor, event) => { // Map Cmd + Enter KeyPress to executing the query if (event.metaKey == true && event.keyCode == 13) { diff --git a/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts b/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts index 468852dee5c4..8634a112068b 100644 --- a/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts +++ b/pinot-controller/src/main/resources/app/utils/PinotMethodUtils.ts @@ -185,7 +185,7 @@ const getClusterConfigJSON = () => { // Expected Output: {columns: [], records: []} const getQueryTablesList = ({bothType = false}) => { const promiseArr = bothType ? [getQueryTables('realtime'), getQueryTables('offline')] : [getQueryTables()]; - + return Promise.all(promiseArr).then((results) => { const responseObj = { columns: ['Tables'], @@ -295,7 +295,14 @@ const getQueryResults = (params, url, checkedOptions) => { 'partialResponse', 'minConsumingFreshnessTimeMs', 'offlineThreadCpuTimeNs', - 'realtimeThreadCpuTimeNs']; + 'realtimeThreadCpuTimeNs', + 'offlineSystemActivitiesCpuTimeNs', + 'realtimeSystemActivitiesCpuTimeNs', + 'offlineResponseSerializationCpuTimeNs', + 'realtimeResponseSerializationCpuTimeNs', + 'offlineTotalCpuTimeNs', + 'realtimeTotalCpuTimeNs' + ]; return { result: { @@ -308,7 +315,10 @@ const getQueryResults = (params, url, checkedOptions) => { queryResponse.numSegmentsQueried, queryResponse.numSegmentsProcessed, queryResponse.numSegmentsMatched, queryResponse.numConsumingSegmentsQueried, queryResponse.numEntriesScannedInFilter, queryResponse.numEntriesScannedPostFilter, queryResponse.numGroupsLimitReached, queryResponse.partialResponse ? queryResponse.partialResponse : '-', queryResponse.minConsumingFreshnessTimeMs, - queryResponse.offlineThreadCpuTimeNs, queryResponse.realtimeThreadCpuTimeNs]] + queryResponse.offlineThreadCpuTimeNs, queryResponse.realtimeThreadCpuTimeNs, + queryResponse.offlineSystemActivitiesCpuTimeNs, queryResponse.realtimeSystemActivitiesCpuTimeNs, + queryResponse.offlineResponseSerializationCpuTimeNs, queryResponse.realtimeResponseSerializationCpuTimeNs, + queryResponse.offlineTotalCpuTimeNs, queryResponse.realtimeTotalCpuTimeNs]] }, data: queryResponse, }; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java index f88607592884..930c1dd5d64b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java @@ -192,6 +192,26 @@ public byte[] toBytes() ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + writeLeadingSections(dataOutputStream); + + // Add table serialization time metadata if thread timer is enabled. + if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) { + long responseSerializationCpuTimeNs = threadTimer.stopAndGetThreadTimeNs(); + getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs)); + } + + // Write metadata: length followed by actual metadata bytes. + // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while + // considering it will bring a lot code complexity. + byte[] metadataBytes = serializeMetadata(); + dataOutputStream.writeInt(metadataBytes.length); + dataOutputStream.write(metadataBytes); + + return byteArrayOutputStream.toByteArray(); + } + + private void writeLeadingSections(DataOutputStream dataOutputStream) + throws IOException { dataOutputStream.writeInt(DataTableBuilder.VERSION_3); dataOutputStream.writeInt(_numRows); dataOutputStream.writeInt(_numColumns); @@ -262,22 +282,6 @@ public byte[] toBytes() if (_variableSizeDataBytes != null) { dataOutputStream.write(_variableSizeDataBytes); } - - // Update the value of "threadCpuTimeNs" to account data table serialization time. - long responseSerializationCpuTimeNs = threadTimer.stopAndGetThreadTimeNs(); - // TODO: currently log/emit a total thread cpu time for query execution time and data table serialization time. - // Figure out a way to log/emit separately. Probably via providing an API on the DataTable to get/set query - // context, which is supposed to be used at server side only. - long threadCpuTimeNs = Long.parseLong(getMetadata().getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0")) - + responseSerializationCpuTimeNs; - getMetadata().put(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(threadCpuTimeNs)); - - // Write metadata: length followed by actual metadata bytes. - byte[] metadataBytes = serializeMetadata(); - dataOutputStream.writeInt(metadataBytes.length); - dataOutputStream.write(metadataBytes); - - return byteArrayOutputStream.toByteArray(); } /** diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java index a60ce70036eb..a6b863071894 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/InstanceResponseOperator.java @@ -19,6 +19,7 @@ package org.apache.pinot.core.operator; import java.util.List; +import java.util.Map; import org.apache.pinot.common.utils.DataTable.MetadataKey; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; @@ -44,14 +45,40 @@ public InstanceResponseOperator(BaseCombineOperator combinedOperator, List responseMetaData = instanceResponseBlock.getInstanceResponseDataTable().getMetadata(); + responseMetaData.put(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(threadCpuTimeNs)); + responseMetaData + .put(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), String.valueOf(systemActivitiesCpuTimeNs)); - instanceResponseBlock.getInstanceResponseDataTable().getMetadata() - .put(MetadataKey.THREAD_CPU_TIME_NS.getName(), String.valueOf(totalThreadCpuTimeNs)); return instanceResponseBlock; } else { return new InstanceResponseBlock(getCombinedResults()); @@ -79,36 +111,6 @@ private IntermediateResultsBlock getCombinedResults() { } } - /* - * Calculate totalThreadCpuTimeNs based on totalWallClockTimeNs, multipleThreadCpuTimeNs, and numServerThreads. - * System activities time such as OS paging, GC, context switching are not captured by totalThreadCpuTimeNs. - * For example, let's divide query processing into 4 phases: - * - phase 1: single thread preparing. Time used: T1 - * - phase 2: N threads processing segments in parallel, each thread use time T2 - * - phase 3: GC/OS paging. Time used: T3 - * - phase 4: single thread merging intermediate results blocks. Time used: T4 - * - * Then we have following equations: - * - singleThreadCpuTimeNs = T1 + T4 - * - multipleThreadCpuTimeNs = T2 * N - * - totalWallClockTimeNs = T1 + T2 + T3 + T4 = singleThreadCpuTimeNs + T2 + T3 - * - totalThreadCpuTimeNsWithoutSystemActivities = T1 + T2 * N + T4 = singleThreadCpuTimeNs + T2 * N - * - systemActivitiesTimeNs = T3 = (totalWallClockTimeNs - totalThreadCpuTimeNsWithoutSystemActivities) + T2 * (N - 1) - * - * Thus: - * totalThreadCpuTimeNsWithSystemActivities = totalThreadCpuTimeNsWithoutSystemActivities + systemActivitiesTimeNs - * = totalThreadCpuTimeNsWithoutSystemActivities + T3 - * = totalThreadCpuTimeNsWithoutSystemActivities + (totalWallClockTimeNs - - * totalThreadCpuTimeNsWithoutSystemActivities) + T2 * (N - 1) - * = totalWallClockTimeNs + T2 * (N - 1) - * = totalWallClockTimeNs + (multipleThreadCpuTimeNs / N) * (N - 1) - */ - public static long calTotalThreadCpuTimeNs(long totalWallClockTimeNs, long multipleThreadCpuTimeNs, - int numServerThreads) { - double perThreadCpuTimeNs = multipleThreadCpuTimeNs * 1.0 / numServerThreads; - return Math.round(totalWallClockTimeNs + perThreadCpuTimeNs * (numServerThreads - 1)); - } - @Override public String getOperatorName() { return OPERATOR_NAME; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java index 77aae3df86cd..fa22e54ac1f2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java @@ -140,6 +140,13 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, long numTotalDocs = 0L; long offlineThreadCpuTimeNs = 0L; long realtimeThreadCpuTimeNs = 0L; + long offlineSystemActivitiesCpuTimeNs = 0L; + long realtimeSystemActivitiesCpuTimeNs = 0L; + long offlineResponseSerializationCpuTimeNs = 0L; + long realtimeResponseSerializationCpuTimeNs = 0L; + long offlineTotalCpuTimeNs = 0L; + long realtimeTotalCpuTimeNs = 0L; + boolean numGroupsLimitReached = false; PinotQuery pinotQuery = brokerRequest.getPinotQuery(); @@ -217,6 +224,28 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, } } + String systemActivitiesCpuTimeNsString = metadata.get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName()); + if (systemActivitiesCpuTimeNsString != null) { + if (entry.getKey().getTableType() == TableType.OFFLINE) { + offlineSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString); + } else { + realtimeSystemActivitiesCpuTimeNs += Long.parseLong(systemActivitiesCpuTimeNsString); + } + } + + String responseSerializationCpuTimeNsString = metadata.get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()); + if (responseSerializationCpuTimeNsString != null) { + if (entry.getKey().getTableType() == TableType.OFFLINE) { + offlineResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString); + } else { + realtimeResponseSerializationCpuTimeNs += Long.parseLong(responseSerializationCpuTimeNsString); + } + } + offlineTotalCpuTimeNs = + offlineThreadCpuTimeNs + offlineSystemActivitiesCpuTimeNs + offlineResponseSerializationCpuTimeNs; + realtimeTotalCpuTimeNs = + realtimeThreadCpuTimeNs + realtimeSystemActivitiesCpuTimeNs + realtimeResponseSerializationCpuTimeNs; + String numTotalDocsString = metadata.get(MetadataKey.TOTAL_DOCS.getName()); if (numTotalDocsString != null) { numTotalDocs += Long.parseLong(numTotalDocsString); @@ -251,6 +280,12 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, brokerResponseNative.setNumGroupsLimitReached(numGroupsLimitReached); brokerResponseNative.setOfflineThreadCpuTimeNs(offlineThreadCpuTimeNs); brokerResponseNative.setRealtimeThreadCpuTimeNs(realtimeThreadCpuTimeNs); + brokerResponseNative.setOfflineSystemActivitiesCpuTimeNs(offlineSystemActivitiesCpuTimeNs); + brokerResponseNative.setRealtimeSystemActivitiesCpuTimeNs(realtimeSystemActivitiesCpuTimeNs); + brokerResponseNative.setOfflineResponseSerializationCpuTimeNs(offlineResponseSerializationCpuTimeNs); + brokerResponseNative.setRealtimeResponseSerializationCpuTimeNs(realtimeResponseSerializationCpuTimeNs); + brokerResponseNative.setOfflineTotalCpuTimeNs(offlineTotalCpuTimeNs); + brokerResponseNative.setRealtimeTotalCpuTimeNs(realtimeTotalCpuTimeNs); if (numConsumingSegmentsProcessed > 0) { brokerResponseNative.setNumConsumingSegmentsQueried(numConsumingSegmentsProcessed); brokerResponseNative.setMinConsumingFreshnessTimeMs(minConsumingFreshnessTimeMs); @@ -261,14 +296,27 @@ public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest, String rawTableName = TableNameBuilder.extractRawTableName(tableName); if (brokerMetrics != null) { brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.DOCUMENTS_SCANNED, numDocsScanned); - brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, - numEntriesScannedInFilter); - brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, - numEntriesScannedPostFilter); + brokerMetrics + .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_IN_FILTER, numEntriesScannedInFilter); + brokerMetrics + .addMeteredTableValue(rawTableName, BrokerMeter.ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter); brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_THREAD_CPU_TIME_NS, offlineThreadCpuTimeNs, TimeUnit.NANOSECONDS); brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_THREAD_CPU_TIME_NS, realtimeThreadCpuTimeNs, TimeUnit.NANOSECONDS); + brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_SYSTEM_ACTIVITIES_CPU_TIME_NS, + offlineSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS); + brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_SYSTEM_ACTIVITIES_CPU_TIME_NS, + realtimeSystemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS); + brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_RESPONSE_SER_CPU_TIME_NS, + offlineResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS); + brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_RESPONSE_SER_CPU_TIME_NS, + realtimeResponseSerializationCpuTimeNs, TimeUnit.NANOSECONDS); + brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.OFFLINE_TOTAL_CPU_TIME_NS, offlineTotalCpuTimeNs, + TimeUnit.NANOSECONDS); + brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.REALTIME_TOTAL_CPU_TIME_NS, realtimeTotalCpuTimeNs, + TimeUnit.NANOSECONDS); + if (numConsumingSegmentsProcessed > 0 && minConsumingFreshnessTimeMs > 0) { brokerMetrics.addTimedTableValue(rawTableName, BrokerTimer.FRESHNESS_LAG_MS, System.currentTimeMillis() - minConsumingFreshnessTimeMs, TimeUnit.MILLISECONDS); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java index 3ed9b9026e0c..07cc18b2421b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java @@ -188,6 +188,11 @@ protected byte[] processQueryAndSerialize(@Nonnull ServerQueryRequest queryReque Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESIZE_TIME_MS.getName(), INVALID_RESIZE_TIME_MS)); long threadCpuTimeNs = Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.THREAD_CPU_TIME_NS.getName(), "0")); + long systemActivitiesCpuTimeNs = + Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName(), "0")); + long responseSerializationCpuTimeNs = + Long.parseLong(dataTableMetadata.getOrDefault(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), "0")); + long totalCpuTimeNs = threadCpuTimeNs + systemActivitiesCpuTimeNs + responseSerializationCpuTimeNs; if (numDocsScanned > 0) { _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_DOCS_SCANNED, numDocsScanned); @@ -210,6 +215,18 @@ protected byte[] processQueryAndSerialize(@Nonnull ServerQueryRequest queryReque _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.EXECUTION_THREAD_CPU_TIME_NS, threadCpuTimeNs, TimeUnit.NANOSECONDS); } + if (systemActivitiesCpuTimeNs > 0) { + _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.SYSTEM_ACTIVITIES_CPU_TIME_NS, + systemActivitiesCpuTimeNs, TimeUnit.NANOSECONDS); + } + if (responseSerializationCpuTimeNs > 0) { + _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.RESPONSE_SER_CPU_TIME_NS, + responseSerializationCpuTimeNs, TimeUnit.NANOSECONDS); + } + if (totalCpuTimeNs > 0) { + _serverMetrics.addTimedTableValue(tableNameWithType, ServerTimer.TOTAL_CPU_TIME_NS, totalCpuTimeNs, + TimeUnit.NANOSECONDS); + } TimerContext timerContext = queryRequest.getTimerContext(); int numSegmentsQueried = queryRequest.getSegmentsToQuery().size(); @@ -221,14 +238,15 @@ protected byte[] processQueryAndSerialize(@Nonnull ServerQueryRequest queryReque LOGGER.info("Processed requestId={},table={},segments(queried/processed/matched/consuming)={}/{}/{}/{}," + "schedulerWaitMs={},reqDeserMs={},totalExecMs={},resSerMs={},totalTimeMs={}," + "minConsumingFreshnessMs={},broker={}," - + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={},threadCpuTimeNs={}", requestId, + + "numDocsScanned={},scanInFilter={},scanPostFilter={},sched={}," + + "threadCpuTimeNs(total/thread/sysActivity/resSer)={}/{}/{}/{}", requestId, tableNameWithType, numSegmentsQueried, numSegmentsProcessed, numSegmentsMatched, numSegmentsConsuming, schedulerWaitMs, timerContext.getPhaseDurationMs(ServerQueryPhase.REQUEST_DESERIALIZATION), timerContext.getPhaseDurationMs(ServerQueryPhase.QUERY_PROCESSING), timerContext.getPhaseDurationMs(ServerQueryPhase.RESPONSE_SERIALIZATION), timerContext.getPhaseDurationMs(ServerQueryPhase.TOTAL_QUERY_TIME), minConsumingFreshnessMs, queryRequest.getBrokerId(), numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter, name(), - threadCpuTimeNs); + totalCpuTimeNs, threadCpuTimeNs, systemActivitiesCpuTimeNs, responseSerializationCpuTimeNs); // Limit the dropping log message at most once per second. if (_numDroppedLogRateLimiter.tryAcquire()) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java index 9dc06b2c7b8a..00b864886b74 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/common/datatable/DataTableSerDeTest.java @@ -202,7 +202,9 @@ public void testV2V3Compatibility() Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server. + // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement + // disabled) + ThreadTimer.setThreadCpuTimeMeasurementEnabled(false); DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3); DataTableBuilder dataTableBuilderV3WithDataOnly = new DataTableBuilder(dataSchema); fillDataTableWithRandomData(dataTableBuilderV3WithDataOnly, columnDataTypes, numColumns); @@ -212,22 +214,22 @@ public void testV2V3Compatibility() Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - // DataTable V3 serialization logic will add an extra THREAD_CPU_TIME_NS KV pair into metadata - Assert.assertEquals(newDataTable.getMetadata().size(), 1); - Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.THREAD_CPU_TIME_NS.getName())); + Assert.assertEquals(newDataTable.getMetadata().size(), 0); - // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server + // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server(with + // ThreadCpuTimeMeasurement disabled) for (String key : EXPECTED_METADATA.keySet()) { dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); } + // Deserialize data table bytes as V3 newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); verifyDataIsSame(newDataTable, columnDataTypes, numColumns); - newDataTable.getMetadata().remove(MetadataKey.THREAD_CPU_TIME_NS.getName()); Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); - // Verify V3 broker can deserialize data table (only has metadata) send by V3 server + // Verify V3 broker can deserialize data table (only has metadata) send by V3 server(with + // ThreadCpuTimeMeasurement disabled) DataTableBuilder dataTableBuilderV3WithMetadataDataOnly = new DataTableBuilder(dataSchema); dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table for (String key : EXPECTED_METADATA.keySet()) { @@ -236,7 +238,50 @@ public void testV2V3Compatibility() newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); - newDataTable.getMetadata().remove(MetadataKey.THREAD_CPU_TIME_NS.getName()); + Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); + + // Verify V3 broker can deserialize (has data, but has no metadata) send by V3 server(with ThreadCpuTimeMeasurement + // enabled) + ThreadTimer.setThreadCpuTimeMeasurementEnabled(true); + DataTableBuilder.setCurrentDataTableVersion(DataTableBuilder.VERSION_3); + dataTableV3 = dataTableBuilderV3WithDataOnly.build(); // create a V3 data table + // Deserialize data table bytes as V3 + newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); + verifyDataIsSame(newDataTable, columnDataTypes, numColumns); + Assert.assertEquals(newDataTable.getMetadata().size(), 1); + Assert.assertTrue(newDataTable.getMetadata().containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())); + + // Verify V3 broker can deserialize data table (has data and metadata) send by V3 server(with + // ThreadCpuTimeMeasurement enabled) + for (String key : EXPECTED_METADATA.keySet()) { + dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); + } + // Deserialize data table bytes as V3 + newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), NUM_ROWS, ERROR_MESSAGE); + verifyDataIsSame(newDataTable, columnDataTypes, numColumns); + if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) { + Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1); + newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()); + } + Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); + + // Verify V3 broker can deserialize data table (only has metadata) send by V3 server(with + // ThreadCpuTimeMeasurement enabled) + dataTableV3 = dataTableBuilderV3WithMetadataDataOnly.build(); // create a V3 data table + for (String key : EXPECTED_METADATA.keySet()) { + dataTableV3.getMetadata().put(key, EXPECTED_METADATA.get(key)); + } + newDataTable = DataTableFactory.getDataTable(dataTableV3.toBytes()); // Broker deserialize data table bytes as V3 + Assert.assertEquals(newDataTable.getDataSchema(), dataSchema, ERROR_MESSAGE); + Assert.assertEquals(newDataTable.getNumberOfRows(), 0, 0); + if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) { + Assert.assertEquals(newDataTable.getMetadata().size(), EXPECTED_METADATA.keySet().size() + 1); + newDataTable.getMetadata().remove(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName()); + } Assert.assertEquals(newDataTable.getMetadata(), EXPECTED_METADATA); } @@ -255,15 +300,24 @@ public void testExecutionThreadCpuTimeNs() fillDataTableWithRandomData(dataTableBuilder, columnDataTypes, numColumns); DataTable dataTable = dataTableBuilder.build(); + + // Disable ThreadCpuTimeMeasurement, serialize/de-serialize data table. + ThreadTimer.setThreadCpuTimeMeasurementEnabled(false); DataTable newDataTable = DataTableFactory.getDataTable(dataTable.toBytes()); - // When ThreadCpuTimeMeasurement is disabled, value of threadCpuTimeNs is 0. - Assert.assertEquals(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()), String.valueOf(0)); + // When ThreadCpuTimeMeasurement is disabled, no value for + // threadCpuTimeNs/systemActivitiesCpuTimeNs/responseSerializationCpuTimeNs. + Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName())); + Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName())); + Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())); - // Enable ThreadCpuTimeMeasurement, serialize/de-serialize data table again. + // Enable ThreadCpuTimeMeasurement, serialize/de-serialize data table. ThreadTimer.setThreadCpuTimeMeasurementEnabled(true); newDataTable = DataTableFactory.getDataTable(dataTable.toBytes()); - // When ThreadCpuTimeMeasurement is enabled, value of threadCpuTimeNs is not 0. - Assert.assertNotEquals(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()), String.valueOf(0)); + // When ThreadCpuTimeMeasurement is enabled, value of responseSerializationCpuTimeNs is not 0. + Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName())); + Assert.assertNull(newDataTable.getMetadata().get(MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS.getName())); + Assert.assertTrue( + Integer.parseInt(newDataTable.getMetadata().get(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())) > 0); } @Test @@ -311,8 +365,8 @@ public void testDataTableMetadataBytesLayout() try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(metadataBytes); DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { int numEntries = dataInputStream.readInt(); - // DataTable V3 serialization logic will add an extra THREAD_CPU_TIME_NS KV pair into metadata - Assert.assertEquals(numEntries, EXPECTED_METADATA.size() + 1); + // DataTable V3 serialization logic will add an extra RESPONSE_SER_CPU_TIME_NS KV pair into metadata + Assert.assertEquals(numEntries, EXPECTED_METADATA.size()); for (int i = 0; i < numEntries; i++) { int keyOrdinal = dataInputStream.readInt(); DataTable.MetadataKey key = MetadataKey.getByOrdinal(keyOrdinal); @@ -324,8 +378,10 @@ public void testDataTableMetadataBytesLayout() } else if (key.getValueType() == DataTable.MetadataValueType.LONG) { byte[] actualBytes = new byte[Long.BYTES]; dataInputStream.read(actualBytes); - // Ignore the THREAD_CPU_TIME_NS key since it's added during data serialization. - if (key != MetadataKey.THREAD_CPU_TIME_NS) { + // Ignore the THREAD_CPU_TIME_NS/SYSTEM_ACTIVITIES_CPU_TIME_NS/RESPONSE_SER_CPU_TIME_NS key since their value + // are evaluated during query execution. + if (key != MetadataKey.THREAD_CPU_TIME_NS && key != MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS + && key != MetadataKey.RESPONSE_SER_CPU_TIME_NS) { Assert.assertEquals(actualBytes, Longs.toByteArray(Long.parseLong(EXPECTED_METADATA.get(key.getName())))); } } else { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java index a771965f390f..543da755cac4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/ThreadCpuTimeMeasurementTest.java @@ -30,32 +30,36 @@ public void testCalTotalThreadCpuTimeNs() { class TestCase { final long _totalWallClockTimeNs; final long _multipleThreadCpuTimeNs; + final long _singleThreadCpuTimeNs; final int _numServerThreads; - final long _totalThreadCpuTimeNs; + final long _systemActivitiesCpuTimeNs; - TestCase(long totalWallClockTimeNs, long multipleThreadCpuTimeNs, int numServerThreads, - long totalThreadCpuTimeNs) { + TestCase(long totalWallClockTimeNs, long multipleThreadCpuTimeNs, long singleThreadCpuTimeNs, + int numServerThreads, long systemActivitiesCpuTimeNs) { _totalWallClockTimeNs = totalWallClockTimeNs; _multipleThreadCpuTimeNs = multipleThreadCpuTimeNs; + _singleThreadCpuTimeNs = singleThreadCpuTimeNs; _numServerThreads = numServerThreads; - _totalThreadCpuTimeNs = totalThreadCpuTimeNs; + _systemActivitiesCpuTimeNs = systemActivitiesCpuTimeNs; } } - TestCase[] testCases = new TestCase[]{ - new TestCase(4245673, 7124487, 3, 8995331), new TestCase(21500000, 10962161, 2, 26981081), - new TestCase(59000000, 23690790, 1, 59000000), new TestCase(59124358, 11321792, 5, 68181792), - new TestCase(79888780, 35537324, 7, 110349343), new TestCase(915432, 2462128, 4, 2762028) - }; + TestCase[] testCases = + new TestCase[]{new TestCase(4245673, 7124487, 1717171, 3, 153673), new TestCase(21500000, 10962161, 837, 2, + 16018083), new TestCase(59000000, 23690790, 4875647, 1, 30433563), new TestCase(59124358, 11321792, 164646, + 5, 56695354), new TestCase(79888780, 35537324, 16464, 7, 74795555), new TestCase(915432, 2462128, 63383, 4, + 236517)}; for (TestCase testCase : testCases) { long totalWallClockTimeNs = testCase._totalWallClockTimeNs; long multipleThreadCpuTimeNs = testCase._multipleThreadCpuTimeNs; + long singleThreadCpuTimeNs = testCase._singleThreadCpuTimeNs; int numServerThreads = testCase._numServerThreads; - long expectedTotalThreadCpuTimeNs = testCase._totalThreadCpuTimeNs; - long actualTotalThreadCpuTimeNs = InstanceResponseOperator - .calTotalThreadCpuTimeNs(totalWallClockTimeNs, multipleThreadCpuTimeNs, numServerThreads); - Assert.equals(expectedTotalThreadCpuTimeNs, actualTotalThreadCpuTimeNs); + long expectedSystemActivitiesCpuTimeNs = testCase._systemActivitiesCpuTimeNs; + long actualSystemActivitiesCpuTimeNs = InstanceResponseOperator + .calSystemActivitiesCpuTimeNs(totalWallClockTimeNs, multipleThreadCpuTimeNs, singleThreadCpuTimeNs, + numServerThreads); + Assert.equals(expectedSystemActivitiesCpuTimeNs, actualSystemActivitiesCpuTimeNs); } } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 609b26d13375..3643095e6644 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1856,9 +1856,9 @@ private void testStreamingRequest(Iterator streamingRespo String responseType = streamingResponse.getMetadataMap().get(CommonConstants.Query.Response.MetadataKeys.RESPONSE_TYPE); if (responseType.equals(CommonConstants.Query.Response.ResponseType.DATA)) { - // verify the returned data table metadata only contains "threadCpuTimeNs". + // verify the returned data table metadata only contains "responseSerializationCpuTimeNs". Map metadata = dataTable.getMetadata(); - assertTrue(metadata.size() == 1 && metadata.containsKey(MetadataKey.THREAD_CPU_TIME_NS.getName())); + assertTrue(metadata.size() == 1 && metadata.containsKey(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName())); assertNotNull(dataTable.getDataSchema()); numTotalDocs += dataTable.getNumberOfRows(); } else {