Skip to content

Commit

Permalink
Merge branch 'master' into pr_add_table_tests
Browse files Browse the repository at this point in the history
  • Loading branch information
walterddr authored Nov 28, 2022
2 parents 73ed469 + 6ef4dfc commit 36cab68
Show file tree
Hide file tree
Showing 163 changed files with 7,006 additions and 956 deletions.
4 changes: 4 additions & 0 deletions config/codestyle-intellij.xml
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
<option name="THROWS_LIST_WRAP" value="1"/>
<option name="THROWS_KEYWORD_WRAP" value="2"/>
<option name="WRAP_COMMENTS" value="true"/>
<JSON>
<option name="OBJECT_WRAPPING" value="5" />
<option name="ARRAY_WRAPPING" value="5" />
</JSON>
<XML>
<option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
</XML>
Expand Down
28 changes: 28 additions & 0 deletions kubernetes/helm/index.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,34 @@
apiVersion: v1
entries:
pinot:
- apiVersion: v1
appVersion: 0.2.6
created: "2022-11-23T13:05:57.685715-08:00"
dependencies:
- condition: pinot.zookeeper.enabled,zookeeper.enabled
name: zookeeper
repository: https://charts.bitnami.com/bitnami
version: 9.x.x
description: Apache Pinot is a realtime distributed OLAP datastore, which is used
to deliver scalable real time analytics with low latency. It can ingest data
from offline data sources (such as Hadoop and flat files) as well as online
sources (such as Kafka). Pinot is designed to scale horizontally.
digest: a02cf25577d5cfe6a78c82dbb987e1817fe059d276168933635876467071d402
home: https://pinot.apache.org/
keywords:
- olap
- analytics
- database
- pinot
maintainers:
- email: [email protected]
name: pinot-dev
name: pinot
sources:
- https://github.com/apache/pinot/tree/master/kubernetes/helm
urls:
- pinot-0.2.6.tgz
version: 0.2.6
- apiVersion: v1
appVersion: 0.2.5
created: "2022-03-29T13:13:13.151614-07:00"
Expand Down
Binary file added kubernetes/helm/pinot-0.2.6.tgz
Binary file not shown.
4 changes: 2 additions & 2 deletions kubernetes/helm/pinot/Chart.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#

apiVersion: v1
appVersion: 0.2.6-SNAPSHOT
appVersion: 0.2.7-SNAPSHOT
name: pinot
description: Apache Pinot is a realtime distributed OLAP datastore, which is used to deliver scalable real time analytics with low latency. It can ingest data from offline data sources (such as Hadoop and flat files) as well as online sources (such as Kafka). Pinot is designed to scale horizontally.
version: 0.2.6-SNAPSHOT
version: 0.2.7-SNAPSHOT
keywords:
- olap
- analytics
Expand Down
34 changes: 34 additions & 0 deletions kubernetes/helm/pinot/templates/broker/ingress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,38 @@ spec:
{{- end }}

{{- if .Values.broker.ingress.v1.enabled -}}
{{- $ingressPath := .Values.broker.ingress.v1.path -}}
{{- $serviceName := include "pinot.broker.fullname" . -}}
{{- $servicePort := .Values.broker.service.port -}}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ $serviceName }}
{{- if .Values.broker.ingress.v1.annotations }}
annotations:
{{ toYaml .Values.broker.ingress.v1.annotations | indent 4 }}
{{- end }}
labels:
{{- include "pinot.brokerLabels" . | nindent 4 }}
spec:
{{- if .Values.broker.ingress.v1.ingressClassName }}
ingressClassName: {{ .Values.broker.ingress.v1.ingressClassName }}
{{- end }}
{{- if .Values.broker.ingress.v1.tls }}
tls:
{{ toYaml .Values.broker.ingress.v1.tls | indent 4 }}
{{- end }}
rules:
{{- range .Values.broker.ingress.v1.hosts }}
- host: {{ . }}
http:
paths:
- path: {{ $ingressPath }}
pathType: Prefix
backend:
service:
name: {{ $serviceName }}
port:
number: {{ $servicePort }}
{{- end }}
{{- end }}
34 changes: 34 additions & 0 deletions kubernetes/helm/pinot/templates/controller/ingress.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,38 @@ spec:
{{- end }}

