From c55a7dbaeb28f4f3251f3286981559da5b0e32fd Mon Sep 17 00:00:00 2001 From: Jakub Bednar Date: Wed, 6 Mar 2024 14:58:57 +0100 Subject: [PATCH] feat: custom headers can be specified per request (query/write) --- .../influxdb/v3/client/InfluxDBClient.java | 56 +++++------ .../v3/client/config/ClientConfig.java | 1 + .../v3/client/internal/FlightSqlClient.java | 31 ++++-- .../client/internal/InfluxDBClientImpl.java | 22 ++++- .../v3/client/internal/RestClient.java | 15 +-- .../v3/client/query/QueryOptions.java | 39 +++++++- .../v3/client/write/WriteOptions.java | 96 +++++++++++++------ .../client/internal/FlightSqlClientTest.java | 91 +++++++++++++++++- .../v3/client/internal/RestClientTest.java | 63 ++++++++++++ 9 files changed, 334 insertions(+), 80 deletions(-) diff --git a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java index a02ed54..1f9d1d2 100644 --- a/src/main/java/com/influxdb/v3/client/InfluxDBClient.java +++ b/src/main/java/com/influxdb/v3/client/InfluxDBClient.java @@ -52,8 +52,8 @@ public interface InfluxDBClient extends AutoCloseable { /** * Write a record specified in the InfluxDB Line Protocol to the InfluxDB server. * - * @param record the record specified in the InfluxDB Line Protocol, can be null - * @param options the options for writing data to InfluxDB + * @param record the record specified in the InfluxDB Line Protocol, can be null + * @param options the options for writing data to InfluxDB */ void writeRecord(@Nullable final String record, @Nonnull final WriteOptions options); @@ -67,7 +67,7 @@ public interface InfluxDBClient extends AutoCloseable { /** * Write records specified in the InfluxDB Line Protocol to the InfluxDB server. * - * @param records the records specified in the InfluxDB Line Protocol, cannot be null + * @param records the records specified in the InfluxDB Line Protocol, cannot be null * @param options the options for writing data to InfluxDB */ void writeRecords(@Nonnull final List records, @Nonnull final WriteOptions options); @@ -82,7 +82,7 @@ public interface InfluxDBClient extends AutoCloseable { /** * Write a {@link Point} to the InfluxDB server. * - * @param point the {@link Point} to write, can be null + * @param point the {@link Point} to write, can be null * @param options the options for writing data to InfluxDB */ void writePoint(@Nullable final Point point, @Nonnull final WriteOptions options); @@ -97,7 +97,7 @@ public interface InfluxDBClient extends AutoCloseable { /** * Write a list of {@link Point} to the InfluxDB server. * - * @param points the list of {@link Point} to write, cannot be null + * @param points the list of {@link Point} to write, cannot be null * @param options the options for writing data to InfluxDB */ void writePoints(@Nonnull final List points, @Nonnull final WriteOptions options); @@ -133,7 +133,7 @@ public interface InfluxDBClient extends AutoCloseable { * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the SQL query string to execute, cannot be null * @param parameters query named parameters * @return Batches of rows returned by the query */ @@ -152,7 +152,7 @@ public interface InfluxDBClient extends AutoCloseable { * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the query string to execute, cannot be null * @param options the options for querying data from InfluxDB * @return Batches of rows returned by the query */ @@ -172,9 +172,9 @@ public interface InfluxDBClient extends AutoCloseable { * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the query string to execute, cannot be null * @param parameters query named parameters - * @param options the options for querying data from InfluxDB + * @param options the options for querying data from InfluxDB * @return Batches of rows returned by the query */ @Nonnull @@ -194,7 +194,7 @@ Stream query(@Nonnull final String query, * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the SQL query string to execute, cannot be null * @return Batches of PointValues returned by the query */ @Nonnull @@ -232,7 +232,7 @@ Stream query(@Nonnull final String query, * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the query string to execute, cannot be null * @param options the options for querying data from InfluxDB * @return Batches of PointValues returned by the query */ @@ -253,9 +253,9 @@ Stream query(@Nonnull final String query, * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the query string to execute, cannot be null * @param parameters query named parameters - * @param options the options for querying data from InfluxDB + * @param options the options for querying data from InfluxDB * @return Batches of PointValues returned by the query */ @Nonnull @@ -294,7 +294,7 @@ Stream queryPoints(@Nonnull final String query, * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the SQL query string to execute, cannot be null * @param parameters query named parameters * @return Batches of rows returned by the query */ @@ -311,7 +311,7 @@ Stream queryPoints(@Nonnull final String query, * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the query string to execute, cannot be null * @param options the options for querying data from InfluxDB * @return Batches of rows returned by the query */ @@ -330,9 +330,9 @@ Stream queryPoints(@Nonnull final String query, * }); * * - * @param query the SQL query string to execute, cannot be null + * @param query the query string to execute, cannot be null * @param parameters query named parameters - * @param options the options for querying data from InfluxDB + * @param options the options for querying data from InfluxDB * @return Batches of rows returned by the query */ @Nonnull @@ -344,9 +344,9 @@ Stream queryBatches(@Nonnull final String query, * Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying * common operations such as writing, querying. * - * @param host the URL of the InfluxDB server - * @param token the authentication token for accessing the InfluxDB server, can be null - * @param database the database to be used for InfluxDB operations, can be null + * @param host the URL of the InfluxDB server + * @param token the authentication token for accessing the InfluxDB server, can be null + * @param database the database to be used for InfluxDB operations, can be null * @return new instance of the {@link InfluxDBClient} */ @Nonnull @@ -366,9 +366,9 @@ static InfluxDBClient getInstance(@Nonnull final String host, * Creates a new instance of the {@link InfluxDBClient} for interacting with an InfluxDB server, simplifying * common operations such as writing, querying. * - * @param host the URL of the InfluxDB server - * @param token the authentication token for accessing the InfluxDB server, can be null - * @param database the database to be used for InfluxDB operations, can be null + * @param host the URL of the InfluxDB server + * @param token the authentication token for accessing the InfluxDB server, can be null + * @param database the database to be used for InfluxDB operations, can be null * @param defaultTags tags to be added by default to writes of points * @return new instance of the {@link InfluxDBClient} */ @@ -379,11 +379,11 @@ static InfluxDBClient getInstance(@Nonnull final String host, @Nullable Map defaultTags) { ClientConfig config = new ClientConfig.Builder() - .host(host) - .token(token) - .database(database) - .defaultTags(defaultTags) - .build(); + .host(host) + .token(token) + .database(database) + .defaultTags(defaultTags) + .build(); return getInstance(config); } diff --git a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java index f8313ba..b50dca0 100644 --- a/src/main/java/com/influxdb/v3/client/config/ClientConfig.java +++ b/src/main/java/com/influxdb/v3/client/config/ClientConfig.java @@ -140,6 +140,7 @@ public String getDatabase() { /** * Gets the default precision to use for the timestamp of points. + * If no precision is specified than 'ns' is used. * * @return the default precision to use for the timestamp of points, may be null */ diff --git a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java index 6a2d2b6..bf3d725 100644 --- a/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/FlightSqlClient.java @@ -55,9 +55,9 @@ final class FlightSqlClient implements AutoCloseable { - private final HeaderCallOption headers; private final FlightClient client; + private final Map defaultHeaders = new HashMap<>(); private final ObjectMapper objectMapper = new ObjectMapper(); FlightSqlClient(@Nonnull final ClientConfig config) { @@ -73,17 +73,13 @@ final class FlightSqlClient implements AutoCloseable { FlightSqlClient(@Nonnull final ClientConfig config, @Nullable final FlightClient client) { Arguments.checkNotNull(config, "config"); - MetadataAdapter metadata = new MetadataAdapter(new Metadata()); if (config.getToken() != null && config.getToken().length > 0) { - metadata.insert("Authorization", "Bearer " + new String(config.getToken())); + defaultHeaders.put("Authorization", "Bearer " + new String(config.getToken())); } if (config.getHeaders() != null) { - for (Map.Entry entry : config.getHeaders().entrySet()) { - metadata.insert(entry.getKey(), entry.getValue()); - } + defaultHeaders.putAll(config.getHeaders()); } - this.headers = new HeaderCallOption(metadata); this.client = (client != null) ? client : createFlightClient(config); } @@ -91,7 +87,8 @@ final class FlightSqlClient implements AutoCloseable { Stream execute(@Nonnull final String query, @Nonnull final String database, @Nonnull final QueryType queryType, - @Nonnull final Map queryParameters) { + @Nonnull final Map queryParameters, + @Nonnull final Map headers) { Map ticketData = new HashMap<>() {{ put("database", database); @@ -110,8 +107,9 @@ Stream execute(@Nonnull final String query, throw new RuntimeException(e); } + HeaderCallOption headerCallOption = metadataHeader(headers); Ticket ticket = new Ticket(json.getBytes(StandardCharsets.UTF_8)); - FlightStream stream = client.getStream(ticket, headers); + FlightStream stream = client.getStream(ticket, headerCallOption); FlightSqlIterator iterator = new FlightSqlIterator(stream); Spliterator spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.NONNULL); @@ -148,6 +146,21 @@ private Location createLocation(@Nonnull final ClientConfig config) { } } + @Nonnull + private HeaderCallOption metadataHeader(@Nonnull final Map requestHeaders) { + MetadataAdapter metadata = new MetadataAdapter(new Metadata()); + for (Map.Entry entry : requestHeaders.entrySet()) { + metadata.insert(entry.getKey(), entry.getValue()); + } + + for (Map.Entry entry : defaultHeaders.entrySet()) { + if (!metadata.containsKey(entry.getKey())) { + metadata.insert(entry.getKey(), entry.getValue()); + } + } + return new HeaderCallOption(metadata); + } + private static final class FlightSqlIterator implements Iterator, AutoCloseable { private final List autoCloseable = new ArrayList<>(); diff --git a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java index b64d5e3..f43178b 100644 --- a/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java +++ b/src/main/java/com/influxdb/v3/client/internal/InfluxDBClientImpl.java @@ -87,13 +87,26 @@ public final class InfluxDBClientImpl implements InfluxDBClient { * @param config the client config. */ public InfluxDBClientImpl(@Nonnull final ClientConfig config) { + this(config, null, null); + } + + /** + * Constructor for testing purposes. + * + * @param config the client config + * @param restClient the rest client, if null a new client will be created + * @param flightSqlClient the flight sql client, if null a new client will be created + */ + InfluxDBClientImpl(@Nonnull final ClientConfig config, + @Nullable final RestClient restClient, + @Nullable final FlightSqlClient flightSqlClient) { Arguments.checkNotNull(config, "config"); config.validate(); this.config = config; - this.restClient = new RestClient(config); - this.flightSqlClient = new FlightSqlClient(config); + this.restClient = restClient != null ? restClient : new RestClient(config); + this.flightSqlClient = flightSqlClient != null ? flightSqlClient : new FlightSqlClient(config); } @Override @@ -305,8 +318,9 @@ private void writeData(@Nonnull final List data, @Nonnull final WriteOpti throw new InfluxDBApiException(e); } } + headers.putAll(options.headersSafe()); - restClient.request("api/v2/write", HttpMethod.POST, body, headers, queryParams); + restClient.request("api/v2/write", HttpMethod.POST, body, queryParams, headers); } @Nonnull @@ -334,7 +348,7 @@ private Stream queryData(@Nonnull final String query, } }); - return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters); + return flightSqlClient.execute(query, database, options.queryTypeSafe(), parameters, options.headersSafe()); } @Nonnull diff --git a/src/main/java/com/influxdb/v3/client/internal/RestClient.java b/src/main/java/com/influxdb/v3/client/internal/RestClient.java index 8e6eace..38e2a55 100644 --- a/src/main/java/com/influxdb/v3/client/internal/RestClient.java +++ b/src/main/java/com/influxdb/v3/client/internal/RestClient.java @@ -122,8 +122,7 @@ public void checkServerTrusted( void request(@Nonnull final String path, @Nonnull final HttpMethod method, @Nullable final byte[] data, - @Nullable final Map headers, - @Nullable final Map queryParams) { + @Nullable final Map queryParams, @Nullable final Map headers) { QueryStringEncoder uriEncoder = new QueryStringEncoder(String.format("%s%s", baseUrl, path)); if (queryParams != null) { @@ -148,16 +147,18 @@ void request(@Nonnull final String path, ? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofByteArray(data)); // headers - if (defaultHeaders != null) { - for (Map.Entry entry : defaultHeaders.entrySet()) { - request.header(entry.getKey(), entry.getValue()); - } - } if (headers != null) { for (Map.Entry entry : headers.entrySet()) { request.header(entry.getKey(), entry.getValue()); } } + if (defaultHeaders != null) { + for (Map.Entry entry : defaultHeaders.entrySet()) { + if (headers == null || !headers.containsKey(entry.getKey())) { + request.header(entry.getKey(), entry.getValue()); + } + } + } request.header("User-Agent", userAgent); if (config.getToken() != null && config.getToken().length > 0) { request.header("Authorization", String.format("Token %s", new String(config.getToken()))); diff --git a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java index e7b1625..39f8731 100644 --- a/src/main/java/com/influxdb/v3/client/query/QueryOptions.java +++ b/src/main/java/com/influxdb/v3/client/query/QueryOptions.java @@ -21,6 +21,7 @@ */ package com.influxdb.v3.client.query; +import java.util.Map; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -35,6 +36,7 @@ *
    *
  • database - specifies the database to be used for InfluxDB operations
  • *
  • queryType - specifies the type of query sent to InfluxDB. Default to 'SQL'.
  • + *
  • headers - specifies the headers to be added to query request
  • *
*/ @ThreadSafe @@ -52,9 +54,10 @@ public final class QueryOptions { private final String database; private final QueryType queryType; + private final Map headers; /** - * Construct QueryAPI options. + * Construct QueryAPI options. The query type is set to SQL. * * @param database The database to be used for InfluxDB operations. */ @@ -71,6 +74,16 @@ public QueryOptions(@Nonnull final QueryType queryType) { this(null, queryType); } + /** + * Construct QueryAPI options. The query type is set to SQL. + * + * @param headers The headers to be added to query request. + * The headers specified here are preferred over the headers specified in the client configuration. + */ + public QueryOptions(@Nullable final Map headers) { + this(null, QueryType.SQL, headers); + } + /** * Construct QueryAPI options. * @@ -79,8 +92,24 @@ public QueryOptions(@Nonnull final QueryType queryType) { * @param queryType The type of query sent to InfluxDB. If it is not specified then use {@link QueryType#SQL}. */ public QueryOptions(@Nullable final String database, @Nullable final QueryType queryType) { + this(database, queryType, null); + } + + /** + * Construct QueryAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param queryType The type of query sent to InfluxDB. If it is not specified then use {@link QueryType#SQL}. + * @param headers The headers to be added to query request. + * The headers specified here are preferred over the headers specified in the client configuration. + */ + public QueryOptions(@Nullable final String database, + @Nullable final QueryType queryType, + @Nullable final Map headers) { this.database = database; this.queryType = queryType; + this.headers = headers == null ? Map.of() : headers; } /** @@ -101,6 +130,14 @@ public QueryType queryTypeSafe() { return queryType == null ? QueryType.SQL : queryType; } + /** + * @return The headers to be added to query request, cannot be null. + */ + @Nonnull + public Map headersSafe() { + return headers; + } + private boolean isNotDefined(final String option) { return option == null || option.isEmpty(); } diff --git a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java index 8024ed9..2e72e7d 100644 --- a/src/main/java/com/influxdb/v3/client/write/WriteOptions.java +++ b/src/main/java/com/influxdb/v3/client/write/WriteOptions.java @@ -40,6 +40,7 @@ *
  • organization - specifies the organization to be used for InfluxDB operations
  • *
  • precision - specifies the precision to use for the timestamp of points
  • *
  • defaultTags - specifies tags to be added by default to all write operations using points.
  • + *
  • headers - specifies the headers to be added to write request
  • * */ @ThreadSafe @@ -63,42 +64,78 @@ public final class WriteOptions { private final WritePrecision precision; private final Integer gzipThreshold; private final Map defaultTags; + private final Map headers; /** * Construct WriteAPI options. * - * @param database The database to be used for InfluxDB operations. - * @param precision The precision to use for the timestamp of points. + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. */ - public WriteOptions(@Nonnull final String database, - @Nonnull final WritePrecision precision, - @Nonnull final Integer gzipThreshold) { - this.database = database; - this.precision = precision; - this.gzipThreshold = gzipThreshold; - this.defaultTags = new HashMap<>(); + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold) { + this(database, precision, gzipThreshold, null); } /** * Construct WriteAPI options. * - * @param database The database to be used for InfluxDB operations. - * @param precision The precision to use for the timestamp of points. + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. * @param defaultTags Default tags to be added when writing points. */ - public WriteOptions(@Nonnull final String database, - @Nonnull final WritePrecision precision, - @Nonnull final Integer gzipThreshold, - @Nonnull final Map defaultTags) { + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Map defaultTags) { + this(database, precision, gzipThreshold, defaultTags, null); + } + + /** + * Construct WriteAPI options. + * + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers specified in the client configuration. + */ + public WriteOptions(@Nullable final Map headers) { + this(null, null, null, null, headers); + } + + /** + * Construct WriteAPI options. + * + * @param database The database to be used for InfluxDB operations. + * If it is not specified then use {@link ClientConfig#getDatabase()}. + * @param precision The precision to use for the timestamp of points. + * If it is not specified then use {@link ClientConfig#getWritePrecision()}. + * @param gzipThreshold The threshold for compressing request body. + * If it is not specified then use {@link WriteOptions#DEFAULT_GZIP_THRESHOLD}. + * @param defaultTags Default tags to be added when writing points. + * @param headers The headers to be added to write request. + * The headers specified here are preferred over the headers + * specified in the client configuration. + */ + public WriteOptions(@Nullable final String database, + @Nullable final WritePrecision precision, + @Nullable final Integer gzipThreshold, + @Nullable final Map defaultTags, + @Nullable final Map headers) { this.database = database; this.precision = precision; this.gzipThreshold = gzipThreshold; - this.defaultTags = defaultTags; + this.defaultTags = defaultTags == null ? Map.of() : defaultTags; + this.headers = headers == null ? Map.of() : headers; } - /** * @param config with default value * @return The destination database for writes. @@ -128,11 +165,11 @@ public WritePrecision precisionSafe(@Nonnull final ClientConfig config) { public Map defaultTagsSafe(@Nonnull final ClientConfig config) { Arguments.checkNotNull(config, "config"); return defaultTags.isEmpty() - ? (config.getDefaultTags() != null - ? config.getDefaultTags() - : defaultTags - ) - : defaultTags; + ? (config.getDefaultTags() != null + ? config.getDefaultTags() + : defaultTags + ) + : defaultTags; } /** @@ -146,6 +183,14 @@ public Integer gzipThresholdSafe(@Nonnull final ClientConfig config) { : (config.getWritePrecision() != null ? config.getGzipThreshold() : DEFAULT_GZIP_THRESHOLD); } + /** + * @return The headers to be added to write request. + */ + @Nonnull + public Map headersSafe() { + return headers; + } + @Override public boolean equals(final Object o) { if (this == o) { @@ -245,9 +290,6 @@ public WriteOptions build() { } private WriteOptions(@Nonnull final Builder builder) { - this.database = builder.database; - this.precision = builder.precision; - this.gzipThreshold = builder.gzipThreshold; - this.defaultTags = builder.defaultTags; + this(builder.database, builder.precision, builder.gzipThreshold, builder.defaultTags); } -} \ No newline at end of file +} diff --git a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java index 1f95c44..3c1c148 100644 --- a/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/FlightSqlClientTest.java @@ -39,7 +39,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.query.QueryOptions; import com.influxdb.v3.client.query.QueryType; public class FlightSqlClientTest { @@ -107,7 +109,7 @@ public void callHeaders() throws Exception { try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) { - flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of()); + flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of(), Map.of()); final CallHeaders incomingHeaders = callHeadersMiddleware.headers; @@ -127,7 +129,7 @@ public void callHeadersWithoutToken() throws Exception { try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) { - flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of()); + flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of(), Map.of()); final CallHeaders incomingHeaders = callHeadersMiddleware.headers; @@ -145,7 +147,7 @@ public void callHeadersEmptyToken() throws Exception { try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) { - flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of()); + flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of(), Map.of()); final CallHeaders incomingHeaders = callHeadersMiddleware.headers; @@ -164,7 +166,7 @@ public void callHeadersCustomHeader() throws Exception { try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) { - flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of()); + flightSqlClient.execute("select * from cpu", "mydb", QueryType.SQL, Map.of(), Map.of()); final CallHeaders incomingHeaders = callHeadersMiddleware.headers; @@ -177,6 +179,87 @@ public void callHeadersCustomHeader() throws Exception { } } + @Test + public void customHeaderForRequest() throws Exception { + ClientConfig clientConfig = new ClientConfig.Builder() + .host(serverLocation) + .token("my-token".toCharArray()) + .headers(Map.of("X-Tracing-Id", "123")) + .build(); + + try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) { + + flightSqlClient.execute( + "select * from cpu", + "mydb", + QueryType.SQL, + Map.of(), + Map.of("X-Invoice-Id", "456")); + + final CallHeaders incomingHeaders = callHeadersMiddleware.headers; + + Assertions.assertThat(incomingHeaders.keys()).containsOnly( + "authorization", + "x-tracing-id", + "x-invoice-id", + GrpcUtil.MESSAGE_ACCEPT_ENCODING + ); + Assertions.assertThat(incomingHeaders.get("X-Tracing-Id")).isEqualTo("123"); + } + } + + @Test + public void customHeaderForRequestOverrideConfig() throws Exception { + ClientConfig clientConfig = new ClientConfig.Builder() + .host(serverLocation) + .token("my-token".toCharArray()) + .headers(Map.of("X-Tracing-Id", "123")) + .build(); + + try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client)) { + + flightSqlClient.execute( + "select * from cpu", + "mydb", + QueryType.SQL, + Map.of(), + Map.of("X-Tracing-Id", "456")); + + final CallHeaders incomingHeaders = callHeadersMiddleware.headers; + + Assertions.assertThat(incomingHeaders.keys()).containsOnly( + "authorization", + "x-tracing-id", + GrpcUtil.MESSAGE_ACCEPT_ENCODING + ); + Assertions.assertThat(incomingHeaders.get("X-Tracing-Id")).isEqualTo("456"); + } + } + + @Test + public void useParamsFromQueryConfig() throws Exception { + ClientConfig clientConfig = new ClientConfig.Builder() + .host(serverLocation) + .token("my-token".toCharArray()) + .database("mydb") + .build(); + + try (FlightSqlClient flightSqlClient = new FlightSqlClient(clientConfig, client); + InfluxDBClient influxDBClient = new InfluxDBClientImpl(clientConfig, null, flightSqlClient)) { + + influxDBClient.query("select * from cpu", new QueryOptions(Map.of("X-Tracing-Id", "987"))); + + final CallHeaders incomingHeaders = callHeadersMiddleware.headers; + + Assertions.assertThat(incomingHeaders.keys()).containsOnly( + "authorization", + "x-tracing-id", + GrpcUtil.MESSAGE_ACCEPT_ENCODING + ); + Assertions.assertThat(incomingHeaders.get("X-Tracing-Id")).isEqualTo("987"); + } + } + static class CallHeadersMiddleware implements FlightClientMiddleware.Factory { CallHeaders headers; diff --git a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java index 08d4a56..2340247 100644 --- a/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java +++ b/src/test/java/com/influxdb/v3/client/internal/RestClientTest.java @@ -39,7 +39,9 @@ import com.influxdb.v3.client.AbstractMockServerTest; import com.influxdb.v3.client.InfluxDBApiException; +import com.influxdb.v3.client.InfluxDBClient; import com.influxdb.v3.client.config.ClientConfig; +import com.influxdb.v3.client.write.WriteOptions; public class RestClientTest extends AbstractMockServerTest { @@ -158,6 +160,67 @@ public void customHeader() throws InterruptedException { Assertions.assertThat(authorization).isEqualTo("ab-01"); } + @Test + public void customHeaderRequest() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .token("my-token".toCharArray()) + .headers(Map.of("X-device", "ab-01")) + .build()); + + restClient.request("ping", HttpMethod.GET, null, null, Map.of("X-Request-Trace-Id", "123")); + + RecordedRequest recordedRequest = mockServer.takeRequest(); + + String header = recordedRequest.getHeader("X-device"); + Assertions.assertThat(header).isEqualTo("ab-01"); + header = recordedRequest.getHeader("X-Request-Trace-Id"); + Assertions.assertThat(header).isEqualTo("123"); + } + + @Test + public void useCustomHeaderFromRequest() throws InterruptedException { + mockServer.enqueue(createResponse(200)); + + restClient = new RestClient(new ClientConfig.Builder() + .host(baseURL) + .token("my-token".toCharArray()) + .headers(Map.of("X-device", "ab-01")) + .build()); + + restClient.request("ping", HttpMethod.GET, null, null, Map.of("X-device", "ab-02")); + + RecordedRequest recordedRequest = mockServer.takeRequest(); + + String header = recordedRequest.getHeader("X-device"); + Assertions.assertThat(header).isEqualTo("ab-02"); + } + + @Test + public void useParamsFromWriteConfig() throws Exception { + + ClientConfig config = new ClientConfig.Builder() + .host(baseURL) + .token("my-token".toCharArray()) + .database("my-database") + .build(); + + mockServer.enqueue(createResponse(200)); + + try (RestClient restClient = new RestClient(config); + InfluxDBClient client = new InfluxDBClientImpl(config, restClient, null)) { + + client.writeRecord("mem,tag=one value=1.0", new WriteOptions(Map.of("X-Tracing-Id", "852"))); + } + + RecordedRequest recordedRequest = mockServer.takeRequest(); + + String header = recordedRequest.getHeader("X-Tracing-Id"); + Assertions.assertThat(header).isEqualTo("852"); + } + @Test public void uri() throws InterruptedException { mockServer.enqueue(createResponse(200));