Prometheus select metric and stats queries. (#956)
* Prometheus select metric and stats queries.

Signed-off-by: reddyvam-amazon <[email protected]>
vamsimanohar authored Nov 3, 2022
1 parent 1a52511 commit be4512e
Showing 41 changed files with 2,181 additions and 130 deletions.
6 changes: 6 additions & 0 deletions core/src/main/java/org/opensearch/sql/analysis/Analyzer.java
Original file line number Diff line number Diff line change
@@ -203,6 +203,12 @@ public LogicalPlan visitTableFunction(TableFunction node, AnalysisContext contex
TableFunctionImplementation tableFunctionImplementation
= (TableFunctionImplementation) repository.compile(
catalogSchemaIdentifierNameResolver.getCatalogName(), functionName, arguments);
context.push();
TypeEnvironment curEnv = context.peek();
Table table = tableFunctionImplementation.applyArguments();
table.getFieldTypes().forEach((k, v) -> curEnv.define(new Symbol(Namespace.FIELD_NAME, k), v));
curEnv.define(new Symbol(Namespace.INDEX_NAME,
catalogSchemaIdentifierNameResolver.getIdentifierName()), STRUCT);
return new LogicalRelation(catalogSchemaIdentifierNameResolver.getIdentifierName(),
tableFunctionImplementation.applyArguments());
}
113 changes: 113 additions & 0 deletions docs/user/ppl/admin/prometheus_connector.rst
@@ -72,3 +72,116 @@ AWSSigV4 Auth::
}
}]

PPL query support for the Prometheus connector
==============================================

Metric as a Table
---------------------------
Each connector must abstract the underlying datasource constructs into a table as part of its interface contract with the PPL query engine.
The Prometheus connector abstracts each metric as a table whose columns are ``@value``, ``@timestamp``, and one column per label (``label1``, ``label2``, and so on).
``@value`` represents the metric measurement and ``@timestamp`` represents the time at which the measurement was collected. Labels are the tags associated with the queried metric.
For example, ``handler``, ``code``, ``instance``, and ``job`` are the labels associated with the ``prometheus_http_requests_total`` metric. With this abstraction, Prometheus
data can be queried with the same PPL syntax used for OpenSearch indices.

Example::

> source = my_prometheus.prometheus_http_requests_total;

+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:19:04" | "/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
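To make this abstraction concrete, here is an illustrative Python sketch (not the connector's actual Java implementation) of flattening a Prometheus ``query_range`` ``matrix`` response into rows with ``@value``, ``@timestamp``, and one column per label:

```python
from datetime import datetime, timezone

def matrix_to_rows(prometheus_result):
    """Flatten a Prometheus query_range 'matrix' result into table rows.

    Each (timestamp, value) sample becomes one row; the series' labels
    (minus the internal __name__ label) become the remaining columns.
    """
    rows = []
    for series in prometheus_result:
        labels = {k: v for k, v in series["metric"].items() if k != "__name__"}
        for ts, value in series["values"]:
            rows.append({
                "@value": float(value),
                "@timestamp": datetime.fromtimestamp(ts, tz=timezone.utc),
                **labels,
            })
    return rows

# A trimmed query_range response for prometheus_http_requests_total.
result = [{
    "metric": {"__name__": "prometheus_http_requests_total",
               "handler": "/-/ready", "code": "200",
               "instance": "192.15.1.1", "job": "prometheus"},
    "values": [[1667459894, "5"], [1667459904, "3"]],
}]

rows = matrix_to_rows(result)
print(rows[0]["@value"], rows[0]["handler"])  # -> 5.0 /-/ready
```

Each Prometheus series contributes one row per sample, which is why the same label combination repeats across timestamps in the tables above.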



Default time range and resolution
---------------------------------
Time range and resolution are required parameters for the Prometheus query APIs. They are derived from the PPL command as follows.

* The time range is determined from a filter clause on ``@timestamp``. If there is no such filter clause, the time range defaults to 1h, with the end time set to now().
* For stats queries, the resolution is determined by the ``span(@timestamp, 15s)`` expression. For plain select queries, the resolution is derived automatically from the time range.
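A minimal Python sketch of that derivation, with a hypothetical ``MAX_DATA_POINTS`` cap standing in for whatever heuristic the engine actually uses to auto-pick a resolution:

```python
import time

DEFAULT_RANGE_SECONDS = 3600  # fall back to the last hour
MAX_DATA_POINTS = 250         # hypothetical cap used to auto-pick a step

def resolve_range_and_step(start=None, end=None, span_seconds=None):
    """Derive (start, end, step) for Prometheus's query_range API.

    - With no @timestamp filter, the range defaults to [now - 1h, now].
    - For stats queries, the span expression fixes the step.
    - For plain select queries, the step is auto-derived from the range length.
    """
    if end is None:
        end = int(time.time())
    if start is None:
        start = end - DEFAULT_RANGE_SECONDS
    if span_seconds is not None:
        step = span_seconds  # e.g. span(@timestamp, 15s) -> 15
    else:
        step = max(1, (end - start) // MAX_DATA_POINTS)
    return start, end, step
```

For example, a plain select with no filter yields a one-hour window with an auto-derived step, while ``span(@timestamp, 15s)`` pins the step to 15 seconds.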

Prometheus Connector Limitations
--------------------------------
* Only one aggregation is supported per stats command.
* A span expression is required in the stats command.
* AVG, MAX, MIN, SUM, and COUNT are the only aggregations supported by the Prometheus connector.

Example queries
---------------

1. Metric selection query::

> source = my_prometheus.prometheus_http_requests_total
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:19:04" | "/-/metrics" | 500 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+

2. Metric selection query with specific dimensions::

> source = my_prometheus.prometheus_http_requests_total | where handler='/-/ready' and code='200'
+------------+------------------------+--------------------------------+---------------+-------------+-------------+
| @value | @timestamp | handler | code | instance | job |
|------------+------------------------+--------------------------------+---------------+-------------+-------------|
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 | 192.15.1.1 | prometheus |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 200 | 192.15.2.1 | prometheus |
| 9 | "2022-11-03 07:18:54" | "/-/ready" | 200 | 192.15.2.1 | prometheus |
| 11 | "2022-11-03 07:19:04" | "/-/ready" | 200 | 192.15.2.1 | prometheus |
+------------+------------------------+--------------------------------+---------------+-------------+-------------+

3. Average aggregation on a metric::

> source = my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp,15s)
+------------+------------------------+
| avg(@value)| span(@timestamp,15s) |
|------------+------------------------+
| 5 | "2022-11-03 07:18:14" |
| 3 | "2022-11-03 07:18:24" |
| 7 | "2022-11-03 07:18:34" |
| 2 | "2022-11-03 07:18:44" |
| 9 | "2022-11-03 07:18:54" |
| 11 | "2022-11-03 07:19:04" |
+------------+------------------------+

4. Average aggregation grouped by dimensions::

> source = my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp,15s), handler, code
+------------+------------------------+--------------------------------+---------------+
| avg(@value)| span(@timestamp,15s) | handler | code |
|------------+------------------------+--------------------------------+---------------+
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 |
| 11 | "2022-11-03 07:19:04" | "/-/metrics" | 500 |
+------------+------------------------+--------------------------------+---------------+

5. Count aggregation query::

> source = my_prometheus.prometheus_http_requests_total | stats count() by span(@timestamp,15s), handler, code
+------------+------------------------+--------------------------------+---------------+
| count() | span(@timestamp,15s) | handler | code |
|------------+------------------------+--------------------------------+---------------+
| 5 | "2022-11-03 07:18:14" | "/-/ready" | 200 |
| 3 | "2022-11-03 07:18:24" | "/-/ready" | 200 |
| 7 | "2022-11-03 07:18:34" | "/-/ready" | 200 |
| 2 | "2022-11-03 07:18:44" | "/-/ready" | 400 |
| 9 | "2022-11-03 07:18:54" | "/-/promql" | 400 |
| 11 | "2022-11-03 07:19:04" | "/-/metrics" | 500 |
+------------+------------------------+--------------------------------+---------------+

3 changes: 1 addition & 2 deletions docs/user/ppl/cmd/describe.rst
@@ -75,11 +75,10 @@ The example retrieves table info for ``prometheus_http_requests_total`` metric i
PPL query::

os> describe my_prometheus.prometheus_http_requests_total;
fetched rows / total rows = 7/7
fetched rows / total rows = 6/6
+-----------------+----------------+--------------------------------+---------------+-------------+
| TABLE_CATALOG | TABLE_SCHEMA | TABLE_NAME | COLUMN_NAME | DATA_TYPE |
|-----------------+----------------+--------------------------------+---------------+-------------|
| my_prometheus | default | prometheus_http_requests_total | @labels | keyword |
| my_prometheus | default | prometheus_http_requests_total | handler | keyword |
| my_prometheus | default | prometheus_http_requests_total | code | keyword |
| my_prometheus | default | prometheus_http_requests_total | instance | keyword |
@@ -101,7 +101,6 @@ public void testDescribeCommandWithPrometheusCatalog() throws IOException {
columnName("DATA_TYPE")
);
verifyDataRows(result,
rows("my_prometheus", "default", "prometheus_http_requests_total", "@labels", "keyword"),
rows("my_prometheus", "default", "prometheus_http_requests_total", "handler", "keyword"),
rows("my_prometheus", "default", "prometheus_http_requests_total", "code", "keyword"),
rows("my_prometheus", "default", "prometheus_http_requests_total", "instance", "keyword"),
@@ -0,0 +1,158 @@
/*
*
* * Copyright OpenSearch Contributors
* * SPDX-License-Identifier: Apache-2.0
*
*/

package org.opensearch.sql.ppl;

import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.TIMESTAMP;
import static org.opensearch.sql.prometheus.data.constants.PrometheusFieldConstants.VALUE;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifySchema;

import lombok.SneakyThrows;
import org.apache.commons.lang3.StringUtils;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class PrometheusCatalogCommandsIT extends PPLIntegTestCase {

@Test
@SneakyThrows
public void testSourceMetricCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total");
verifySchema(response,
schema(VALUE, "double"),
schema(TIMESTAMP, "timestamp"),
schema("handler", "string"),
schema("code", "string"),
schema("instance", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(6, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricAvgAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats avg(@value) by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("avg(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricAvgAggregationCommandWithAlias() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats avg(@value) as agg by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("agg", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}


@Test
@SneakyThrows
public void testMetricMaxAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats max(@value) by span(@timestamp, 15s)");
verifySchema(response,
schema("max(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(2, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}


@Test
@SneakyThrows
public void testMetricMinAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats min(@value) by span(@timestamp, 15s), handler");
verifySchema(response,
schema("min(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(3, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricCountAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats count() by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("count()", "integer"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

@Test
@SneakyThrows
public void testMetricSumAggregationCommand() {
JSONObject response =
executeQuery("source=my_prometheus.prometheus_http_requests_total | stats sum(@value) by span(@timestamp, 15s), handler, job");
verifySchema(response,
schema("sum(@value)", "double"),
schema("span(@timestamp,15s)", "timestamp"),
schema("handler", "string"),
schema("job", "string"));
Assertions.assertTrue(response.getInt("size") > 0);
Assertions.assertEquals(4, response.getJSONArray("datarows").getJSONArray(0).length());
JSONArray firstRow = response.getJSONArray("datarows").getJSONArray(0);
for (int i = 0; i < firstRow.length(); i++) {
Assertions.assertNotNull(firstRow.get(i));
Assertions.assertTrue(StringUtils.isNotEmpty(firstRow.get(i).toString()));
}
}

}
@@ -79,7 +79,7 @@ public void loadConnectors(Settings settings) {
} catch (IOException e) {
LOG.error("Catalog Configuration File uploaded is malformed. Verify and re-upload.", e);
} catch (Throwable e) {
LOG.error("Catalog constructed failed.", e);
LOG.error("Catalog construction failed.", e);
}
}
return null;
2 changes: 0 additions & 2 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
@@ -161,8 +161,6 @@ mlArg
fromClause
: SOURCE EQUAL tableSourceClause
| INDEX EQUAL tableSourceClause
| SOURCE EQUAL tableFunction
| INDEX EQUAL tableFunction
;

tableSourceClause
19 changes: 5 additions & 14 deletions ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java
@@ -34,6 +34,7 @@
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.Generated;
import lombok.RequiredArgsConstructor;
import org.antlr.v4.runtime.ParserRuleContext;
import org.antlr.v4.runtime.Token;
@@ -348,11 +349,7 @@ public UnresolvedPlan visitTopCommand(TopCommandContext ctx) {
*/
@Override
public UnresolvedPlan visitFromClause(FromClauseContext ctx) {
if (ctx.tableFunction() != null) {
return visitTableFunction(ctx.tableFunction());
} else {
return visitTableSourceClause(ctx.tableSourceClause());
}
return visitTableSourceClause(ctx.tableSourceClause());
}

@Override
@@ -363,16 +360,10 @@ public UnresolvedPlan visitTableSourceClause(TableSourceClauseContext ctx) {
}

@Override
@Generated //To exclude from jacoco..will remove https://github.com/opensearch-project/sql/issues/1019
public UnresolvedPlan visitTableFunction(OpenSearchPPLParser.TableFunctionContext ctx) {
ImmutableList.Builder<UnresolvedExpression> builder = ImmutableList.builder();
ctx.functionArgs().functionArg().forEach(arg
-> {
String argName = (arg.ident() != null) ? arg.ident().getText() : null;
builder.add(
new UnresolvedArgument(argName,
this.internalVisitExpression(arg.valueExpression())));
});
return new TableFunction(this.internalVisitExpression(ctx.qualifiedName()), builder.build());
//<TODO>
return null;
}

/**