{{- if .Values.controller.ingress.v1.enabled -}}
{{- $ingressPath := .Values.controller.ingress.v1.path -}}
{{- $serviceName := include "pinot.controller.fullname" . -}}
{{- $servicePort := .Values.controller.service.port -}}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ $serviceName }}
{{- if .Values.controller.ingress.v1.annotations }}
annotations:
{{ toYaml .Values.controller.ingress.v1.annotations | indent 4 }}
{{- end }}
labels:
{{- include "pinot.controllerLabels" . | nindent 4 }}
spec:
{{- if .Values.controller.ingress.v1.ingressClassName }}
ingressClassName: {{ .Values.controller.ingress.v1.ingressClassName }}
{{- end }}
{{- if .Values.controller.ingress.v1.tls }}
tls:
{{ toYaml .Values.controller.ingress.v1.tls | indent 4 }}
{{- end }}
rules:
{{- range .Values.controller.ingress.v1.hosts }}
- host: {{ . }}
http:
paths:
- path: {{ $ingressPath }}
pathType: Prefix
backend:
service:
name: {{ $serviceName }}
port:
number: {{ $servicePort }}
{{- end }}
{{- end }}
12 changes: 11 additions & 1 deletion kubernetes/helm/pinot/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

image:
repository: apachepinot/pinot
tag: latest # release-0.10.0
tag: latest # release-0.11.0
pullPolicy: Always # Use IfNotPresent when you pinged a version of image tag

cluster:
Expand Down Expand Up @@ -131,6 +131,11 @@ controller:
hosts: [ ]
v1:
enabled: false
ingressClassName: ""
annotations: {}
tls: []
path: /
hosts: []

resources:
requests:
Expand Down Expand Up @@ -226,6 +231,11 @@ broker:
hosts: []
v1:
enabled: false
ingressClassName: ""
annotations: {}
tls: []
path: /
hosts: []

