Skip to content

Commit

Permalink
feat: add SQL query with named parameters (#94)
Browse files Browse the repository at this point in the history
* feat: add SQL query with named parameters

* style: fix example formatting

* docs: update CHANGELOG.md

* docs: update README.md

* chore: improve test coverage

* chore: improve test coverage

* chore: improve test coverage

* chore: improve test coverage

---------

Co-authored-by: Jakub Bednar <[email protected]>
  • Loading branch information
alespour and bednar authored Feb 27, 2024
1 parent dfda05b commit 7b1f93d
Show file tree
Hide file tree
Showing 8 changed files with 287 additions and 9 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.6.0 [unreleased]

### Features

1. [#94](https://github.com/InfluxCommunity/influxdb3-java/pull/94): Add support for named query parameters

## 0.5.1 [2024-02-01]

Resync artifacts with Maven Central.
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ try (Stream<Object[]> stream = client.query(sql)) {

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by parametrized SQL
//
System.out.printf("--------------------Parametrized SQL--------------------%n");
System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time");
System.out.printf("--------------------------------------------------------%n");

String sqlParams = "select time,location,value from temperature where location=$location order by time desc limit 10";
try (Stream<Object[]> stream = client.query(sqlParams, Map.of("location", "north"))) {
stream.forEach(row -> System.out.printf("| %-8s | %-8s | %-30s |%n", row[1], row[2], row[0]));
}

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by InfluxQL
//
Expand Down
16 changes: 16 additions & 0 deletions examples/src/main/java/com/influxdb/v3/IOxExample.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.influxdb.v3;

import java.time.Instant;
import java.util.Map;
import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
Expand Down Expand Up @@ -73,6 +74,21 @@ public static void main(final String[] args) throws Exception {

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by parametrized SQL
//
System.out.printf("--------------------Parametrized SQL--------------------%n");
System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time");
System.out.printf("--------------------------------------------------------%n");

String sqlWithParameters =
"select time,location,value from temperature where location=$location order by time desc limit 10";
try (Stream<Object[]> stream = client.query(sqlWithParameters, Map.of("location", "north"))) {
stream.forEach(row -> System.out.printf("| %-8s | %-8s | %-30s |%n", row[1], row[2], row[0]));
}

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by InfluxQL
//
Expand Down
129 changes: 129 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<Object[]> query(@Nonnull final String query);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Object[]&gt; rows = client.query("select * from cpu where host=$host",
* Map.of("host", "server-a")) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Object[]> query(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
Expand All @@ -139,6 +159,29 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<Object[]> query(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Object[]&gt; rows = client.query("select * from cpu where host=$host",
* Map.of("host", "server-a"), options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
Expand All @@ -157,6 +200,26 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;PointValues&gt; rows = client.queryPoints("select * from cpu where host=$host",
* Map.of("host", "server-a"))) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of PointValues returned by the query
*/
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
Expand All @@ -176,6 +239,30 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;PointValues&gt; rows = client.queryPoints("select * from cpu where host=$host",
* Map.of("host", "server-a"),
* options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of PointValues returned by the query
*/
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
Expand All @@ -194,6 +281,26 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;VectorSchemaRoot&gt; batches = client.queryBatches("select * from cpu where host=$host",
* Map.of("host", "server-a"))) {
* batches.forEach(batch -&gt; {
* // process batch
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <pre>
Expand All @@ -211,6 +318,28 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <pre>
* try (Stream&lt;VectorSchemaRoot&gt; batches = client.queryBatches("select * from cpu where host=$host",
* Map.of("host", "server-a"),
* options)) {
* batches.forEach(batch -&gt; {
* // process batch
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying
* common operations such as writing, querying.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,19 @@ final class FlightSqlClient implements AutoCloseable {
@Nonnull
Stream<VectorSchemaRoot> execute(@Nonnull final String query,
@Nonnull final String database,
@Nonnull final QueryType queryType) {
@Nonnull final QueryType queryType,
@Nonnull final Map<String, Object> queryParameters) {

Map<String, String> ticketData = new HashMap<>() {{
Map<String, Object> ticketData = new HashMap<>() {{
put("database", database);
put("sql_query", query);
put("query_type", queryType.name().toLowerCase());
}};

if (queryParameters.size() > 0) {
ticketData.put("params", queryParameters);
}

String json;
try {
json = objectMapper.writeValueAsString(ticketData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -62,6 +63,16 @@ public final class InfluxDBClientImpl implements InfluxDBClient {
private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter "
+ "or use default configuration at 'ClientConfig.database'.";

private static final Map<String, Object> NO_PARAMETERS = Map.of();
private static final List<Class<?>> ALLOWED_NAMED_PARAMETER_TYPES = List.of(
String.class,
Integer.class,
Long.class,
Float.class,
Double.class,
Boolean.class
);

private boolean closed = false;
private final ClientConfig config;

Expand Down Expand Up @@ -136,13 +147,27 @@ public void writePoints(@Nonnull final List<Point> points, @Nonnull final WriteO
@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query) {
return query(query, QueryOptions.DEFAULTS);
return query(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryData(query, options)
return query(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query, @Nonnull final Map<String, Object> parameters) {
return query(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> {
List<FieldVector> fieldVectors = vector.getFieldVectors();
return IntStream
Expand All @@ -167,7 +192,21 @@ public Stream<PointValues> queryPoints(@Nonnull final String query) {
@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryData(query, options)
return queryPoints(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final Map<String, Object> parameters) {
return queryPoints(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> {
List<FieldVector> fieldVectors = vector.getFieldVectors();
return IntStream
Expand All @@ -183,11 +222,25 @@ public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query) {
return queryBatches(query, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryBatches(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters) {
return queryBatches(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, options);
return queryData(query, parameters, options);
}

@Override
Expand Down Expand Up @@ -258,9 +311,11 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti

@Nonnull
private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {

Arguments.checkNonEmpty(query, "query");
Arguments.checkNotNull(parameters, "parameters");
Arguments.checkNotNull(options, "options");

if (closed) {
Expand All @@ -272,7 +327,14 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
throw new IllegalStateException(DATABASE_REQUIRED_MESSAGE);
}

return flightSqlClient.execute(query, database, options.queryTypeSafe());
parameters.forEach((k, v) -> {
if (!Objects.isNull(v) && !ALLOWED_NAMED_PARAMETER_TYPES.contains(v.getClass())) {
throw new IllegalArgumentException(String.format("The parameter %s value has unsupported type: %s",
k, v.getClass()));
}
});

return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters);
}

@Nonnull
Expand Down
Loading

0 comments on commit 7b1f93d

Please sign in to comment.