Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for passthrough Elasticsearch queries #15900

Merged
merged 1 commit into from
Apr 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions presto-docs/src/main/sphinx/connector/elasticsearch.rst
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,28 @@ as part of the table name, separated by a colon. For example:
.. _full text query: https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#query-string-syntax


Pass-through Queries
--------------------

The Elasticsearch connector allows you to embed any valid Elasticsearch query,
that uses the `Elasticsearch Query DSL
<https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl.html>`_
in your SQL query.

The results can then be used in any SQL statement, wrapping the Elasticsearch
query. The syntax extends the syntax of the enhanced Elasticsearch table names
with the following::

SELECT * FROM es.default."<index>$query:<es-query>"

The Elasticsearch query string ``es-query`` is base32-encoded to avoid having to
deal with escaping quotes and case sensitivity issues in table identifiers.

The result of these query tables is a table with a single row and a single
column named ``result`` of type VARCHAR. It contains the JSON payload returned
by Elasticsearch, and can be processed with the :doc:`built-in JSON functions
</functions/json>`.

AWS Authorization
-----------------

Expand Down
12 changes: 12 additions & 0 deletions presto-elasticsearch/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,19 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>

<!-- for testing -->
<dependency>
<groupId>org.jetbrains</groupId>
<artifactId>annotations</artifactId>
<version>19.0.0</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-client</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.elasticsearch;

import com.facebook.airlift.json.JsonObjectMapperProvider;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.RowType;
import com.facebook.presto.common.type.StandardTypes;
Expand All @@ -33,11 +34,15 @@
import com.facebook.presto.spi.ConnectorTableLayoutResult;
import com.facebook.presto.spi.ConnectorTableMetadata;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.SchemaTablePrefix;
import com.facebook.presto.spi.connector.ConnectorMetadata;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;

import javax.inject.Inject;

