diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index 188de5aa64..7d0a452e1b 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -203,6 +203,12 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex TableFunctionImplementation tableFunctionImplementation = (TableFunctionImplementation) repository.compile( catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments); + context.push(); + TypeEnvironment curEnv = context.peek(); + Table table = tableFunctionImplementation.applyArguments(); + table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v)); + curEnv.define(new Symbol(Namespace.INDEX_NAME, + catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT); return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(), tableFunctionImplementation.applyArguments()); } diff --git a/docs/user/ppl/admin/prometheus_connector.rst b/docs/user/ppl/admin/prometheus_connector.rst index 53c862dfae..aced79cbdb 100644 --- a/docs/user/ppl/admin/prometheus_connector.rst +++ b/docs/user/ppl/admin/prometheus_connector.rst @@ -72,3 +72,116 @@ AWSSigV4 Auth:: } }] +PPL Query support for prometheus connector +========================================== + +Metric as a Table +--------------------------- +Each connector has to abstract the underlying datasource constructs into a table as part of the interface contract with the PPL query engine. +Prometheus connector abstracts each metric as a table and the columns of this table are ``@value``, ``@timestamp``, ``label1``, ``label2``---. +``@value`` represents metric measurement and ``@timestamp`` represents the timestamp at which the metric is collected. labels are tags associated with metric queried. +For eg: ``handler``, ``code``, ``instance``, ``code`` are the labels associated with ``prometheus_http_requests_total`` metric. With this abstraction, we can query prometheus +data using PPL syntax similar to opensearch indices. + +Sample Example:: + + > source = my_prometheus.prometheus_http_requests_total; + + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + | @value | @timestamp | handler | code | instance | job | + |------------+------------------------+--------------------------------+---------------+-------------+-------------| + | 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus | + | 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus | + | 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus | + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + + + +Default time range and resolution +--------------------------------- +Since time range and resolution are required parameters for query apis and these parameters are determined in the following manner from the PPL commands. +* Time range is determined through filter clause on ``@timestamp``. If there is no such filter clause, time range will be set to 1h with endtime set to now(). +* In case of stats, resolution is determined by ``span(@timestamp,15s)`` expression. For normal select queries, resolution is auto determined from the time range set. + +Prometheus Connector Limitations +-------------------------------- +* Only one aggregation is supported in stats command. +* Span Expression is compulsory in stats command. +* AVG, MAX, MIN, SUM, COUNT are the only aggregations supported in prometheus connector. + +Example queries +--------------- + +1. Metric Selection Query:: + + > source = my_prometheus.prometheus_http_requests_total + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + | @value | @timestamp | handler | code | instance | job | + |------------+------------------------+--------------------------------+---------------+-------------+-------------| + | 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus | + | 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus | + | 11 | "2022-11-03 07:18:64" |"/-/metrics" | 500 | 192.15.2.1 | prometheus | + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + +2. Metric Selecting Query with specific dimensions:: + + > source = my_prometheus.prometheus_http_requests_total | where handler='/-/ready' and code='200' + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + | @value | @timestamp | handler | code | instance | job | + |------------+------------------------+--------------------------------+---------------+-------------+-------------| + | 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus | + | 2 | "2022-11-03 07:18:44" | "/-/ready" | 200 | 192.15.2.1 | prometheus | + | 9 | "2022-11-03 07:18:54" | "/-/ready" | 200 | 192.15.2.1 | prometheus | + | 11 | "2022-11-03 07:18:64" | "/-/ready" | 200 | 192.15.2.1 | prometheus | + +------------+------------------------+--------------------------------+---------------+-------------+-------------+ + +3. Average aggregation on a metric:: + + > source = my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp,15s) + +------------+------------------------+ + | avg(@value)| span(@timestamp,15s) | + |------------+------------------------+ + | 5 | "2022-11-03 07:18:14" | + | 3 | "2022-11-03 07:18:24" | + | 7 | "2022-11-03 07:18:34" | + | 2 | "2022-11-03 07:18:44" | + | 9 | "2022-11-03 07:18:54" | + | 11 | "2022-11-03 07:18:64" | + +------------+------------------------+ + +4. Average aggregation grouped by dimensions:: + + > source = my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp,15s), handler, code + +------------+------------------------+--------------------------------+---------------+ + | avg(@value)| span(@timestamp,15s) | handler | code | + |------------+------------------------+--------------------------------+---------------+ + | 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | + | 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | + | 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | + | 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | + | 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | + | 11 | "2022-11-03 07:18:64" | "/-/metrics" | 500 | + +------------+------------------------+--------------------------------+---------------+ + +5. Count aggregation query:: + + > source = my_prometheus.prometheus_http_requests_total | stats count() by span(@timestamp,15s), handler, code + +------------+------------------------+--------------------------------+---------------+ + | count() | span(@timestamp,15s) | handler | code | + |------------+------------------------+--------------------------------+---------------+ + | 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | + | 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | + | 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | + | 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | + | 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | + | 11 | "2022-11-03 07:18:64" | "/-/metrics" | 500 | + +------------+------------------------+--------------------------------+---------------+ + diff --git a/docs/user/ppl/cmd/describe.rst b/docs/user/ppl/cmd/describe.rst index c683aa1781..12fcf35ded 100644 --- a/docs/user/ppl/cmd/describe.rst +++ b/docs/user/ppl/cmd/describe.rst @@ -75,11 +75,10 @@ The example retrieves table info for ``prometheus_http_requests_total`` metric i PPL query:: os> describe my_prometheus.prometheus_http_requests_total; - fetched rows / total rows = 7/7 + fetched rows / total rows = 6/6 +-----------------+----------------+--------------------------------+---------------+-------------+ | TABLE_CATALOG | TABLE_SCHEMA | TABLE_NAME | COLUMN_NAME | DATA_TYPE | |-----------------+----------------+--------------------------------+---------------+-------------| - | my_prometheus | default | prometheus_http_requests_total | @labels | keyword | | my_prometheus | default | prometheus_http_requests_total | handler | keyword | | my_prometheus | default | prometheus_http_requests_total | code | keyword | | my_prometheus | default | prometheus_http_requests_total | instance | keyword | diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/DescribeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/DescribeCommandIT.java index 88ef028fbe..77fd910f35 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/DescribeCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/DescribeCommandIT.java @@ -101,7 +101,6 @@ public void testDescribeCommandWithPrometheusCatalog() throws IOException { columnName("DATA_TYPE") ); verifyDataRows(result, - rows("my_prometheus", "default", "prometheus_http_requests_total", "@labels", "keyword"), rows("my_prometheus", "default", "prometheus_http_requests_total", "handler", "keyword"), rows("my_prometheus", "default", "prometheus_http_requests_total", "code", "keyword"), rows("my_prometheus", "default", "prometheus_http_requests_total", "instance", "keyword"), diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusCatalogCommandsIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusCatalogCommandsIT.java new file mode 100644 index 0000000000..9e197bbb27 --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/PrometheusCatalogCommandsIT.java @@ -0,0 +1,158 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.ppl; + +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifySchema; + +import lombok.SneakyThrows; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONArray; +import org.json.JSONObject; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class PrometheusCatalogCommandsIT extends PPLIntegTestCase { + + @Test + @SneakyThrows + public void testSourceMetricCommand() { + JSONObject response = + executeQuery("source=my_prometheus.prometheus_http_requests_total"); + verifySchema(response, + schema(VALUE, "double"), + schema(TIMESTAMP, "timestamp"), + schema("handler", "string"), + schema("code", "string"), + schema("instance", "string"), + schema("job", "string")); + Assertions.assertTrue(response.getInt("size") > 0); + Assertions.assertEquals(6, response.getJSONArray("datarows").getJSONArray(0).length()); + JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0); + for (int i = 0; i < firstRow.length(); i++) { + Assertions.assertNotNull(firstRow.get(i)); + Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString())); + } + } + + @Test + @SneakyThrows + public void testMetricAvgAggregationCommand() { + JSONObject response = + executeQuery("source=my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp, 15s), handler, job"); + verifySchema(response, + schema("avg(@value)", "double"), + schema("span(@timestamp,15s)", "timestamp"), + schema("handler", "string"), + schema("job", "string")); + Assertions.assertTrue(response.getInt("size") > 0); + Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length()); + JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0); + for (int i = 0; i < firstRow.length(); i++) { + Assertions.assertNotNull(firstRow.get(i)); + Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString())); + } + } + + @Test + @SneakyThrows + public void testMetricAvgAggregationCommandWithAlias() { + JSONObject response = + executeQuery("source=my_prometheus.prometheus_http_requests_total | stats avg(@value) as agg by span(@timestamp, 15s), handler, job"); + verifySchema(response, + schema("agg", "double"), + schema("span(@timestamp,15s)", "timestamp"), + schema("handler", "string"), + schema("job", "string")); + Assertions.assertTrue(response.getInt("size") > 0); + Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length()); + JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0); + for (int i = 0; i < firstRow.length(); i++) { + Assertions.assertNotNull(firstRow.get(i)); + Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString())); + } + } + + + @Test + @SneakyThrows + public void testMetricMaxAggregationCommand() { + JSONObject response = + executeQuery("source=my_prometheus.prometheus_http_requests_total | stats max(@value) by span(@timestamp, 15s)"); + verifySchema(response, + schema("max(@value)", "double"), + schema("span(@timestamp,15s)", "timestamp")); + Assertions.assertTrue(response.getInt("size") > 0); + Assertions.assertEquals(2, response.getJSONArray("datarows").getJSONArray(0).length()); + JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0); + for (int i = 0; i < firstRow.length(); i++) { + Assertions.assertNotNull(firstRow.get(i)); + Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString())); + } + } + + + @Test + @SneakyThrows + public void testMetricMinAggregationCommand() { + JSONObject response = + executeQuery("source=my_prometheus.prometheus_http_requests_total | stats min(@value) by span(@timestamp, 15s), handler"); + verifySchema(response, + schema("min(@value)", "double"), + schema("span(@timestamp,15s)", "timestamp"), + schema("handler", "string")); + Assertions.assertTrue(response.getInt("size") > 0); + Assertions.assertEquals(3, response.getJSONArray("datarows").getJSONArray(0).length()); + JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0); + for (int i = 0; i < firstRow.length(); i++) { + Assertions.assertNotNull(firstRow.get(i)); + Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString())); + } + } + + @Test + @SneakyThrows + public void testMetricCountAggregationCommand() { + JSONObject response = + executeQuery("source=my_prometheus.prometheus_http_requests_total | stats count() by span(@timestamp, 15s), handler, job"); + verifySchema(response, + schema("count()", "integer"), + schema("span(@timestamp,15s)", "timestamp"), + schema("handler", "string"), + schema("job", "string")); + Assertions.assertTrue(response.getInt("size") > 0); + Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length()); + JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0); + for (int i = 0; i < firstRow.length(); i++) { + Assertions.assertNotNull(firstRow.get(i)); + Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString())); + } + } + + @Test + @SneakyThrows + public void testMetricSumAggregationCommand() { + JSONObject response = + executeQuery("source=my_prometheus.prometheus_http_requests_total | stats sum(@value) by span(@timestamp, 15s), handler, job"); + verifySchema(response, + schema("sum(@value)", "double"), + schema("span(@timestamp,15s)", "timestamp"), + schema("handler", "string"), + schema("job", "string")); + Assertions.assertTrue(response.getInt("size") > 0); + Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length()); + JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0); + for (int i = 0; i < firstRow.length(); i++) { + Assertions.assertNotNull(firstRow.get(i)); + Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString())); + } + } + +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java b/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java index 4f2f5e0f57..2e88a49f35 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/catalog/CatalogServiceImpl.java @@ -79,7 +79,7 @@ public void loadConnectors(Settings settings) { } catch (IOException e) { LOG.error("Catalog Configuration File uploaded is malformed. Verify and re-upload.", e); } catch (Throwable e) { - LOG.error("Catalog constructed failed.", e); + LOG.error("Catalog construction failed.", e); } } return null; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 99ff931f39..14ebbe717a 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -161,8 +161,6 @@ mlArg fromClause : SOURCE EQUAL tableSourceClause | INDEX EQUAL tableSourceClause - | SOURCE EQUAL tableFunction - | INDEX EQUAL tableFunction ; tableSourceClause diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 699d671bb2..2638fc9a42 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Optional; import java.util.stream.Collectors; +import lombok.Generated; import lombok.RequiredArgsConstructor; import org.antlr.v4.runtime.ParserRuleContext; import org.antlr.v4.runtime.Token; @@ -348,11 +349,7 @@ public UnresolvedPlan visitTopCommand(TopCommandContext ctx) { */ @Override public UnresolvedPlan visitFromClause(FromClauseContext ctx) { - if (ctx.tableFunction() != null) { - return visitTableFunction(ctx.tableFunction()); - } else { - return visitTableSourceClause(ctx.tableSourceClause()); - } + return visitTableSourceClause(ctx.tableSourceClause()); } @Override @@ -363,16 +360,10 @@ public UnresolvedPlan visitTableSourceClause(TableSourceClauseContext ctx) { } @Override + @Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019 public UnresolvedPlan visitTableFunction(OpenSearchPPLParser.TableFunctionContext ctx) { - ImmutableList.Builder builder = ImmutableList.builder(); - ctx.functionArgs().functionArg().forEach(arg - -> { - String argName = (arg.ident() != null) ? arg.ident().getText() : null; - builder.add( - new UnresolvedArgument(argName, - this.internalVisitExpression(arg.valueExpression()))); - }); - return new TableFunction(this.internalVisitExpression(ctx.qualifiedName()), builder.build()); + // + return null; } /** diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 314d97009c..be81fcb06c 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -11,6 +11,7 @@ import com.google.common.collect.ImmutableMap; import java.util.List; import java.util.stream.Collectors; +import lombok.Generated; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.opensearch.sql.ast.AbstractNodeVisitor; @@ -80,13 +81,10 @@ public String visitRelation(Relation node, String context) { } @Override + @Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019 public String visitTableFunction(TableFunction node, String context) { - String arguments = - node.getArguments().stream() - .map(unresolvedExpression - -> this.expressionAnalyzer.analyze(unresolvedExpression, context)) - .collect(Collectors.joining(",")); - return StringUtils.format("source=%s(%s)", node.getFunctionName().toString(), arguments); + // + return null; } @Override diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index 26e44b674d..658bf1d295 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -97,6 +97,7 @@ public void testSearchCommandWithDotInIndexName() { ); } + @Ignore @Test public void testSearchWithPrometheusQueryRangeWithPositionedArguments() { assertEqual("search source = prometheus.query_range(\"test{code='200'}\",1234, 12345, 3)", @@ -108,6 +109,7 @@ public void testSearchWithPrometheusQueryRangeWithPositionedArguments() { )); } + @Ignore @Test public void testSearchWithPrometheusQueryRangeWithNamedArguments() { assertEqual("search source = prometheus.query_range(query = \"test{code='200'}\", " diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index 52f2f18b72..4a4c0a5f22 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -15,6 +15,7 @@ import com.google.common.collect.ImmutableSet; import java.util.Collections; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; @@ -39,6 +40,7 @@ public void testSearchCommand() { } @Test + @Ignore public void testTableFunctionCommand() { assertEquals("source=prometheus.query_range(***,***,***,***)", anonymize("source=prometheus.query_range('afsd',123,123,3)") diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java index 7068b848ca..4a469c7bbb 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/client/PrometheusClientImpl.java @@ -55,15 +55,17 @@ public JSONObject queryRange(String query, Long start, Long end, String step) th @Override public List getLabels(String metricName) throws IOException { - String queryUrl = String.format("%s/api/v1/labels?match[]=%s", - uri.toString().replaceAll("/$", ""), metricName); + String queryUrl = String.format("%s/api/v1/labels?%s=%s", + uri.toString().replaceAll("/$", ""), + URLEncoder.encode("match[]", StandardCharsets.UTF_8), + URLEncoder.encode(metricName, StandardCharsets.UTF_8)); logger.debug("queryUrl: " + queryUrl); Request request = new Request.Builder() .url(queryUrl) .build(); Response response = this.okHttpClient.newCall(request).execute(); JSONObject jsonObject = readResponse(response); - return toListOfStrings(jsonObject.getJSONArray("data")); + return toListOfLabels(jsonObject.getJSONArray("data")); } @Override @@ -81,7 +83,7 @@ public Map> getAllMetrics() throws IOException { return new ObjectMapper().readValue(jsonObject.getJSONObject("data").toString(), typeRef); } - private List toListOfStrings(JSONArray array) { + private List toListOfLabels(JSONArray array) { List result = new ArrayList<>(); for (int i = 0; i < array.length(); i++) { //__name__ is internal label in prometheus representing the metric name. diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/implementation/QueryRangeFunctionImplementation.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/implementation/QueryRangeFunctionImplementation.java index 8238a3a4e0..bccb9c3bff 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/implementation/QueryRangeFunctionImplementation.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/functions/implementation/QueryRangeFunctionImplementation.java @@ -87,7 +87,7 @@ private PrometheusQueryRequest buildQueryFromQueryRangeFunction(List switch (argName) { case QUERY: prometheusQueryRequest - .getPromQl().append((String) literalValue.value()); + .setPromQl((String) literalValue.value()); break; case STARTTIME: prometheusQueryRequest.setStartTime(((Number) literalValue.value()).longValue()); diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalMetricAgg.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalMetricAgg.java new file mode 100644 index 0000000000..f348c699a1 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalMetricAgg.java @@ -0,0 +1,76 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.planner.logical; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; + + +/** + * Logical Metric Scan along with aggregation Operation. + */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +public class PrometheusLogicalMetricAgg extends LogicalPlan { + + private final String metricName; + + /** + * Filter Condition. + */ + @Setter + private Expression filter; + + /** + * Aggregation List. + */ + @Setter + private List aggregatorList; + + /** + * Group List. + */ + @Setter + private List groupByList; + + /** + * Constructor for LogicalMetricAgg Logical Plan. + * + * @param metricName metricName + * @param filter filter + * @param aggregatorList aggregatorList + * @param groupByList groupByList. + */ + @Builder + public PrometheusLogicalMetricAgg(String metricName, + Expression filter, + List aggregatorList, + List groupByList) { + super(ImmutableList.of()); + this.metricName = metricName; + this.filter = filter; + this.aggregatorList = aggregatorList; + this.groupByList = groupByList; + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitNode(this, context); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalMetricScan.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalMetricScan.java new file mode 100644 index 0000000000..5e07d6899f --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalMetricScan.java @@ -0,0 +1,54 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.planner.logical; + +import com.google.common.collect.ImmutableList; +import lombok.Builder; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.ToString; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; + +/** + * Prometheus Logical Metric Scan Operation. + * In an optimized plan this node represents both Relation and Filter Operation. + */ +@Getter +@ToString +@EqualsAndHashCode(callSuper = false) +public class PrometheusLogicalMetricScan extends LogicalPlan { + + private final String metricName; + + /** + * Filter Condition. + */ + private final Expression filter; + + /** + * PrometheusLogicalMetricScan constructor. + * + * @param metricName metricName. + * @param filter filter. + */ + @Builder + public PrometheusLogicalMetricScan(String metricName, + Expression filter) { + super(ImmutableList.of()); + this.metricName = metricName; + this.filter = filter; + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitNode(this, context); + } + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java new file mode 100644 index 0000000000..8a365b2786 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicalPlanOptimizerFactory.java @@ -0,0 +1,32 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.prometheus.planner.logical; + + +import java.util.Arrays; +import lombok.experimental.UtilityClass; +import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; +import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndIndexScan; +import org.opensearch.sql.prometheus.planner.logical.rules.MergeAggAndRelation; +import org.opensearch.sql.prometheus.planner.logical.rules.MergeFilterAndRelation; + +/** + * Prometheus storage engine specified logical plan optimizer. + */ +@UtilityClass +public class PrometheusLogicalPlanOptimizerFactory { + + /** + * Create Prometheus storage specified logical plan optimizer. + */ + public static LogicalPlanOptimizer create() { + return new LogicalPlanOptimizer(Arrays.asList( + new MergeFilterAndRelation(), + new MergeAggAndIndexScan(), + new MergeAggAndRelation() + )); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeAggAndIndexScan.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeAggAndIndexScan.java new file mode 100644 index 0000000000..76bc6cc840 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeAggAndIndexScan.java @@ -0,0 +1,56 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.prometheus.planner.logical.rules; + +import static com.facebook.presto.matching.Pattern.typeOf; +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.source; + +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import lombok.Getter; +import lombok.experimental.Accessors; +import org.opensearch.sql.planner.logical.LogicalAggregation; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.optimizer.Rule; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricAgg; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricScan; + +/** + * Merge Aggregation -- Relation to MetricScanAggregation. + */ +public class MergeAggAndIndexScan implements Rule { + + private final Capture capture; + + @Accessors(fluent = true) + @Getter + private final Pattern pattern; + + /** + * Constructor of MergeAggAndIndexScan. + */ + public MergeAggAndIndexScan() { + this.capture = Capture.newCapture(); + this.pattern = typeOf(LogicalAggregation.class) + .with(source().matching(typeOf(PrometheusLogicalMetricScan.class) + .capturedAs(capture))); + } + + @Override + public LogicalPlan apply(LogicalAggregation aggregation, + Captures captures) { + PrometheusLogicalMetricScan indexScan = captures.get(capture); + return PrometheusLogicalMetricAgg + .builder() + .metricName(indexScan.getMetricName()) + .filter(indexScan.getFilter()) + .aggregatorList(aggregation.getAggregatorList()) + .groupByList(aggregation.getGroupByList()) + .build(); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeAggAndRelation.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeAggAndRelation.java new file mode 100644 index 0000000000..fa9b0c7206 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeAggAndRelation.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.prometheus.planner.logical.rules; + +import static com.facebook.presto.matching.Pattern.typeOf; +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.source; + +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import lombok.Getter; +import lombok.experimental.Accessors; +import org.opensearch.sql.planner.logical.LogicalAggregation; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.optimizer.Rule; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricAgg; + +/** + * Merge Aggregation -- Relation to IndexScanAggregation. + */ +public class MergeAggAndRelation implements Rule { + + private final Capture relationCapture; + + @Accessors(fluent = true) + @Getter + private final Pattern pattern; + + /** + * Constructor of MergeAggAndRelation. + */ + public MergeAggAndRelation() { + this.relationCapture = Capture.newCapture(); + this.pattern = typeOf(LogicalAggregation.class) + .with(source().matching(typeOf(LogicalRelation.class).capturedAs(relationCapture))); + } + + @Override + public LogicalPlan apply(LogicalAggregation aggregation, + Captures captures) { + LogicalRelation relation = captures.get(relationCapture); + return PrometheusLogicalMetricAgg + .builder() + .metricName(relation.getRelationName()) + .aggregatorList(aggregation.getAggregatorList()) + .groupByList(aggregation.getGroupByList()) + .build(); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeFilterAndRelation.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeFilterAndRelation.java new file mode 100644 index 0000000000..a99eb695be --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/planner/logical/rules/MergeFilterAndRelation.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.prometheus.planner.logical.rules; + +import static com.facebook.presto.matching.Pattern.typeOf; +import static org.opensearch.sql.planner.optimizer.pattern.Patterns.source; + +import com.facebook.presto.matching.Capture; +import com.facebook.presto.matching.Captures; +import com.facebook.presto.matching.Pattern; +import org.opensearch.sql.planner.logical.LogicalFilter; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalRelation; +import org.opensearch.sql.planner.optimizer.Rule; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricScan; + +/** + * Merge Filter -- Relation to LogicalMetricScan. + */ +public class MergeFilterAndRelation implements Rule { + + private final Capture relationCapture; + private final Pattern pattern; + + /** + * Constructor of MergeFilterAndRelation. + */ + public MergeFilterAndRelation() { + this.relationCapture = Capture.newCapture(); + this.pattern = typeOf(LogicalFilter.class) + .with(source().matching(typeOf(LogicalRelation.class).capturedAs(relationCapture))); + } + + @Override + public Pattern pattern() { + return pattern; + } + + @Override + public LogicalPlan apply(LogicalFilter filter, + Captures captures) { + LogicalRelation relation = captures.get(relationCapture); + return PrometheusLogicalMetricScan + .builder() + .metricName(relation.getRelationName()) + .filter(filter.getCondition()) + .build(); + } +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/request/PrometheusQueryRequest.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/request/PrometheusQueryRequest.java index 5d4bf2ae7c..176a52a1d9 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/request/PrometheusQueryRequest.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/request/PrometheusQueryRequest.java @@ -7,8 +7,10 @@ package org.opensearch.sql.prometheus.request; import lombok.AllArgsConstructor; +import lombok.Data; import lombok.EqualsAndHashCode; import lombok.Getter; +import lombok.NoArgsConstructor; import lombok.Setter; import lombok.ToString; @@ -16,38 +18,30 @@ * Prometheus metric query request. */ @EqualsAndHashCode -@Getter +@Data @ToString @AllArgsConstructor +@NoArgsConstructor public class PrometheusQueryRequest { /** * PromQL. */ - private final StringBuilder promQl; + private String promQl; /** * startTime of the query. */ - @Setter private Long startTime; /** * endTime of the query. */ - @Setter private Long endTime; /** * step is the resolution required between startTime and endTime. */ - @Setter private String step; - /** - * Constructor of PrometheusQueryRequest. - */ - public PrometheusQueryRequest() { - this.promQl = new StringBuilder(); - } } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java index 98a54a7731..e26e006403 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/response/PrometheusResponse.java @@ -5,9 +5,10 @@ package org.opensearch.sql.prometheus.response; +import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.LONG; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; import java.time.Instant; import java.util.ArrayList; @@ -18,17 +19,37 @@ import org.json.JSONArray; import org.json.JSONObject; import org.opensearch.sql.data.model.ExprDoubleValue; +import org.opensearch.sql.data.model.ExprIntegerValue; +import org.opensearch.sql.data.model.ExprLongValue; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTimestampValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.prometheus.storage.model.PrometheusResponseFieldNames; public class PrometheusResponse implements Iterable { private final JSONObject responseObject; - public PrometheusResponse(JSONObject responseObject) { + private final PrometheusResponseFieldNames prometheusResponseFieldNames; + + private final Boolean isQueryRangeFunctionScan; + + /** + * Constructor. + * + * @param responseObject Prometheus responseObject. + * @param prometheusResponseFieldNames data model which + * contains field names for the metric measurement + * and timestamp fieldName. + */ + public PrometheusResponse(JSONObject responseObject, + PrometheusResponseFieldNames prometheusResponseFieldNames, + Boolean isQueryRangeFunctionScan) { this.responseObject = responseObject; + this.prometheusResponseFieldNames = prometheusResponseFieldNames; + this.isQueryRangeFunctionScan = isQueryRangeFunctionScan; } @NonNull @@ -44,10 +65,28 @@ public Iterator iterator() { for (int j = 0; j < values.length(); j++) { LinkedHashMap linkedHashMap = new LinkedHashMap<>(); JSONArray val = values.getJSONArray(j); - linkedHashMap.put(TIMESTAMP, + linkedHashMap.put(prometheusResponseFieldNames.getTimestampFieldName(), new ExprTimestampValue(Instant.ofEpochMilli((long) (val.getDouble(0) * 1000)))); - linkedHashMap.put(VALUE, new ExprDoubleValue(val.getDouble(1))); - linkedHashMap.put(LABELS, new ExprStringValue(metric.toString())); + linkedHashMap.put(prometheusResponseFieldNames.getValueFieldName(), getValue(val, 1, + prometheusResponseFieldNames.getValueType())); + // Concept: + // {\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}" + // This is the label string in the prometheus response. + // Q: how do we map this to columns in a table. + // For queries like source = prometheus.metric_name | .... + // we can get the labels list in prior as we know which metric we are working on. + // In case of commands like source = prometheus.query_range('promQL'); + // Any arbitrary command can be written and we don't know the labels + // in the prometheus response in prior. + // So for PPL like commands...output structure is @value, @timestamp + // and each label is treated as a separate column where as in case of query_range + // function irrespective of promQL, the output structure is + // @value, @timestamp, @labels [jsonfied string of all the labels for a data point] + if (isQueryRangeFunctionScan) { + linkedHashMap.put(LABELS, new ExprStringValue(metric.toString())); + } else { + insertLabels(linkedHashMap, metric); + } result.add(new ExprTupleValue(linkedHashMap)); } } @@ -58,4 +97,20 @@ public Iterator iterator() { } return result.iterator(); } + + private void insertLabels(LinkedHashMap linkedHashMap, JSONObject metric) { + for (String key : metric.keySet()) { + linkedHashMap.put(key, new ExprStringValue(metric.getString(key))); + } + } + + private ExprValue getValue(JSONArray jsonArray, Integer index, ExprType exprType) { + if (INTEGER.equals(exprType)) { + return new ExprIntegerValue(jsonArray.getInt(index)); + } else if (LONG.equals(exprType)) { + return new ExprLongValue(jsonArray.getLong(index)); + } + return new ExprDoubleValue(jsonArray.getDouble(index)); + } + } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricDefaultSchema.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricDefaultSchema.java index 5fc7edfc73..790189d903 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricDefaultSchema.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricDefaultSchema.java @@ -7,7 +7,6 @@ package org.opensearch.sql.prometheus.storage; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; @@ -25,10 +24,8 @@ public enum PrometheusMetricDefaultSchema { DEFAULT_MAPPING(new ImmutableMap.Builder() .put(TIMESTAMP, ExprCoreType.TIMESTAMP) .put(VALUE, ExprCoreType.DOUBLE) - .put(LABELS, ExprCoreType.STRING) .build()); private final Map mapping; - } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java index d8ab97709b..8611ae04f1 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScan.java @@ -20,6 +20,7 @@ import org.opensearch.sql.prometheus.client.PrometheusClient; import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; import org.opensearch.sql.prometheus.response.PrometheusResponse; +import org.opensearch.sql.prometheus.storage.model.PrometheusResponseFieldNames; import org.opensearch.sql.storage.TableScanOperator; /** @@ -39,11 +40,25 @@ public class PrometheusMetricScan extends TableScanOperator { private Iterator iterator; + @Setter + @Getter + private Boolean isQueryRangeFunctionScan = Boolean.FALSE; + + @Setter + private PrometheusResponseFieldNames prometheusResponseFieldNames; + + private static final Logger LOG = LogManager.getLogger(); + /** + * Constructor. + * + * @param prometheusClient prometheusClient. + */ public PrometheusMetricScan(PrometheusClient prometheusClient) { this.prometheusClient = prometheusClient; this.request = new PrometheusQueryRequest(); + this.prometheusResponseFieldNames = new PrometheusResponseFieldNames(); } @Override @@ -52,9 +67,10 @@ public void open() { this.iterator = AccessController.doPrivileged((PrivilegedAction>) () -> { try { JSONObject responseObject = prometheusClient.queryRange( - request.getPromQl().toString(), + request.getPromQl(), request.getStartTime(), request.getEndTime(), request.getStep()); - return new PrometheusResponse(responseObject).iterator(); + return new PrometheusResponse(responseObject, prometheusResponseFieldNames, + isQueryRangeFunctionScan).iterator(); } catch (IOException e) { LOG.error(e.getMessage()); throw new RuntimeException("Error fetching data from prometheus server. " + e.getMessage()); diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java index 3d8ef69b17..b81314d936 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTable.java @@ -6,14 +6,18 @@ package org.opensearch.sql.prometheus.storage; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; + +import java.util.HashMap; import java.util.Map; -import java.util.Optional; import javax.annotation.Nonnull; import lombok.Getter; +import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalPlanOptimizerFactory; import org.opensearch.sql.prometheus.request.PrometheusQueryRequest; import org.opensearch.sql.prometheus.request.system.PrometheusDescribeMetricRequest; import org.opensearch.sql.prometheus.storage.implementor.PrometheusDefaultImplementor; @@ -29,10 +33,10 @@ public class PrometheusMetricTable implements Table { private final PrometheusClient prometheusClient; @Getter - private final Optional metricName; + private final String metricName; @Getter - private final Optional prometheusQueryRequest; + private final PrometheusQueryRequest prometheusQueryRequest; /** @@ -45,8 +49,8 @@ public class PrometheusMetricTable implements Table { */ public PrometheusMetricTable(PrometheusClient prometheusService, @Nonnull String metricName) { this.prometheusClient = prometheusService; - this.metricName = Optional.of(metricName); - this.prometheusQueryRequest = Optional.empty(); + this.metricName = metricName; + this.prometheusQueryRequest = null; } /** @@ -55,19 +59,21 @@ public PrometheusMetricTable(PrometheusClient prometheusService, @Nonnull String public PrometheusMetricTable(PrometheusClient prometheusService, @Nonnull PrometheusQueryRequest prometheusQueryRequest) { this.prometheusClient = prometheusService; - this.metricName = Optional.empty(); - this.prometheusQueryRequest = Optional.of(prometheusQueryRequest); + this.metricName = null; + this.prometheusQueryRequest = prometheusQueryRequest; } @Override public Map getFieldTypes() { if (cachedFieldTypes == null) { - if (metricName.isPresent()) { + if (metricName != null) { cachedFieldTypes = new PrometheusDescribeMetricRequest(prometheusClient, null, - metricName.orElse(null)).getFieldTypes(); + metricName).getFieldTypes(); } else { - cachedFieldTypes = PrometheusMetricDefaultSchema.DEFAULT_MAPPING.getMapping(); + cachedFieldTypes = new HashMap<>(PrometheusMetricDefaultSchema.DEFAULT_MAPPING + .getMapping()); + cachedFieldTypes.put(LABELS, ExprCoreType.STRING); } } return cachedFieldTypes; @@ -77,13 +83,16 @@ public Map getFieldTypes() { public PhysicalPlan implement(LogicalPlan plan) { PrometheusMetricScan metricScan = new PrometheusMetricScan(prometheusClient); - prometheusQueryRequest.ifPresent(metricScan::setRequest); + if (prometheusQueryRequest != null) { + metricScan.setRequest(prometheusQueryRequest); + metricScan.setIsQueryRangeFunctionScan(Boolean.TRUE); + } return plan.accept(new PrometheusDefaultImplementor(), metricScan); } @Override public LogicalPlan optimize(LogicalPlan plan) { - return plan; + return PrometheusLogicalPlanOptimizerFactory.create().optimize(plan); } } \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java index f18ff1bba8..f8ae0936ee 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngine.java @@ -45,7 +45,7 @@ public Table getTable(CatalogSchemaName catalogSchemaName, String tableName) { } else if (INFORMATION_SCHEMA_NAME.equals(catalogSchemaName.getSchemaName())) { return resolveInformationSchemaTable(catalogSchemaName, tableName); } else { - return null; + return new PrometheusMetricTable(prometheusClient, tableName); } } diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/implementor/PrometheusDefaultImplementor.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/implementor/PrometheusDefaultImplementor.java index 6235ecc14b..071cd7ba8c 100644 --- a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/implementor/PrometheusDefaultImplementor.java +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/implementor/PrometheusDefaultImplementor.java @@ -7,22 +7,34 @@ package org.opensearch.sql.prometheus.storage.implementor; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import lombok.RequiredArgsConstructor; -import org.opensearch.sql.data.type.ExprCoreType; +import org.apache.commons.math3.util.Pair; +import org.opensearch.sql.common.utils.StringUtils; +import org.opensearch.sql.expression.Expression; import org.opensearch.sql.expression.NamedExpression; import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.span.SpanExpression; import org.opensearch.sql.planner.DefaultImplementor; +import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.logical.LogicalProject; import org.opensearch.sql.planner.logical.LogicalRelation; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.ProjectOperator; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricAgg; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricScan; import org.opensearch.sql.prometheus.storage.PrometheusMetricScan; +import org.opensearch.sql.prometheus.storage.PrometheusMetricTable; +import org.opensearch.sql.prometheus.storage.model.PrometheusResponseFieldNames; +import org.opensearch.sql.prometheus.storage.querybuilder.AggregationQueryBuilder; +import org.opensearch.sql.prometheus.storage.querybuilder.SeriesSelectionQueryBuilder; +import org.opensearch.sql.prometheus.storage.querybuilder.StepParameterResolver; +import org.opensearch.sql.prometheus.storage.querybuilder.TimeRangeParametersResolver; /** * Default Implementor of Logical plan for prometheus. @@ -31,26 +43,104 @@ public class PrometheusDefaultImplementor extends DefaultImplementor { + + @Override + public PhysicalPlan visitNode(LogicalPlan plan, PrometheusMetricScan context) { + if (plan instanceof PrometheusLogicalMetricScan) { + return visitIndexScan((PrometheusLogicalMetricScan) plan, context); + } else if (plan instanceof PrometheusLogicalMetricAgg) { + return visitIndexAggregation((PrometheusLogicalMetricAgg) plan, context); + } else { + throw new IllegalStateException(StringUtils.format("unexpected plan node type %s", + plan.getClass())); + } + } + + /** + * Implement PrometheusLogicalMetricScan. + */ + public PhysicalPlan visitIndexScan(PrometheusLogicalMetricScan node, + PrometheusMetricScan context) { + String query = SeriesSelectionQueryBuilder.build(node.getMetricName(), node.getFilter()); + + context.getRequest().setPromQl(query); + setTimeRangeParameters(node.getFilter(), context); + context.getRequest() + .setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(), + context.getRequest().getEndTime(), null)); + return context; + } + + /** + * Implement PrometheusLogicalMetricAgg. + */ + public PhysicalPlan visitIndexAggregation(PrometheusLogicalMetricAgg node, + PrometheusMetricScan context) { + setTimeRangeParameters(node.getFilter(), context); + context.getRequest() + .setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(), + context.getRequest().getEndTime(), node.getGroupByList())); + String step = context.getRequest().getStep(); + String seriesSelectionQuery + = SeriesSelectionQueryBuilder.build(node.getMetricName(), node.getFilter()); + + String aggregateQuery + = AggregationQueryBuilder.build(node.getAggregatorList(), + node.getGroupByList()); + + String finalQuery = String.format(aggregateQuery, seriesSelectionQuery + "[" + step + "]"); + context.getRequest().setPromQl(finalQuery); + + //Since prometheus response doesn't have any fieldNames in its output. + //the field names are sent to PrometheusResponse constructor via context. + setPrometheusResponseFieldNames(node, context); + return context; + } + @Override public PhysicalPlan visitRelation(LogicalRelation node, PrometheusMetricScan context) { + PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) node.getTable(); + if (prometheusMetricTable.getMetricName() != null) { + String query = SeriesSelectionQueryBuilder.build(node.getRelationName(), null); + context.getRequest().setPromQl(query); + setTimeRangeParameters(null, context); + context.getRequest() + .setStep(StepParameterResolver.resolve(context.getRequest().getStartTime(), + context.getRequest().getEndTime(), null)); + } return context; } - // Since getFieldTypes include labels - // we are explicitly specifying the output column names; - @Override - public PhysicalPlan visitProject(LogicalProject node, PrometheusMetricScan context) { - List finalProjectList = new ArrayList<>(); - finalProjectList.add( - new NamedExpression(LABELS, new ReferenceExpression(LABELS, ExprCoreType.STRING))); - finalProjectList.add( - new NamedExpression(TIMESTAMP, - new ReferenceExpression(TIMESTAMP, ExprCoreType.TIMESTAMP))); - finalProjectList.add( - new NamedExpression(VALUE, new ReferenceExpression(VALUE, ExprCoreType.DOUBLE))); - return new ProjectOperator(visitChild(node, context), finalProjectList, - node.getNamedParseExpressions()); + private void setTimeRangeParameters(Expression filter, PrometheusMetricScan context) { + TimeRangeParametersResolver timeRangeParametersResolver = new TimeRangeParametersResolver(); + Pair timeRange = timeRangeParametersResolver.resolve(filter); + context.getRequest().setStartTime(timeRange.getFirst()); + context.getRequest().setEndTime(timeRange.getSecond()); } + private void setPrometheusResponseFieldNames(PrometheusLogicalMetricAgg node, + PrometheusMetricScan context) { + Optional spanExpression = getSpanExpression(node.getGroupByList()); + if (spanExpression.isEmpty()) { + throw new RuntimeException( + "Prometheus Catalog doesn't support aggregations without span expression"); + } + PrometheusResponseFieldNames prometheusResponseFieldNames = new PrometheusResponseFieldNames(); + prometheusResponseFieldNames.setValueFieldName(node.getAggregatorList().get(0).getName()); + prometheusResponseFieldNames.setValueType(node.getAggregatorList().get(0).type()); + prometheusResponseFieldNames.setTimestampFieldName(spanExpression.get().getNameOrAlias()); + context.setPrometheusResponseFieldNames(prometheusResponseFieldNames); + } + + private Optional getSpanExpression(List namedExpressionList) { + if (namedExpressionList == null) { + return Optional.empty(); + } + return namedExpressionList.stream() + .filter(expression -> expression.getDelegated() instanceof SpanExpression) + .findFirst(); + } + + } \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/model/PrometheusResponseFieldNames.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/model/PrometheusResponseFieldNames.java new file mode 100644 index 0000000000..4276848aa2 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/model/PrometheusResponseFieldNames.java @@ -0,0 +1,27 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.model; + +import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; + +import lombok.Getter; +import lombok.Setter; +import org.opensearch.sql.data.type.ExprType; + + +@Getter +@Setter +public class PrometheusResponseFieldNames { + + private String valueFieldName = VALUE; + private ExprType valueType = DOUBLE; + private String timestampFieldName = TIMESTAMP; + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/model/QueryRangeParameters.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/model/QueryRangeParameters.java new file mode 100644 index 0000000000..86ca99cea8 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/model/QueryRangeParameters.java @@ -0,0 +1,25 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@AllArgsConstructor +@NoArgsConstructor +public class QueryRangeParameters { + + private Long start; + private Long end; + private String step; + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/AggregationQueryBuilder.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/AggregationQueryBuilder.java new file mode 100644 index 0000000000..1aff9eca88 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/AggregationQueryBuilder.java @@ -0,0 +1,78 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.querybuilder; + +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import lombok.NoArgsConstructor; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.expression.function.BuiltinFunctionName; +import org.opensearch.sql.expression.span.SpanExpression; + +/** + * This class builds aggregation query for the given stats commands. + * In the generated query a placeholder(%s) is added in place of metric selection query + * and later replaced by metric selection query. + */ +@NoArgsConstructor +public class AggregationQueryBuilder { + + private static final Set allowedStatsFunctions = Set.of( + BuiltinFunctionName.MAX.getName().getFunctionName(), + BuiltinFunctionName.MIN.getName().getFunctionName(), + BuiltinFunctionName.COUNT.getName().getFunctionName(), + BuiltinFunctionName.SUM.getName().getFunctionName(), + BuiltinFunctionName.AVG.getName().getFunctionName() + ); + + + /** + * Build Aggregation query from series selector query from expression. + * + * @return query string. + */ + public static String build(List namedAggregatorList, + List groupByList) { + + if (namedAggregatorList.size() > 1) { + throw new RuntimeException( + "Prometheus Catalog doesn't multiple aggregations in stats command"); + } + + if (!allowedStatsFunctions + .contains(namedAggregatorList.get(0).getFunctionName().getFunctionName())) { + throw new RuntimeException(String.format( + "Prometheus Catalog only supports %s aggregations.", allowedStatsFunctions)); + } + + StringBuilder aggregateQuery = new StringBuilder(); + aggregateQuery.append(namedAggregatorList.get(0).getFunctionName().getFunctionName()) + .append(" "); + + if (groupByList != null && !groupByList.isEmpty()) { + groupByList = groupByList.stream() + .filter(expression -> !(expression.getDelegated() instanceof SpanExpression)) + .collect(Collectors.toList()); + if (groupByList.size() > 0) { + aggregateQuery.append("by("); + aggregateQuery.append( + groupByList.stream().map(NamedExpression::getName).collect(Collectors.joining(", "))); + aggregateQuery.append(")"); + } + } + aggregateQuery + .append(" (") + .append(namedAggregatorList.get(0).getFunctionName().getFunctionName()) + .append("_over_time") + .append("(%s))"); + return aggregateQuery.toString(); + } + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/SeriesSelectionQueryBuilder.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/SeriesSelectionQueryBuilder.java new file mode 100644 index 0000000000..baa235aa89 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/SeriesSelectionQueryBuilder.java @@ -0,0 +1,70 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.querybuilder; + + +import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; + +import java.util.stream.Collectors; +import lombok.NoArgsConstructor; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.ExpressionNodeVisitor; +import org.opensearch.sql.expression.FunctionExpression; +import org.opensearch.sql.expression.ReferenceExpression; + +/** + * This class builds metric selection query from the filter condition + * and metric name. + */ +@NoArgsConstructor +public class SeriesSelectionQueryBuilder { + + + /** + * Build Prometheus series selector query from expression. + * + * @param filterCondition expression. + * @return query string + */ + public static String build(String metricName, Expression filterCondition) { + if (filterCondition != null) { + SeriesSelectionExpressionNodeVisitor seriesSelectionExpressionNodeVisitor + = new SeriesSelectionExpressionNodeVisitor(); + String selectorQuery = filterCondition.accept(seriesSelectionExpressionNodeVisitor, null); + return metricName + "{" + selectorQuery + "}"; + } + return metricName; + } + + static class SeriesSelectionExpressionNodeVisitor extends ExpressionNodeVisitor { + @Override + public String visitFunction(FunctionExpression func, Object context) { + if (func.getFunctionName().getFunctionName().equals("and")) { + return func.getArguments().stream() + .map(arg -> visitFunction((FunctionExpression) arg, context)) + .filter(StringUtils::isNotEmpty) + .collect(Collectors.joining(" , ")); + } else if (func.getFunctionName().getFunctionName().contains("=")) { + ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0); + if (!ref.getAttr().equals(TIMESTAMP)) { + return func.getArguments().get(0) + + func.getFunctionName().getFunctionName() + + func.getArguments().get(1); + } else { + return null; + } + } else { + throw new RuntimeException( + String.format("Prometheus Catalog doesn't support %s in where command.", + func.getFunctionName().getFunctionName())); + } + } + } + +} diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/StepParameterResolver.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/StepParameterResolver.java new file mode 100644 index 0000000000..54315bb792 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/StepParameterResolver.java @@ -0,0 +1,63 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.querybuilder; + +import java.util.List; +import java.util.Optional; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import org.apache.commons.lang3.StringUtils; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.expression.span.SpanExpression; + +/** + * This class resolves step parameter required for + * query_range api of prometheus. + */ +@NoArgsConstructor +public class StepParameterResolver { + + /** + * Extract step from groupByList or apply heuristic arithmetic + * on endTime and startTime. + * + * + * @param startTime startTime. + * @param endTime endTime. + * @param groupByList groupByList. + * @return Step String. + */ + public static String resolve(@NonNull Long startTime, @NonNull Long endTime, + List groupByList) { + Optional spanExpression = getSpanExpression(groupByList); + if (spanExpression.isPresent()) { + if (StringUtils.isEmpty(spanExpression.get().getUnit().getName())) { + throw new RuntimeException("Missing TimeUnit in the span expression"); + } else { + return spanExpression.get().getValue().toString() + + spanExpression.get().getUnit().getName(); + } + } else { + return Math.max((endTime - startTime) / 250, 1) + "s"; + } + } + + private static Optional getSpanExpression( + List namedExpressionList) { + if (namedExpressionList == null) { + return Optional.empty(); + } + return namedExpressionList.stream() + .filter(expression -> expression.getDelegated() instanceof SpanExpression) + .map(expression -> (SpanExpression) expression.getDelegated()) + .findFirst(); + } + + + +} \ No newline at end of file diff --git a/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/TimeRangeParametersResolver.java b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/TimeRangeParametersResolver.java new file mode 100644 index 0000000000..6c338d61a6 --- /dev/null +++ b/prometheus/src/main/java/org/opensearch/sql/prometheus/storage/querybuilder/TimeRangeParametersResolver.java @@ -0,0 +1,76 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.querybuilder; + +import java.util.Date; +import lombok.NoArgsConstructor; +import org.apache.commons.math3.util.Pair; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.ExpressionNodeVisitor; +import org.opensearch.sql.expression.FunctionExpression; +import org.opensearch.sql.expression.ReferenceExpression; + +@NoArgsConstructor +public class TimeRangeParametersResolver extends ExpressionNodeVisitor { + + + private Long startTime; + private Long endTime; + + /** + * Build Range Query Parameters from filter expression. + * If the filter condition consists of @timestamp, startTime and + * endTime are derived. or else it will be defaulted to now() and now()-1hr. + * If one of starttime and endtime are provided, the other will be derived from them + * by fixing the time range duration to 1hr. + * + * @param filterCondition expression. + * @return query string + */ + public Pair resolve(Expression filterCondition) { + if (filterCondition == null) { + long endTime = new Date().getTime() / 1000; + return Pair.create(endTime - 3600, endTime); + } + filterCondition.accept(this, null); + if (startTime == null && endTime == null) { + long endTime = new Date().getTime() / 1000; + return Pair.create(endTime - 3600, endTime); + } else if (startTime == null) { + return Pair.create(endTime - 3600, endTime); + } else if (endTime == null) { + return Pair.create(startTime, startTime + 3600); + } else { + return Pair.create(startTime, endTime); + } + } + + @Override + public Void visitFunction(FunctionExpression func, Object context) { + if (func.getFunctionName().getFunctionName().contains("=")) { + ReferenceExpression ref = (ReferenceExpression) func.getArguments().get(0); + Expression rightExpr = func.getArguments().get(1); + if (ref.getAttr().equals("@timestamp")) { + ExprValue literalValue = rightExpr.valueOf(null); + if (func.getFunctionName().getFunctionName().contains(">")) { + startTime = literalValue.timestampValue().toEpochMilli() / 1000; + } + if (func.getFunctionName().getFunctionName().contains("<")) { + endTime = literalValue.timestampValue().toEpochMilli() / 1000; + } + } + } else { + func.getArguments() + .forEach(arg -> visitFunction((FunctionExpression) arg, context)); + } + return null; + } + + +} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeFunctionImplementationTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeFunctionImplementationTest.java index b20ee6f7d6..f2a54b7347 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeFunctionImplementationTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeFunctionImplementationTest.java @@ -9,6 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -66,10 +68,10 @@ void testApplyArguments() { = new QueryRangeFunctionImplementation(functionName, namedArgumentExpressionList, client); PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) queryRangeFunctionImplementation.applyArguments(); - assertFalse(prometheusMetricTable.getMetricName().isPresent()); - assertTrue(prometheusMetricTable.getPrometheusQueryRequest().isPresent()); + assertNull(prometheusMetricTable.getMetricName()); + assertNotNull(prometheusMetricTable.getPrometheusQueryRequest()); PrometheusQueryRequest prometheusQueryRequest - = prometheusMetricTable.getPrometheusQueryRequest().get(); + = prometheusMetricTable.getPrometheusQueryRequest(); assertEquals("http_latency", prometheusQueryRequest.getPromQl().toString()); assertEquals(12345, prometheusQueryRequest.getStartTime()); assertEquals(1234, prometheusQueryRequest.getEndTime()); diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeTableFunctionResolverTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeTableFunctionResolverTest.java index 9cc6231eb3..caca48f834 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeTableFunctionResolverTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/functions/QueryRangeTableFunctionResolverTest.java @@ -8,6 +8,7 @@ package org.opensearch.sql.prometheus.functions; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.opensearch.sql.data.type.ExprCoreType.LONG; @@ -62,11 +63,10 @@ void testResolve() { assertTrue(functionImplementation instanceof QueryRangeFunctionImplementation); PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) functionImplementation.applyArguments(); - assertTrue(prometheusMetricTable.getPrometheusQueryRequest().isPresent()); + assertNotNull(prometheusMetricTable.getPrometheusQueryRequest()); PrometheusQueryRequest prometheusQueryRequest = - prometheusMetricTable.getPrometheusQueryRequest() - .get(); - assertEquals("http_latency", prometheusQueryRequest.getPromQl().toString()); + prometheusMetricTable.getPrometheusQueryRequest(); + assertEquals("http_latency", prometheusQueryRequest.getPromQl()); assertEquals(12345L, prometheusQueryRequest.getStartTime()); assertEquals(12345L, prometheusQueryRequest.getEndTime()); assertEquals("14", prometheusQueryRequest.getStep()); @@ -97,11 +97,10 @@ void testArgumentsPassedByPosition() { assertTrue(functionImplementation instanceof QueryRangeFunctionImplementation); PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) functionImplementation.applyArguments(); - assertTrue(prometheusMetricTable.getPrometheusQueryRequest().isPresent()); + assertNotNull(prometheusMetricTable.getPrometheusQueryRequest()); PrometheusQueryRequest prometheusQueryRequest = - prometheusMetricTable.getPrometheusQueryRequest() - .get(); - assertEquals("http_latency", prometheusQueryRequest.getPromQl().toString()); + prometheusMetricTable.getPrometheusQueryRequest(); + assertEquals("http_latency", prometheusQueryRequest.getPromQl()); assertEquals(12345L, prometheusQueryRequest.getStartTime()); assertEquals(12345L, prometheusQueryRequest.getEndTime()); assertEquals("14", prometheusQueryRequest.getStep()); @@ -133,11 +132,10 @@ void testArgumentsPassedByNameWithDifferentOrder() { assertTrue(functionImplementation instanceof QueryRangeFunctionImplementation); PrometheusMetricTable prometheusMetricTable = (PrometheusMetricTable) functionImplementation.applyArguments(); - assertTrue(prometheusMetricTable.getPrometheusQueryRequest().isPresent()); + assertNotNull(prometheusMetricTable.getPrometheusQueryRequest()); PrometheusQueryRequest prometheusQueryRequest = - prometheusMetricTable.getPrometheusQueryRequest() - .get(); - assertEquals("http_latency", prometheusQueryRequest.getPromQl().toString()); + prometheusMetricTable.getPrometheusQueryRequest(); + assertEquals("http_latency", prometheusQueryRequest.getPromQl()); assertEquals(12345L, prometheusQueryRequest.getStartTime()); assertEquals(12345L, prometheusQueryRequest.getEndTime()); assertEquals("14", prometheusQueryRequest.getStep()); diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicOptimizerTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicOptimizerTest.java new file mode 100644 index 0000000000..7d6d3bed28 --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/planner/logical/PrometheusLogicOptimizerTest.java @@ -0,0 +1,121 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.planner.logical; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.opensearch.sql.data.model.ExprValueUtils.stringValue; +import static org.opensearch.sql.data.type.ExprCoreType.DOUBLE; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.aggregation; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.relation; +import static org.opensearch.sql.prometheus.utils.LogicalPlanUtils.indexScan; +import static org.opensearch.sql.prometheus.utils.LogicalPlanUtils.indexScanAgg; + +import com.google.common.collect.ImmutableList; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.sql.expression.DSL; +import org.opensearch.sql.expression.config.ExpressionConfig; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.optimizer.LogicalPlanOptimizer; +import org.opensearch.sql.storage.Table; + +@ExtendWith(MockitoExtension.class) +public class PrometheusLogicOptimizerTest { + + private final DSL dsl = new ExpressionConfig().dsl(new ExpressionConfig().functionRepository()); + + @Mock + private Table table; + + @Test + void project_filter_merge_with_relation() { + assertEquals( + project( + indexScan("prometheus_http_total_requests", + dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200")))) + ), + optimize( + project( + filter( + relation("prometheus_http_total_requests", table), + dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))) + )) + ) + ); + } + + @Test + void aggregation_merge_relation() { + assertEquals( + project( + indexScanAgg("prometheus_http_total_requests", ImmutableList + .of(DSL.named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(DSL.named("code", DSL.ref("code", STRING)))), + DSL.named("AVG(intV)", DSL.ref("AVG(intV)", DOUBLE))), + optimize( + project( + aggregation( + relation("prometheus_http_total_requests", table), + ImmutableList + .of(DSL.named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(DSL.named("code", + DSL.ref("code", STRING)))), + DSL.named("AVG(intV)", DSL.ref("AVG(intV)", DOUBLE))) + ) + ); + } + + + @Test + void aggregation_merge_filter_relation() { + assertEquals( + project( + indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))), + ImmutableList + .of(DSL.named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(DSL.named("job", DSL.ref("job", STRING)))), + DSL.named("AVG(@value)", DSL.ref("AVG(@value)", DOUBLE))), + optimize( + project( + aggregation( + filter( + relation("prometheus_http_total_requests", table), + dsl.and( + dsl.equal(DSL.ref("code", STRING), + DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), + DSL.literal(stringValue("/ready/")))) + ), + ImmutableList + .of(DSL.named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(DSL.named("job", + DSL.ref("job", STRING)))), + DSL.named("AVG(@value)", DSL.ref("AVG(@value)", DOUBLE))) + ) + ); + } + + + private LogicalPlan optimize(LogicalPlan plan) { + final LogicalPlanOptimizer optimizer = PrometheusLogicalPlanOptimizerFactory.create(); + return optimizer.optimize(plan); + } + +} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/request/PrometheusDescribeMetricRequestTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/request/PrometheusDescribeMetricRequestTest.java index fdd9ce6392..a190abb6a1 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/request/PrometheusDescribeMetricRequestTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/request/PrometheusDescribeMetricRequestTest.java @@ -9,11 +9,9 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.sql.data.model.ExprValueUtils.stringValue; import static org.opensearch.sql.prometheus.constants.TestConstants.METRIC_NAME; -import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; @@ -46,18 +44,15 @@ public class PrometheusDescribeMetricRequestTest { @SneakyThrows void testGetFieldTypes() { when(prometheusClient.getLabels(METRIC_NAME)).thenReturn(new ArrayList() {{ - add("__name__"); add("call"); add("code"); } }); Map expected = new HashMap<>() {{ - put("__name__", ExprCoreType.STRING); put("call", ExprCoreType.STRING); put("code", ExprCoreType.STRING); put(VALUE, ExprCoreType.DOUBLE); put(TIMESTAMP, ExprCoreType.TIMESTAMP); - put(LABELS, ExprCoreType.STRING); }}; PrometheusDescribeMetricRequest prometheusDescribeMetricRequest = new PrometheusDescribeMetricRequest(prometheusClient, @@ -73,7 +68,6 @@ void testGetFieldTypesWithEmptyMetricName() { Map expected = new HashMap<>() {{ put(VALUE, ExprCoreType.DOUBLE); put(TIMESTAMP, ExprCoreType.TIMESTAMP); - put(LABELS, ExprCoreType.STRING); }}; assertThrows(NullPointerException.class, () -> new PrometheusDescribeMetricRequest(prometheusClient, @@ -112,15 +106,16 @@ void testGetFieldTypesWhenIOException() { @Test @SneakyThrows void testSearch() { - when(prometheusClient.getLabels(METRIC_NAME)).thenReturn(new ArrayList() {{ + when(prometheusClient.getLabels(METRIC_NAME)).thenReturn(new ArrayList<>() { + { add("call"); } - }); + }); PrometheusDescribeMetricRequest prometheusDescribeMetricRequest = new PrometheusDescribeMetricRequest(prometheusClient, new CatalogSchemaName("test", "default"), METRIC_NAME); List result = prometheusDescribeMetricRequest.search(); - assertEquals(4, result.size()); + assertEquals(3, result.size()); assertEquals(expectedRow(), result.get(0)); verify(prometheusClient, times(1)).getLabels(METRIC_NAME); } diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java index 16a2a7be0b..ac99a996af 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricScanTest.java @@ -9,6 +9,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.LONG; import static org.opensearch.sql.prometheus.constants.TestConstants.ENDTIME; import static org.opensearch.sql.prometheus.constants.TestConstants.QUERY; import static org.opensearch.sql.prometheus.constants.TestConstants.STARTTIME; @@ -29,10 +31,13 @@ import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.data.model.ExprDoubleValue; +import org.opensearch.sql.data.model.ExprIntegerValue; +import org.opensearch.sql.data.model.ExprLongValue; import org.opensearch.sql.data.model.ExprStringValue; import org.opensearch.sql.data.model.ExprTimestampValue; import org.opensearch.sql.data.model.ExprTupleValue; import org.opensearch.sql.prometheus.client.PrometheusClient; +import org.opensearch.sql.prometheus.storage.model.PrometheusResponseFieldNames; @ExtendWith(MockitoExtension.class) public class PrometheusMetricScanTest { @@ -44,7 +49,7 @@ public class PrometheusMetricScanTest { @SneakyThrows void testQueryResponseIterator() { PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); - prometheusMetricScan.getRequest().getPromQl().append(QUERY); + prometheusMetricScan.getRequest().setPromQl(QUERY); prometheusMetricScan.getRequest().setStartTime(STARTTIME); prometheusMetricScan.getRequest().setEndTime(ENDTIME); prometheusMetricScan.getRequest().setStep(STEP); @@ -56,6 +61,125 @@ void testQueryResponseIterator() { ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); put(VALUE, new ExprDoubleValue(1)); + put("instance", new ExprStringValue("localhost:9090")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("prometheus")); + } + }); + assertEquals(firstRow, prometheusMetricScan.next()); + Assertions.assertTrue(prometheusMetricScan.hasNext()); + ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put("@timestamp", new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put("@value", new ExprDoubleValue(0)); + put("instance", new ExprStringValue("localhost:9091")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("node")); + } + }); + assertEquals(secondRow, prometheusMetricScan.next()); + Assertions.assertFalse(prometheusMetricScan.hasNext()); + } + + @Test + @SneakyThrows + void testQueryResponseIteratorWithGivenPrometheusResponseFieldNames() { + PrometheusResponseFieldNames prometheusResponseFieldNames + = new PrometheusResponseFieldNames(); + prometheusResponseFieldNames.setValueFieldName("count()"); + prometheusResponseFieldNames.setValueType(INTEGER); + prometheusResponseFieldNames.setTimestampFieldName(TIMESTAMP); + PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); + prometheusMetricScan.setPrometheusResponseFieldNames(prometheusResponseFieldNames); + prometheusMetricScan.getRequest().setPromQl(QUERY); + prometheusMetricScan.getRequest().setStartTime(STARTTIME); + prometheusMetricScan.getRequest().setEndTime(ENDTIME); + prometheusMetricScan.getRequest().setStep(STEP); + + when(prometheusClient.queryRange(any(), any(), any(), any())) + .thenReturn(new JSONObject(getJson("query_range_result.json"))); + prometheusMetricScan.open(); + Assertions.assertTrue(prometheusMetricScan.hasNext()); + ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put("count()", new ExprIntegerValue(1)); + put("instance", new ExprStringValue("localhost:9090")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("prometheus")); + } + }); + assertEquals(firstRow, prometheusMetricScan.next()); + Assertions.assertTrue(prometheusMetricScan.hasNext()); + ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put("count()", new ExprIntegerValue(0)); + put("instance", new ExprStringValue("localhost:9091")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("node")); + } + }); + assertEquals(secondRow, prometheusMetricScan.next()); + Assertions.assertFalse(prometheusMetricScan.hasNext()); + } + + + @Test + @SneakyThrows + void testQueryResponseIteratorWithGivenPrometheusResponseWithLongInAggType() { + PrometheusResponseFieldNames prometheusResponseFieldNames + = new PrometheusResponseFieldNames(); + prometheusResponseFieldNames.setValueFieldName("testAgg"); + prometheusResponseFieldNames.setValueType(LONG); + prometheusResponseFieldNames.setTimestampFieldName(TIMESTAMP); + PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); + prometheusMetricScan.setPrometheusResponseFieldNames(prometheusResponseFieldNames); + prometheusMetricScan.getRequest().setPromQl(QUERY); + prometheusMetricScan.getRequest().setStartTime(STARTTIME); + prometheusMetricScan.getRequest().setEndTime(ENDTIME); + prometheusMetricScan.getRequest().setStep(STEP); + + when(prometheusClient.queryRange(any(), any(), any(), any())) + .thenReturn(new JSONObject(getJson("query_range_result.json"))); + prometheusMetricScan.open(); + Assertions.assertTrue(prometheusMetricScan.hasNext()); + ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put("testAgg", new ExprLongValue(1)); + put("instance", new ExprStringValue("localhost:9090")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("prometheus")); + } + }); + assertEquals(firstRow, prometheusMetricScan.next()); + Assertions.assertTrue(prometheusMetricScan.hasNext()); + ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put("testAgg", new ExprLongValue(0)); + put("instance", new ExprStringValue("localhost:9091")); + put("__name__", new ExprStringValue("up")); + put("job", new ExprStringValue("node")); + } + }); + assertEquals(secondRow, prometheusMetricScan.next()); + Assertions.assertFalse(prometheusMetricScan.hasNext()); + } + + @Test + @SneakyThrows + void testQueryResponseIteratorForQueryRangeFunction() { + PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); + prometheusMetricScan.setIsQueryRangeFunctionScan(Boolean.TRUE); + prometheusMetricScan.getRequest().setPromQl(QUERY); + prometheusMetricScan.getRequest().setStartTime(STARTTIME); + prometheusMetricScan.getRequest().setEndTime(ENDTIME); + prometheusMetricScan.getRequest().setStep(STEP); + + when(prometheusClient.queryRange(any(), any(), any(), any())) + .thenReturn(new JSONObject(getJson("query_range_result.json"))); + prometheusMetricScan.open(); + Assertions.assertTrue(prometheusMetricScan.hasNext()); + ExprTupleValue firstRow = new ExprTupleValue(new LinkedHashMap<>() {{ + put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); + put(VALUE, new ExprLongValue(1)); put(LABELS, new ExprStringValue( "{\"instance\":\"localhost:9090\",\"__name__\":\"up\",\"job\":\"prometheus\"}")); } @@ -64,7 +188,7 @@ void testQueryResponseIterator() { Assertions.assertTrue(prometheusMetricScan.hasNext()); ExprTupleValue secondRow = new ExprTupleValue(new LinkedHashMap<>() {{ put(TIMESTAMP, new ExprTimestampValue(Instant.ofEpochMilli(1435781430781L))); - put(VALUE, new ExprDoubleValue(0)); + put(VALUE, new ExprLongValue(0)); put(LABELS, new ExprStringValue( "{\"instance\":\"localhost:9091\",\"__name__\":\"up\",\"job\":\"node\"}")); } @@ -77,7 +201,7 @@ void testQueryResponseIterator() { @SneakyThrows void testEmptyQueryResponseIterator() { PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); - prometheusMetricScan.getRequest().getPromQl().append(QUERY); + prometheusMetricScan.getRequest().setPromQl(QUERY); prometheusMetricScan.getRequest().setStartTime(STARTTIME); prometheusMetricScan.getRequest().setEndTime(ENDTIME); prometheusMetricScan.getRequest().setStep(STEP); @@ -92,7 +216,7 @@ void testEmptyQueryResponseIterator() { @SneakyThrows void testEmptyQueryWithNoMatrixKeyInResultJson() { PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); - prometheusMetricScan.getRequest().getPromQl().append(QUERY); + prometheusMetricScan.getRequest().setPromQl(QUERY); prometheusMetricScan.getRequest().setStartTime(STARTTIME); prometheusMetricScan.getRequest().setEndTime(ENDTIME); prometheusMetricScan.getRequest().setStep(STEP); @@ -110,7 +234,7 @@ void testEmptyQueryWithNoMatrixKeyInResultJson() { @SneakyThrows void testEmptyQueryWithException() { PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); - prometheusMetricScan.getRequest().getPromQl().append(QUERY); + prometheusMetricScan.getRequest().setPromQl(QUERY); prometheusMetricScan.getRequest().setStartTime(STARTTIME); prometheusMetricScan.getRequest().setEndTime(ENDTIME); prometheusMetricScan.getRequest().setStep(STEP); @@ -127,7 +251,7 @@ void testEmptyQueryWithException() { @SneakyThrows void testExplain() { PrometheusMetricScan prometheusMetricScan = new PrometheusMetricScan(prometheusClient); - prometheusMetricScan.getRequest().getPromQl().append(QUERY); + prometheusMetricScan.getRequest().setPromQl(QUERY); prometheusMetricScan.getRequest().setStartTime(STARTTIME); prometheusMetricScan.getRequest().setEndTime(ENDTIME); prometheusMetricScan.getRequest().setStep(STEP); diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java index 116ff72a6a..ff5ae5dcf5 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusMetricTableTest.java @@ -6,32 +6,50 @@ package org.opensearch.sql.prometheus.storage; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.sql.data.model.ExprValueUtils.fromObjectValue; +import static org.opensearch.sql.data.model.ExprValueUtils.stringValue; +import static org.opensearch.sql.data.type.ExprCoreType.INTEGER; +import static org.opensearch.sql.data.type.ExprCoreType.STRING; +import static org.opensearch.sql.expression.DSL.named; +import static org.opensearch.sql.expression.DSL.ref; +import static org.opensearch.sql.planner.logical.LogicalPlanDSL.filter; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.project; import static org.opensearch.sql.planner.logical.LogicalPlanDSL.relation; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.LABELS; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP; import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE; +import static org.opensearch.sql.prometheus.utils.LogicalPlanUtils.indexScan; +import static org.opensearch.sql.prometheus.utils.LogicalPlanUtils.indexScanAgg; +import static org.opensearch.sql.prometheus.utils.LogicalPlanUtils.testLogicalPlanNode; +import com.google.common.collect.ImmutableList; +import java.text.DateFormat; +import java.text.SimpleDateFormat; import java.util.ArrayList; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; import lombok.SneakyThrows; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.sql.data.type.ExprCoreType; import org.opensearch.sql.data.type.ExprType; +import org.opensearch.sql.expression.DSL; import org.opensearch.sql.expression.NamedExpression; -import org.opensearch.sql.expression.ReferenceExpression; +import org.opensearch.sql.expression.config.ExpressionConfig; import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.planner.physical.ProjectOperator; @@ -45,9 +63,11 @@ class PrometheusMetricTableTest { @Mock private PrometheusClient client; + private final DSL dsl = new ExpressionConfig().dsl(new ExpressionConfig().functionRepository()); + @Test @SneakyThrows - void getFieldTypesFromMetric() { + void testGetFieldTypesFromMetric() { when(client.getLabels(TestConstants.METRIC_NAME)).thenReturn(List.of("label1", "label2")); PrometheusMetricTable prometheusMetricTable = new PrometheusMetricTable(client, TestConstants.METRIC_NAME); @@ -56,44 +76,51 @@ void getFieldTypesFromMetric() { expectedFieldTypes.put("label2", ExprCoreType.STRING); expectedFieldTypes.put(VALUE, ExprCoreType.DOUBLE); expectedFieldTypes.put(TIMESTAMP, ExprCoreType.TIMESTAMP); - expectedFieldTypes.put(LABELS, ExprCoreType.STRING); Map fieldTypes = prometheusMetricTable.getFieldTypes(); assertEquals(expectedFieldTypes, fieldTypes); verify(client, times(1)).getLabels(TestConstants.METRIC_NAME); - assertFalse(prometheusMetricTable.getPrometheusQueryRequest().isPresent()); - assertTrue(prometheusMetricTable.getMetricName().isPresent()); + assertNull(prometheusMetricTable.getPrometheusQueryRequest()); + assertNotNull(prometheusMetricTable.getMetricName()); + + //testing Caching fieldTypes = prometheusMetricTable.getFieldTypes(); + + assertEquals(expectedFieldTypes, fieldTypes); verifyNoMoreInteractions(client); + assertNull(prometheusMetricTable.getPrometheusQueryRequest()); + assertNotNull(prometheusMetricTable.getMetricName()); } @Test @SneakyThrows - void getFieldTypesFromPrometheusQueryRequest() { + void testGetFieldTypesFromPrometheusQueryRequest() { PrometheusMetricTable prometheusMetricTable = new PrometheusMetricTable(client, new PrometheusQueryRequest()); Map expectedFieldTypes = new HashMap<>(); expectedFieldTypes.put(VALUE, ExprCoreType.DOUBLE); expectedFieldTypes.put(TIMESTAMP, ExprCoreType.TIMESTAMP); - expectedFieldTypes.put(LABELS, ExprCoreType.STRING); + expectedFieldTypes.put(LABELS, STRING); Map fieldTypes = prometheusMetricTable.getFieldTypes(); assertEquals(expectedFieldTypes, fieldTypes); verifyNoMoreInteractions(client); - assertTrue(prometheusMetricTable.getPrometheusQueryRequest().isPresent()); - assertFalse(prometheusMetricTable.getMetricName().isPresent()); + assertNotNull(prometheusMetricTable.getPrometheusQueryRequest()); + assertNull(prometheusMetricTable.getMetricName()); } @Test - void testImplement() { + void testImplementWithQueryRangeFunction() { PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); + prometheusQueryRequest.setPromQl("test"); + prometheusQueryRequest.setStep("15m"); PrometheusMetricTable prometheusMetricTable = new PrometheusMetricTable(client, prometheusQueryRequest); List finalProjectList = new ArrayList<>(); - finalProjectList.add( - new NamedExpression(LABELS, new ReferenceExpression(LABELS, ExprCoreType.STRING))); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); PhysicalPlan plan = prometheusMetricTable.implement( project(relation("query_range", prometheusMetricTable), finalProjectList, null)); @@ -103,21 +130,608 @@ void testImplement() { List projectList = ((ProjectOperator) plan).getProjectList(); List outputFields = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); - assertEquals(List.of(LABELS, TIMESTAMP, VALUE), outputFields); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); PrometheusMetricScan prometheusMetricScan = (PrometheusMetricScan) ((ProjectOperator) plan).getInput(); assertEquals(prometheusQueryRequest, prometheusMetricScan.getRequest()); } + @Test + void testImplementWithBasicMetricQuery() { + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_requests_total"); + List finalProjectList = new ArrayList<>(); + finalProjectList.add(named("@value", ref("@value", ExprCoreType.DOUBLE))); + PhysicalPlan plan = prometheusMetricTable.implement( + project(relation("prometheus_http_requests_total", prometheusMetricTable), + finalProjectList, null)); + + assertTrue(plan instanceof ProjectOperator); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE), outputFields); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusMetricScan prometheusMetricScan = + (PrometheusMetricScan) ((ProjectOperator) plan).getInput(); + assertEquals("prometheus_http_requests_total", prometheusMetricScan.getRequest().getPromQl()); + assertEquals(3600 / 250 + "s", prometheusMetricScan.getRequest().getStep()); + } + + + @Test + void testImplementPrometheusQueryWithStatsQueryAndNoFilter() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + // IndexScanAgg without Filter + PhysicalPlan plan = prometheusMetricTable.implement( + filter( + indexScanAgg("prometheus_http_total_requests", ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("code", DSL.ref("code", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))))); + + assertTrue(plan.getChild().get(0) instanceof PrometheusMetricScan); + PrometheusQueryRequest prometheusQueryRequest = + ((PrometheusMetricScan) plan.getChild().get(0)).getRequest(); + assertEquals( + "avg by(code) (avg_over_time(prometheus_http_total_requests[40s]))", + prometheusQueryRequest.getPromQl()); + } + + @Test + void testImplementPrometheusQueryWithStatsQueryAndFilter() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + // IndexScanAgg with Filter + PhysicalPlan plan = prometheusMetricTable.implement( + indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s"))))); + assertTrue(plan instanceof PrometheusMetricScan); + PrometheusQueryRequest prometheusQueryRequest = ((PrometheusMetricScan) plan).getRequest(); + assertEquals( + "avg by(job) (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + prometheusQueryRequest.getPromQl()); + + } + + + @Test + void testImplementPrometheusQueryWithStatsQueryAndFilterAndProject() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + // IndexScanAgg with Filter and Project + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + PhysicalPlan plan = prometheusMetricTable.implement( + project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))), + ImmutableList + .of(DSL.named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(DSL.named("job", DSL.ref("job", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + finalProjectList, null)); + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) plan).getInput()).getRequest(); + assertEquals(request.getStep(), "40s"); + assertEquals("avg by(job) (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + request.getPromQl()); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + + @Test + void testTimeRangeResolver() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + //Both endTime and startTime are set. + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + PhysicalPlan plan = prometheusMetricTable.implement( + project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.and(dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP)))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + finalProjectList, null)); + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) plan).getInput()).getRequest(); + assertEquals("40s", request.getStep()); + assertEquals("avg by(job) (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + request.getPromQl()); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + @Test + void testTimeRangeResolverWithOutEndTimeInFilter() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + //Both endTime and startTime are set. + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + PhysicalPlan plan = prometheusMetricTable.implement( + project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + finalProjectList, null)); + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) plan).getInput()).getRequest(); + assertEquals("40s", request.getStep()); + assertEquals("avg by(job) (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + request.getPromQl()); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + @Test + void testTimeRangeResolverWithOutStartTimeInFilter() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + //Both endTime and startTime are set. + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + Long endTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + PhysicalPlan plan = prometheusMetricTable.implement( + project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + finalProjectList, null)); + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) plan).getInput()).getRequest(); + assertEquals("40s", request.getStep()); + assertEquals("avg by(job) (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + request.getPromQl()); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + + @Test + void testSpanResolverWithoutSpanExpression() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + LogicalPlan plan = project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.and(dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP)))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + null), + finalProjectList, null); + RuntimeException runtimeException + = Assertions.assertThrows(RuntimeException.class, + () -> prometheusMetricTable.implement(plan)); + Assertions.assertEquals("Prometheus Catalog doesn't support " + + "aggregations without span expression", + runtimeException.getMessage()); + } + + @Test + void testSpanResolverWithEmptyGroupByList() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + LogicalPlan plan = project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.and(dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP)))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of()), + finalProjectList, null); + RuntimeException runtimeException + = Assertions.assertThrows(RuntimeException.class, + () -> prometheusMetricTable.implement(plan)); + Assertions.assertEquals("Prometheus Catalog doesn't support " + + "aggregations without span expression", + runtimeException.getMessage()); + } + + @Test + void testSpanResolverWithSpanExpression() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + PhysicalPlan plan = prometheusMetricTable.implement( + project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.and(dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP)))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + finalProjectList, null)); + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) plan).getInput()).getRequest(); + assertEquals("40s", request.getStep()); + assertEquals("avg by(job) (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + request.getPromQl()); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + @Test + void testExpressionWithMissingTimeUnitInSpanExpression() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + LogicalPlan logicalPlan = project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.and(dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP)))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING)), + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "")))), + finalProjectList, null); + RuntimeException exception = + Assertions.assertThrows(RuntimeException.class, + () -> prometheusMetricTable.implement(logicalPlan)); + assertEquals("Missing TimeUnit in the span expression", exception.getMessage()); + } + + @Test + void testPrometheusQueryWithOnlySpanExpressionInGroupByList() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + PhysicalPlan plan = prometheusMetricTable.implement( + project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.and(dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP)))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of( + named("span", DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + finalProjectList, null)); + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) plan).getInput()).getRequest(); + assertEquals("40s", request.getStep()); + assertEquals("avg (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + request.getPromQl()); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + @Test + void testStatsWithNoGroupByList() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + + + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + Long endTime = new Date(System.currentTimeMillis()).getTime(); + Long startTime = new Date(System.currentTimeMillis() - 4800 * 1000).getTime(); + DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + PhysicalPlan plan = prometheusMetricTable.implement( + project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.and( + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))), + dsl.and(dsl.gte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(startTime)), + ExprCoreType.TIMESTAMP))), + dsl.lte(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal( + fromObjectValue(dateFormat.format(new Date(endTime)), + ExprCoreType.TIMESTAMP)))))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("span", + DSL.span(DSL.ref("@timestamp", ExprCoreType.TIMESTAMP), + DSL.literal(40), "s")))), + finalProjectList, null)); + assertTrue(plan instanceof ProjectOperator); + assertTrue(((ProjectOperator) plan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) plan).getInput()).getRequest(); + assertEquals("40s", request.getStep()); + assertEquals("avg (avg_over_time" + + "(prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}[40s]))", + request.getPromQl()); + List projectList = ((ProjectOperator) plan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + + @Test + void testImplementWithUnexpectedLogicalNode() { + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + LogicalPlan plan = project(testLogicalPlanNode()); + RuntimeException runtimeException = Assertions.assertThrows(RuntimeException.class, + () -> prometheusMetricTable.implement(plan)); + assertEquals("unexpected plan node type class" + + " org.opensearch.sql.prometheus.utils.LogicalPlanUtils$TestLogicalPlan", + runtimeException.getMessage()); + } + + @Test + void testMultipleAggregationsThrowsRuntimeException() { + + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + LogicalPlan plan = project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))), + ImmutableList + .of(named("AVG(@value)", + dsl.avg(DSL.ref("@value", INTEGER))), + named("SUM(@value)", + dsl.avg(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING))))); + + RuntimeException runtimeException = Assertions.assertThrows(RuntimeException.class, + () -> prometheusMetricTable.implement(plan)); + assertEquals("Prometheus Catalog doesn't multiple aggregations in stats command", + runtimeException.getMessage()); + } + + + @Test + void testUnSupportedAggregation() { + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + LogicalPlan plan = project(indexScanAgg("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/")))), + ImmutableList + .of(named("VAR_SAMP(@value)", + dsl.varSamp(DSL.ref("@value", INTEGER)))), + ImmutableList.of(named("job", DSL.ref("job", STRING))))); + + RuntimeException runtimeException = Assertions.assertThrows(RuntimeException.class, + () -> prometheusMetricTable.implement(plan)); + assertTrue(runtimeException.getMessage().contains("Prometheus Catalog only supports")); + } + + @Test + void testImplementWithORConditionInWhereClause() { + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + LogicalPlan plan = indexScan("prometheus_http_total_requests", + dsl.or(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))))); + RuntimeException exception + = assertThrows(RuntimeException.class, () -> prometheusMetricTable.implement(plan)); + assertEquals("Prometheus Catalog doesn't support or in where command.", exception.getMessage()); + } + + @Test + void testImplementWithRelationAndFilter() { + List finalProjectList = new ArrayList<>(); + finalProjectList.add(DSL.named(VALUE, DSL.ref(VALUE, STRING))); + finalProjectList.add(DSL.named(TIMESTAMP, DSL.ref(TIMESTAMP, ExprCoreType.TIMESTAMP))); + PrometheusMetricTable prometheusMetricTable = + new PrometheusMetricTable(client, "prometheus_http_total_requests"); + LogicalPlan logicalPlan = project(indexScan("prometheus_http_total_requests", + dsl.and(dsl.equal(DSL.ref("code", STRING), DSL.literal(stringValue("200"))), + dsl.equal(DSL.ref("handler", STRING), DSL.literal(stringValue("/ready/"))))), + finalProjectList, null); + PhysicalPlan physicalPlan = prometheusMetricTable.implement(logicalPlan); + assertTrue(physicalPlan instanceof ProjectOperator); + assertTrue(((ProjectOperator) physicalPlan).getInput() instanceof PrometheusMetricScan); + PrometheusQueryRequest request + = ((PrometheusMetricScan) ((ProjectOperator) physicalPlan).getInput()).getRequest(); + assertEquals((3600 / 250) + "s", request.getStep()); + assertEquals("prometheus_http_total_requests{code=\"200\" , handler=\"/ready/\"}", + request.getPromQl()); + List projectList = ((ProjectOperator) physicalPlan).getProjectList(); + List outputFields + = projectList.stream().map(NamedExpression::getName).collect(Collectors.toList()); + assertEquals(List.of(VALUE, TIMESTAMP), outputFields); + } + @Test void testOptimize() { PrometheusQueryRequest prometheusQueryRequest = new PrometheusQueryRequest(); PrometheusMetricTable prometheusMetricTable = new PrometheusMetricTable(client, prometheusQueryRequest); List finalProjectList = new ArrayList<>(); - finalProjectList.add( - new NamedExpression(LABELS, new ReferenceExpression(LABELS, ExprCoreType.STRING))); LogicalPlan inputPlan = project(relation("query_range", prometheusMetricTable), finalProjectList, null); LogicalPlan optimizedPlan = prometheusMetricTable.optimize( diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java index 992b17b549..fadd061072 100644 --- a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/PrometheusStorageEngineTest.java @@ -35,7 +35,8 @@ class PrometheusStorageEngineTest { public void getTable() { PrometheusStorageEngine engine = new PrometheusStorageEngine(client); Table table = engine.getTable(new CatalogSchemaName("prometheus", "default"), "test"); - assertNull(table); + assertNotNull(table); + assertTrue(table instanceof PrometheusMetricTable); } @Test diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/querybuilders/StepParameterResolverTest.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/querybuilders/StepParameterResolverTest.java new file mode 100644 index 0000000000..37e24a56b5 --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/storage/querybuilders/StepParameterResolverTest.java @@ -0,0 +1,26 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.storage.querybuilders; + +import java.util.Collections; +import java.util.Date; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.prometheus.storage.querybuilder.StepParameterResolver; + +public class StepParameterResolverTest { + + @Test + void testNullChecks() { + StepParameterResolver stepParameterResolver = new StepParameterResolver(); + Assertions.assertThrows(NullPointerException.class, + () -> stepParameterResolver.resolve(null, new Date().getTime(), Collections.emptyList())); + Assertions.assertThrows(NullPointerException.class, + () -> stepParameterResolver.resolve(new Date().getTime(), null, Collections.emptyList())); + } +} diff --git a/prometheus/src/test/java/org/opensearch/sql/prometheus/utils/LogicalPlanUtils.java b/prometheus/src/test/java/org/opensearch/sql/prometheus/utils/LogicalPlanUtils.java new file mode 100644 index 0000000000..5fcebf52e6 --- /dev/null +++ b/prometheus/src/test/java/org/opensearch/sql/prometheus/utils/LogicalPlanUtils.java @@ -0,0 +1,77 @@ +/* + * + * * Copyright OpenSearch Contributors + * * SPDX-License-Identifier: Apache-2.0 + * + */ + +package org.opensearch.sql.prometheus.utils; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import org.opensearch.sql.expression.Expression; +import org.opensearch.sql.expression.NamedExpression; +import org.opensearch.sql.expression.aggregation.NamedAggregator; +import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricAgg; +import org.opensearch.sql.prometheus.planner.logical.PrometheusLogicalMetricScan; + +public class LogicalPlanUtils { + + /** + * Build PrometheusLogicalMetricScan. + */ + public static LogicalPlan indexScan(String metricName, Expression filter) { + return PrometheusLogicalMetricScan.builder().metricName(metricName) + .filter(filter) + .build(); + } + + /** + * Build PrometheusLogicalMetricAgg. + */ + public static LogicalPlan indexScanAgg(String metricName, Expression filter, + List aggregators, + List groupByList) { + return PrometheusLogicalMetricAgg.builder().metricName(metricName) + .filter(filter) + .aggregatorList(aggregators) + .groupByList(groupByList) + .build(); + } + + /** + * Build PrometheusLogicalMetricAgg. + */ + public static LogicalPlan indexScanAgg(String metricName, + List aggregators, + List groupByList) { + return PrometheusLogicalMetricAgg.builder().metricName(metricName) + .aggregatorList(aggregators) + .groupByList(groupByList) + .build(); + } + + /** + * Build PrometheusLogicalMetricAgg. + */ + public static LogicalPlan testLogicalPlanNode() { + return new TestLogicalPlan(); + } + + static class TestLogicalPlan extends LogicalPlan { + + public TestLogicalPlan() { + super(ImmutableList.of()); + } + + @Override + public R accept(LogicalPlanNodeVisitor visitor, C context) { + return visitor.visitNode(this, null); + } + } + + + +}