Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add SQL query with named parameters #94

Merged
merged 8 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
## 0.6.0 [unreleased]

### Features

1. [#94](https://github.com/InfluxCommunity/influxdb3-java/pull/94): Add support for named query parameters

## 0.5.1 [2024-02-01]

Resync artifacts with Maven Central.
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,20 @@ try (Stream<Object[]> stream = client.query(sql)) {

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by parametrized SQL
//
System.out.printf("--------------------Parametrized SQL--------------------%n");
System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time");
System.out.printf("--------------------------------------------------------%n");

String sqlParams = "select time,location,value from temperature where location=$location order by time desc limit 10";
try (Stream<Object[]> stream = client.query(sqlParams, Map.of("location", "north"))) {
stream.forEach(row -> System.out.printf("| %-8s | %-8s | %-30s |%n", row[1], row[2], row[0]));
}

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by InfluxQL
//
Expand Down
16 changes: 16 additions & 0 deletions examples/src/main/java/com/influxdb/v3/IOxExample.java
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Works, however on first usage got exceptions when the query was executed faster than the test data was written, apparently. I had to add a Thread.sleep after write on a subsequent run to get results. However, I cannot now recreate this issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is probably caused by "slow" propagation of the new measurement on the server side.

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package com.influxdb.v3;

import java.time.Instant;
import java.util.Map;
import java.util.stream.Stream;

import com.influxdb.v3.client.InfluxDBClient;
Expand Down Expand Up @@ -73,6 +74,21 @@ public static void main(final String[] args) throws Exception {

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by parametrized SQL
//
System.out.printf("--------------------Parametrized SQL--------------------%n");
System.out.printf("| %-8s | %-8s | %-30s |%n", "location", "value", "time");
System.out.printf("--------------------------------------------------------%n");

String sqlWithParameters =
"select time,location,value from temperature where location=$location order by time desc limit 10";
try (Stream<Object[]> stream = client.query(sqlWithParameters, Map.of("location", "north"))) {
stream.forEach(row -> System.out.printf("| %-8s | %-8s | %-30s |%n", row[1], row[2], row[0]));
}

System.out.printf("--------------------------------------------------------%n%n");

//
// Query by InfluxQL
//
Expand Down
129 changes: 129 additions & 0 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<Object[]> query(@Nonnull final String query);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Object[]&gt; rows = client.query("select * from cpu where host=$host",
* Map.of("host", "server-a")) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Object[]> query(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
Expand All @@ -139,6 +159,29 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<Object[]> query(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;Object[]&gt; rows = client.query("select * from cpu where host=$host",
* Map.of("host", "server-a"), options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
Expand All @@ -157,6 +200,26 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;PointValues&gt; rows = client.queryPoints("select * from cpu where host=$host",
* Map.of("host", "server-a"))) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of PointValues returned by the query
*/
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
Expand All @@ -176,6 +239,30 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx into Point structure using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;PointValues&gt; rows = client.queryPoints("select * from cpu where host=$host",
* Map.of("host", "server-a"),
* options)) {
* rows.forEach(row -&gt; {
* // process row
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of PointValues returned by the query
*/
@Nonnull
Stream<PointValues> queryPoints(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
Expand All @@ -194,6 +281,26 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <p>
* The result stream should be closed after use, you can use try-resource pattern to close it automatically:
* <pre>
* try (Stream&lt;VectorSchemaRoot&gt; batches = client.queryBatches("select * from cpu where host=$host",
* Map.of("host", "server-a"))) {
* batches.forEach(batch -&gt; {
* // process batch
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query, @Nonnull final Map<String, Object> parameters);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <pre>
Expand All @@ -211,6 +318,28 @@ public interface InfluxDBClient extends AutoCloseable {
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query, @Nonnull final QueryOptions options);

/**
* Query data from InfluxDB IOx using FlightSQL.
* <pre>
* try (Stream&lt;VectorSchemaRoot&gt; batches = client.queryBatches("select * from cpu where host=$host",
* Map.of("host", "server-a"),
* options)) {
* batches.forEach(batch -&gt; {
* // process batch
* }
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options);

/**
* Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying
* common operations such as writing, querying.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,19 @@ final class FlightSqlClient implements AutoCloseable {
@Nonnull
Stream<VectorSchemaRoot> execute(@Nonnull final String query,
@Nonnull final String database,
@Nonnull final QueryType queryType) {
@Nonnull final QueryType queryType,
@Nonnull final Map<String, Object> queryParameters) {

Map<String, String> ticketData = new HashMap<>() {{
Map<String, Object> ticketData = new HashMap<>() {{
put("database", database);
put("sql_query", query);
put("query_type", queryType.name().toLowerCase());
}};

if (queryParameters.size() > 0) {
ticketData.put("params", queryParameters);
}

String json;
try {
json = objectMapper.writeValueAsString(ticketData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.logging.Logger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -62,6 +63,16 @@ public final class InfluxDBClientImpl implements InfluxDBClient {
private static final String DATABASE_REQUIRED_MESSAGE = "Please specify the 'Database' as a method parameter "
+ "or use default configuration at 'ClientConfig.database'.";

private static final Map<String, Object> NO_PARAMETERS = Map.of();
private static final List<Class<?>> ALLOWED_NAMED_PARAMETER_TYPES = List.of(
String.class,
Integer.class,
Long.class,
Float.class,
Double.class,
Boolean.class
);

private boolean closed = false;
private final ClientConfig config;

Expand Down Expand Up @@ -136,13 +147,27 @@ public void writePoints(@Nonnull final List<Point> points, @Nonnull final WriteO
@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query) {
return query(query, QueryOptions.DEFAULTS);
return query(query, NO_PARAMETERS, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryData(query, options)
return query(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query, @Nonnull final Map<String, Object> parameters) {
return query(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<Object[]> query(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> {
List<FieldVector> fieldVectors = vector.getFieldVectors();
return IntStream
Expand All @@ -167,7 +192,21 @@ public Stream<PointValues> queryPoints(@Nonnull final String query) {
@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryData(query, options)
return queryPoints(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query, @Nonnull final Map<String, Object> parameters) {
return queryPoints(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<PointValues> queryPoints(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, parameters, options)
.flatMap(vector -> {
List<FieldVector> fieldVectors = vector.getFieldVectors();
return IntStream
Expand All @@ -183,11 +222,25 @@ public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query) {
return queryBatches(query, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query, @Nonnull final QueryOptions options) {
return queryBatches(query, NO_PARAMETERS, options);
}

@Nonnull
@Override
public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters) {
return queryBatches(query, parameters, QueryOptions.DEFAULTS);
}

@Nonnull
@Override
public Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {
return queryData(query, options);
return queryData(query, parameters, options);
}

@Override
Expand Down Expand Up @@ -258,9 +311,11 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti

@Nonnull
private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
@Nonnull final Map<String, Object> parameters,
@Nonnull final QueryOptions options) {

Arguments.checkNonEmpty(query, "query");
Arguments.checkNotNull(parameters, "parameters");
Arguments.checkNotNull(options, "options");

if (closed) {
Expand All @@ -272,7 +327,14 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
throw new IllegalStateException(DATABASE_REQUIRED_MESSAGE);
}

return flightSqlClient.execute(query, database, options.queryTypeSafe());
parameters.forEach((k, v) -> {
if (!Objects.isNull(v) && !ALLOWED_NAMED_PARAMETER_TYPES.contains(v.getClass())) {
throw new IllegalArgumentException(String.format("The parameter %s value has unsupported type: %s",
k, v.getClass()));
}
});

return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters);
}

@Nonnull
Expand Down
Loading
Loading