Expand All @@ -58,14 +63,25 @@
import static com.facebook.presto.common.type.TinyintType.TINYINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.common.type.VarcharType.VARCHAR;
import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.QUERY;
import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.SCAN;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static java.lang.String.format;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public class ElasticsearchMetadata
implements ConnectorMetadata
{
private static final ObjectMapper JSON_PARSER = new JsonObjectMapperProvider().get();

private static final String PASSTHROUGH_QUERY_SUFFIX = "$query";
private final Map<String, ColumnHandle> queryTableColumns;
private final ColumnMetadata queryResultColumnMetadata;

private final ElasticsearchClient client;
private final String schemaName;
private final Type ipAddressType;
Expand All @@ -76,7 +92,18 @@ public ElasticsearchMetadata(TypeManager typeManager, ElasticsearchClient client
requireNonNull(config, "config is null");
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
this.client = requireNonNull(client, "client is null");
requireNonNull(config, "config is null");
this.schemaName = config.getDefaultSchema();

Type jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
queryResultColumnMetadata = ColumnMetadata.builder()
.setName("result")
.setType(jsonType)
.setNullable(true)
.setHidden(false)
.build();

queryTableColumns = ImmutableMap.of("result", new ElasticsearchColumnHandle("result", jsonType, false));
}

@Override
Expand All @@ -93,12 +120,37 @@ public ElasticsearchTableHandle getTableHandle(ConnectorSession session, SchemaT
String[] parts = tableName.getTableName().split(":", 2);
String table = parts[0];
Optional<String> query = Optional.empty();
ElasticsearchTableHandle.Type type = SCAN;
if (parts.length == 2) {
query = Optional.of(parts[1]);
if (table.endsWith(PASSTHROUGH_QUERY_SUFFIX)) {
table = table.substring(0, table.length() - PASSTHROUGH_QUERY_SUFFIX.length());
byte[] decoded;
try {
decoded = BaseEncoding.base32().decode(parts[1].toUpperCase(ENGLISH));
}
catch (IllegalArgumentException e) {
throw new PrestoException(INVALID_ARGUMENTS, format("Elasticsearch query for '%s' is not base32-encoded correctly", table), e);
}

String queryJson = new String(decoded, UTF_8);
try {
// Ensure this is valid json
JSON_PARSER.readTree(queryJson);
}
catch (JsonProcessingException e) {
throw new PrestoException(INVALID_ARGUMENTS, format("Elasticsearch query for '%s' is not valid JSON", table), e);
}

query = Optional.of(queryJson);
type = QUERY;
}
else {
query = Optional.of(parts[1]);
}
}

if (listTables(session, Optional.of(schemaName)).contains(new SchemaTableName(schemaName, table))) {
return new ElasticsearchTableHandle(schemaName, table, query);
return new ElasticsearchTableHandle(type, schemaName, table, query);
}
}
return null;
Expand All @@ -122,6 +174,12 @@ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTa
public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
{
ElasticsearchTableHandle handle = (ElasticsearchTableHandle) table;

if (isPassthroughQuery(handle)) {
return new ConnectorTableMetadata(
new SchemaTableName(handle.getSchema(), handle.getIndex()),
ImmutableList.of(queryResultColumnMetadata));
}
return getTableMetadata(handle.getSchema(), handle.getIndex());
}

Expand Down Expand Up @@ -295,15 +353,39 @@ public List<SchemaTableName> listTables(ConnectorSession session, Optional<Strin
@Override
public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
{
ElasticsearchTableHandle table = (ElasticsearchTableHandle) tableHandle;

if (isPassthroughQuery(table)) {
return queryTableColumns;
}

InternalTableMetadata tableMetadata = makeInternalTableMetadata(tableHandle);
return tableMetadata.getColumnHandles();
}

@Override
public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
{
ElasticsearchColumnHandle handle = (ElasticsearchColumnHandle) columnHandle;
return new ColumnMetadata(handle.getName(), handle.getType());
ElasticsearchTableHandle table = (ElasticsearchTableHandle) tableHandle;
ElasticsearchColumnHandle column = (ElasticsearchColumnHandle) columnHandle;

if (isPassthroughQuery(table)) {
if (column.getName().equals(queryResultColumnMetadata.getName())) {
return queryResultColumnMetadata;
}

throw new IllegalArgumentException(format("Unexpected column for table '%s$query': %s", table.getIndex(), column.getName()));
}

return ColumnMetadata.builder()
.setName(column.getName())
.setType(column.getType())
.build();
}

private static boolean isPassthroughQuery(ElasticsearchTableHandle table)
{
return table.getType().equals(QUERY);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@
*/
package com.facebook.presto.elasticsearch;

import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.elasticsearch.client.ElasticsearchClient;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
Expand All @@ -27,18 +31,21 @@

import java.util.List;

import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.QUERY;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

public class ElasticsearchPageSourceProvider
implements ConnectorPageSourceProvider
{
private final ElasticsearchClient client;
private final Type jsonType;

@Inject
public ElasticsearchPageSourceProvider(ElasticsearchClient client)
public ElasticsearchPageSourceProvider(ElasticsearchClient client, TypeManager typeManager)
{
this.client = requireNonNull(client, "client is null");
this.jsonType = typeManager.getType(new TypeSignature(StandardTypes.JSON));
}

@Override
Expand All @@ -55,6 +62,10 @@ public ConnectorPageSource createPageSource(
ElasticsearchTableLayoutHandle layoutHandle = (ElasticsearchTableLayoutHandle) layout;
ElasticsearchSplit elasticsearchSplit = (ElasticsearchSplit) split;

if (layoutHandle.getTable().getType().equals(QUERY)) {
return new PassthroughQueryPageSource(client, layoutHandle.getTable(), jsonType);
}

if (columns.isEmpty()) {
return new CountQueryPageSource(client, session, layoutHandle.getTable(), elasticsearchSplit);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import com.facebook.presto.spi.FixedSplitSource;
import com.facebook.presto.spi.connector.ConnectorSplitManager;
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.elasticsearch.ElasticsearchTableHandle.Type.QUERY;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;

Expand All @@ -49,10 +52,14 @@ public ConnectorSplitSource getSplits(
ElasticsearchTableLayoutHandle layoutHandle = (ElasticsearchTableLayoutHandle) layout;
ElasticsearchTableHandle tableHandle = layoutHandle.getTable();

List<ElasticsearchSplit> splits = client.getSearchShards(tableHandle.getIndex()).stream()
.map(shard -> new ElasticsearchSplit(shard.getIndex(), shard.getId(), layoutHandle.getTupleDomain(), shard.getAddress()))
.collect(toImmutableList());

return new FixedSplitSource(splits);
if (tableHandle.getType().equals(QUERY)) {
return new FixedSplitSource(ImmutableList.of(new ElasticsearchSplit(tableHandle.getIndex(), 0, layoutHandle.getTupleDomain(), Optional.empty())));
}
else {
List<ElasticsearchSplit> splits = client.getSearchShards(tableHandle.getIndex()).stream()
.map(shard -> new ElasticsearchSplit(shard.getIndex(), shard.getId(), layoutHandle.getTupleDomain(), shard.getAddress()))
.collect(toImmutableList());
return new FixedSplitSource(splits);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,36 @@
public final class ElasticsearchTableHandle
implements ConnectorTableHandle
{
public enum Type
{
SCAN,
QUERY
}

private final Type type;
private final String schema;
private final String index;
private final Optional<String> query;

@JsonCreator
public ElasticsearchTableHandle(
@JsonProperty("type") Type type,
@JsonProperty("schema") String schema,
@JsonProperty("index") String index,
@JsonProperty("query") Optional<String> query)
{
this.type = requireNonNull(type, "type is null");
this.schema = requireNonNull(schema, "schema is null");
this.index = requireNonNull(index, "index is null");
this.query = requireNonNull(query, "query is null");
}

@JsonProperty
public Type getType()
{
return type;
}

@JsonProperty
public String getIndex()
{
Expand All @@ -62,7 +77,7 @@ public Optional<String> getQuery()
@Override
public int hashCode()
{
return Objects.hash(schema, index, query);
return Objects.hash(type, schema, index, query);
}

@Override
Expand All @@ -76,7 +91,8 @@ public boolean equals(Object obj)
}

ElasticsearchTableHandle other = (ElasticsearchTableHandle) obj;
return Objects.equals(this.getSchema(), other.getSchema()) &&
return Objects.equals(this.type, other.getType()) &&
Objects.equals(this.getSchema(), other.getSchema()) &&
Objects.equals(this.getIndex(), other.getIndex()) &&
Objects.equals(this.getQuery(), other.getQuery());
}
Expand All @@ -85,6 +101,7 @@ public boolean equals(Object obj)
public String toString()
{
return toStringHelper(this)
.add("type", getType())
.add("schema", getSchema())
.add("index", getIndex())
.add("query", getQuery())
Expand Down
Loading