Skip to content

Commit

Permalink
feat: custom headers can be specified per request (query/write) (#108)
Browse files Browse the repository at this point in the history
  • Loading branch information
bednar authored Mar 7, 2024
1 parent da932f7 commit aa5509b
Show file tree
Hide file tree
Showing 11 changed files with 409 additions and 87 deletions.
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,37 @@
}
```

1. [#108](https://github.com/InfluxCommunity/influxdb3-java/pull/108): Custom headers can be specified per request (query/write):

```java
ClientConfig config = new ClientConfig.Builder()
.host("https://us-east-1-1.aws.cloud2.influxdata.com")
.token("my-token".toCharArray())
.database("my-database")
.build();

try (InfluxDBClient client = InfluxDBClient.getInstance(config)) {
//
// Write with custom headers
//
WriteOptions writeOptions = new WriteOptions(
Map.of("X-Tracing-Id", "852")
);
client.writeRecord("mem,tag=one value=1.0", writeOptions);

//
// Query with custom headers
//
QueryOptions queryOptions = new QueryOptions(
Map.of("X-Tracing-Id", "852")
);
Stream<Object[]> rows = client.query("select * from cpu", queryOptions);

} catch (Exception e) {
throw new RuntimeException(e);
}
```

## 0.6.0 [2024-03-01]

### Features
Expand Down
56 changes: 28 additions & 28 deletions src/main/java/com/influxdb/v3/client/InfluxDBClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ public interface InfluxDBClient extends AutoCloseable {
/**
* Write a record specified in the InfluxDB Line Protocol to the InfluxDB server.
*
* @param record the record specified in the InfluxDB Line Protocol, can be null
* @param options the options for writing data to InfluxDB
* @param record the record specified in the InfluxDB Line Protocol, can be null
* @param options the options for writing data to InfluxDB
*/
void writeRecord(@Nullable final String record, @Nonnull final WriteOptions options);

Expand All @@ -67,7 +67,7 @@ public interface InfluxDBClient extends AutoCloseable {
/**
* Write records specified in the InfluxDB Line Protocol to the InfluxDB server.
*
* @param records the records specified in the InfluxDB Line Protocol, cannot be null
* @param records the records specified in the InfluxDB Line Protocol, cannot be null
* @param options the options for writing data to InfluxDB
*/
void writeRecords(@Nonnull final List<String> records, @Nonnull final WriteOptions options);
Expand All @@ -82,7 +82,7 @@ public interface InfluxDBClient extends AutoCloseable {
/**
* Write a {@link Point} to the InfluxDB server.
*
* @param point the {@link Point} to write, can be null
* @param point the {@link Point} to write, can be null
* @param options the options for writing data to InfluxDB
*/
void writePoint(@Nullable final Point point, @Nonnull final WriteOptions options);
Expand All @@ -97,7 +97,7 @@ public interface InfluxDBClient extends AutoCloseable {
/**
* Write a list of {@link Point} to the InfluxDB server.
*
* @param points the list of {@link Point} to write, cannot be null
* @param points the list of {@link Point} to write, cannot be null
* @param options the options for writing data to InfluxDB
*/
void writePoints(@Nonnull final List<Point> points, @Nonnull final WriteOptions options);
Expand Down Expand Up @@ -133,7 +133,7 @@ public interface InfluxDBClient extends AutoCloseable {
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
Expand All @@ -152,7 +152,7 @@ public interface InfluxDBClient extends AutoCloseable {
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the query string to execute, cannot be null
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
Expand All @@ -172,9 +172,9 @@ public interface InfluxDBClient extends AutoCloseable {
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Expand All @@ -194,7 +194,7 @@ Stream<Object[]> query(@Nonnull final String query,
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the SQL query string to execute, cannot be null
* @return Batches of PointValues returned by the query
*/
@Nonnull
Expand Down Expand Up @@ -232,7 +232,7 @@ Stream<Object[]> query(@Nonnull final String query,
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the query string to execute, cannot be null
* @param options the options for querying data from InfluxDB
* @return Batches of PointValues returned by the query
*/
Expand All @@ -253,9 +253,9 @@ Stream<Object[]> query(@Nonnull final String query,
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @param options the options for querying data from InfluxDB
* @return Batches of PointValues returned by the query
*/
@Nonnull
Expand Down Expand Up @@ -294,7 +294,7 @@ Stream<PointValues> queryPoints(@Nonnull final String query,
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the SQL query string to execute, cannot be null
* @param parameters query named parameters
* @return Batches of rows returned by the query
*/
Expand All @@ -311,7 +311,7 @@ Stream<PointValues> queryPoints(@Nonnull final String query,
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the query string to execute, cannot be null
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
Expand All @@ -330,9 +330,9 @@ Stream<PointValues> queryPoints(@Nonnull final String query,
* });
* </pre>
*
* @param query the SQL query string to execute, cannot be null
* @param query the query string to execute, cannot be null
* @param parameters query named parameters
* @param options the options for querying data from InfluxDB
* @param options the options for querying data from InfluxDB
* @return Batches of rows returned by the query
*/
@Nonnull
Expand All @@ -344,9 +344,9 @@ Stream<VectorSchemaRoot> queryBatches(@Nonnull final String query,
* Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying
* common operations such as writing, querying.
*
* @param host the URL of the InfluxDB server
* @param token the authentication token for accessing the InfluxDB server, can be null
* @param database the database to be used for InfluxDB operations, can be null
* @param host the URL of the InfluxDB server
* @param token the authentication token for accessing the InfluxDB server, can be null
* @param database the database to be used for InfluxDB operations, can be null
* @return new instance of the {@link InfluxDBClient}
*/
@Nonnull
Expand All @@ -366,9 +366,9 @@ static InfluxDBClient getInstance(@Nonnull final String host,
* Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying
* common operations such as writing, querying.
*
* @param host the URL of the InfluxDB server
* @param token the authentication token for accessing the InfluxDB server, can be null
* @param database the database to be used for InfluxDB operations, can be null
* @param host the URL of the InfluxDB server
* @param token the authentication token for accessing the InfluxDB server, can be null
* @param database the database to be used for InfluxDB operations, can be null
* @param defaultTags tags to be added by default to writes of points
* @return new instance of the {@link InfluxDBClient}
*/
Expand All @@ -379,11 +379,11 @@ static InfluxDBClient getInstance(@Nonnull final String host,
@Nullable Map<String, String> defaultTags) {

ClientConfig config = new ClientConfig.Builder()
.host(host)
.token(token)
.database(database)
.defaultTags(defaultTags)
.build();
.host(host)
.token(token)
.database(database)
.defaultTags(defaultTags)
.build();

return getInstance(config);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public String getDatabase() {

/**
* Gets the default precision to use for the timestamp of points.
* If no precision is specified then 'ns' is used.
*
* @return the default precision to use for the timestamp of points, may be null
*/
Expand Down
31 changes: 22 additions & 9 deletions src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@

final class FlightSqlClient implements AutoCloseable {

private final HeaderCallOption headers;
private final FlightClient client;

private final Map<String, String> defaultHeaders = new HashMap<>();
private final ObjectMapper objectMapper = new ObjectMapper();

FlightSqlClient(@Nonnull final ClientConfig config) {
Expand All @@ -73,25 +73,22 @@ final class FlightSqlClient implements AutoCloseable {
FlightSqlClient(@Nonnull final ClientConfig config, @Nullable final FlightClient client) {
Arguments.checkNotNull(config, "config");

MetadataAdapter metadata = new MetadataAdapter(new Metadata());
if (config.getToken() != null && config.getToken().length > 0) {
metadata.insert("Authorization", "Bearer " + new String(config.getToken()));
defaultHeaders.put("Authorization", "Bearer " + new String(config.getToken()));
}
if (config.getHeaders() != null) {
for (Map.Entry<String, String> entry : config.getHeaders().entrySet()) {
metadata.insert(entry.getKey(), entry.getValue());
}
defaultHeaders.putAll(config.getHeaders());
}

this.headers = new HeaderCallOption(metadata);
this.client = (client != null) ? client : createFlightClient(config);
}

@Nonnull
Stream<VectorSchemaRoot> execute(@Nonnull final String query,
@Nonnull final String database,
@Nonnull final QueryType queryType,
@Nonnull final Map<String, Object> queryParameters) {
@Nonnull final Map<String, Object> queryParameters,
@Nonnull final Map<String, String> headers) {

Map<String, Object> ticketData = new HashMap<>() {{
put("database", database);
Expand All @@ -110,8 +107,9 @@ Stream<VectorSchemaRoot> execute(@Nonnull final String query,
throw new RuntimeException(e);
}

HeaderCallOption headerCallOption = metadataHeader(headers);
Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8));
FlightStream stream = client.getStream(ticket, headers);
FlightStream stream = client.getStream(ticket, headerCallOption);
FlightSqlIterator iterator = new FlightSqlIterator(stream);

Spliterator<VectorSchemaRoot> spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL);
Expand Down Expand Up @@ -148,6 +146,21 @@ private Location createLocation(@Nonnull final ClientConfig config) {
}
}

@Nonnull
private HeaderCallOption metadataHeader(@Nonnull final Map<String, String> requestHeaders) {
MetadataAdapter metadata = new MetadataAdapter(new Metadata());
for (Map.Entry<String, String> entry : requestHeaders.entrySet()) {
metadata.insert(entry.getKey(), entry.getValue());
}

for (Map.Entry<String, String> entry : defaultHeaders.entrySet()) {
if (!metadata.containsKey(entry.getKey())) {
metadata.insert(entry.getKey(), entry.getValue());
}
}
return new HeaderCallOption(metadata);
}

private static final class FlightSqlIterator implements Iterator<VectorSchemaRoot>, AutoCloseable {

private final List<AutoCloseable> autoCloseable = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,26 @@ public final class InfluxDBClientImpl implements InfluxDBClient {
* @param config the client config.
*/
public InfluxDBClientImpl(@Nonnull final ClientConfig config) {
this(config, null, null);
}

/**
* Constructor for testing purposes.
*
* @param config the client config
* @param restClient the rest client, if null a new client will be created
* @param flightSqlClient the flight sql client, if null a new client will be created
*/
InfluxDBClientImpl(@Nonnull final ClientConfig config,
@Nullable final RestClient restClient,
@Nullable final FlightSqlClient flightSqlClient) {
Arguments.checkNotNull(config, "config");

config.validate();

this.config = config;
this.restClient = new RestClient(config);
this.flightSqlClient = new FlightSqlClient(config);
this.restClient = restClient != null ? restClient : new RestClient(config);
this.flightSqlClient = flightSqlClient != null ? flightSqlClient : new FlightSqlClient(config);
}

@Override
Expand Down Expand Up @@ -305,8 +318,9 @@ private <T> void writeData(@Nonnull final List<T> data, @Nonnull final WriteOpti
throw new InfluxDBApiException(e);
}
}
headers.putAll(options.headersSafe());

restClient.request("api/v2/write", HttpMethod.POST, body, headers, queryParams);
restClient.request("api/v2/write", HttpMethod.POST, body, queryParams, headers);
}

@Nonnull
Expand Down Expand Up @@ -334,7 +348,7 @@ private Stream<VectorSchemaRoot> queryData(@Nonnull final String query,
}
});

return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters);
return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters, options.headersSafe());
}

@Nonnull
Expand Down
16 changes: 9 additions & 7 deletions src/main/java/com/influxdb/v3/client/internal/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public void checkServerTrusted(
void request(@Nonnull final String path,
@Nonnull final HttpMethod method,
@Nullable final byte[] data,
@Nullable final Map<String, String> headers,
@Nullable final Map<String, String> queryParams) {
@Nullable final Map<String, String> queryParams,
@Nullable final Map<String, String> headers) {

QueryStringEncoder uriEncoder = new QueryStringEncoder(String.format("%s%s", baseUrl, path));
if (queryParams != null) {
Expand All @@ -148,16 +148,18 @@ void request(@Nonnull final String path,
? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofByteArray(data));

// headers
if (defaultHeaders != null) {
for (Map.Entry<String, String> entry : defaultHeaders.entrySet()) {
request.header(entry.getKey(), entry.getValue());
}
}
if (headers != null) {
for (Map.Entry<String, String> entry : headers.entrySet()) {
request.header(entry.getKey(), entry.getValue());
}
}
if (defaultHeaders != null) {
for (Map.Entry<String, String> entry : defaultHeaders.entrySet()) {
if (headers == null || !headers.containsKey(entry.getKey())) {
request.header(entry.getKey(), entry.getValue());
}
}
}
request.header("User-Agent", userAgent);
if (config.getToken() != null && config.getToken().length > 0) {
request.header("Authorization", String.format("Token %s", new String(config.getToken())));
Expand Down
Loading

0 comments on commit aa5509b

Please sign in to comment.