resources:
requests:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public static InstanceAssignmentConfig getInstanceAssignmentConfig(TableConfig t

InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig;
SegmentsValidationAndRetentionConfig segmentConfig = tableConfig.getValidationConfig();
int numReplicaGroups = segmentConfig.getReplicationNumber();
int numReplicaGroups = tableConfig.getReplication();
ReplicaGroupStrategyConfig replicaGroupStrategyConfig = segmentConfig.getReplicaGroupStrategyConfig();
Preconditions.checkState(replicaGroupStrategyConfig != null, "Failed to find the replica-group strategy config");
String partitionColumn = replicaGroupStrategyConfig.getPartitionColumn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@
import javax.annotation.Nullable;
import org.apache.pinot.common.datatable.DataTableImplV3;
import org.apache.pinot.common.datatable.DataTableUtils;
import org.apache.pinot.common.request.context.ThreadTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
Expand Down Expand Up @@ -419,7 +419,7 @@ public Map<Integer, String> getExceptions() {
@Override
public byte[] toBytes()
throws IOException {
ThreadTimer threadTimer = new ThreadTimer();
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public static MetadataBlock getErrorDataBlock(Exception e) {
if (e instanceof ProcessingException) {
return getErrorDataBlock(Collections.singletonMap(((ProcessingException) e).getErrorCode(), errorMessage));
} else {
// TODO: Pass in meaningful error code.
return getErrorDataBlock(Collections.singletonMap(QueryException.UNKNOWN_ERROR_CODE, errorMessage));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.request.context.ThreadTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;

import static java.nio.charset.StandardCharsets.UTF_8;

Expand Down Expand Up @@ -187,15 +187,15 @@ public Map<Integer, String> getExceptions() {
@Override
public byte[] toBytes()
throws IOException {
ThreadTimer threadTimer = new ThreadTimer();
ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
writeLeadingSections(dataOutputStream);

// Add table serialization time metadata if thread timer is enabled.
if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
long responseSerializationCpuTimeNs = threadResourceUsageProvider.getThreadTimeNs();
getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.request.context.ThreadTimer;
import org.apache.pinot.common.response.ProcessingException;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.RoaringBitmapUtils;
import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.utils.BigDecimalUtils;
import org.apache.pinot.spi.utils.ByteArray;
Expand Down Expand Up @@ -376,14 +376,14 @@ public Map<Integer, String> getExceptions() {
@Override
public byte[] toBytes()
throws IOException {
ThreadTimer threadTimer = new ThreadTimer();
ThreadResourceUsageProvider threadTimer = new ThreadResourceUsageProvider();

ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
writeLeadingSections(dataOutputStream);

// Add table serialization time metadata if thread timer is enabled.
if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) {
if (ThreadResourceUsageProvider.isThreadCpuTimeMeasurementEnabled()) {
long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs();
getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,24 +96,31 @@ public static void init() {
*/
public static void registerFunction(Method method, boolean nullableParameters) {
registerFunction(method.getName(), method, nullableParameters);

// Calcite ScalarFunctionImpl doesn't allow customized named functions. TODO: fix me.
if (method.getAnnotation(Deprecated.class) == null) {
FUNCTION_MAP.put(method.getName(), ScalarFunctionImpl.create(method));
}
}

/**
* Registers a method with the given function name.
*/
public static void registerFunction(String functionName, Method method, boolean nullableParameters) {
registerFunctionInfoMap(functionName, method, nullableParameters);
registerCalciteNamedFunctionMap(functionName, method, nullableParameters);
}

private static void registerFunctionInfoMap(String functionName, Method method, boolean nullableParameters) {
FunctionInfo functionInfo = new FunctionInfo(method, method.getDeclaringClass(), nullableParameters);
String canonicalName = canonicalize(functionName);
Map<Integer, FunctionInfo> functionInfoMap = FUNCTION_INFO_MAP.computeIfAbsent(canonicalName, k -> new HashMap<>());
Preconditions.checkState(functionInfoMap.put(method.getParameterCount(), functionInfo) == null,
FunctionInfo existFunctionInfo = functionInfoMap.put(method.getParameterCount(), functionInfo);
Preconditions.checkState(existFunctionInfo == null || existFunctionInfo.getMethod() == functionInfo.getMethod(),
"Function: %s with %s parameters is already registered", functionName, method.getParameterCount());
}

private static void registerCalciteNamedFunctionMap(String functionName, Method method, boolean nullableParameters) {
if (method.getAnnotation(Deprecated.class) == null) {
FUNCTION_MAP.put(functionName, ScalarFunctionImpl.create(method));
}
}

public static Map<String, List<Function>> getRegisteredCalciteFunctionMap() {
return FUNCTION_MAP.map();
}
Expand Down Expand Up @@ -147,4 +154,42 @@ public static FunctionInfo getFunctionInfo(String functionName, int numParameter
private static String canonicalize(String functionName) {
return StringUtils.remove(functionName, '_').toLowerCase();
}

/**
* Placeholders for scalar function, they register and represents the signature for transform and filter predicate
* so that v2 engine can understand and plan them correctly.
*/
private static class PlaceholderScalarFunctions {

@ScalarFunction(names = {"jsonExtractScalar", "json_extract_scalar"})
public static Object jsonExtractScalar(String jsonFieldName, String jsonPath, String resultsType) {
throw new UnsupportedOperationException("Placeholder scalar function, should not reach here");
}

@ScalarFunction(names = {"jsonExtractScalar", "json_extract_scalar"})
public static Object jsonExtractScalar(String jsonFieldName, String jsonPath, String resultsType,
Object defaultValue) {
throw new UnsupportedOperationException("Placeholder scalar function, should not reach here");
}

@ScalarFunction(names = {"jsonExtractKey", "json_extract_key"})
public static String jsonExtractKey(String jsonFieldName, String jsonPath) {
throw new UnsupportedOperationException("Placeholder scalar function, should not reach here");
}

@ScalarFunction(names = {"textContains", "text_contains"})
public static boolean textContains(String text, String pattern) {
throw new UnsupportedOperationException("Placeholder scalar function, should not reach here");
}

@ScalarFunction(names = {"textMatch", "text_match"})
public static boolean textMatch(String text, String pattern) {
throw new UnsupportedOperationException("Placeholder scalar function, should not reach here");
}

@ScalarFunction(names = {"jsonMatch", "json_match"})
public static boolean jsonMatch(String text, String pattern) {
throw new UnsupportedOperationException("Placeholder scalar function, should not reach here");
}
}
}
Loading

0 comments on commit 36cab68

Please sign in to comment.