Skip to content

Commit

Permalink
Deprecated ClusterService and Using NodeClient to fetch meta data (op…
Browse files Browse the repository at this point in the history
  • Loading branch information
penghuo authored Aug 18, 2022
1 parent 808813d commit 0576d96
Show file tree
Hide file tree
Showing 8 changed files with 170 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
package org.opensearch.sql.legacy.plugin;

import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.expression.config.ExpressionConfig;
Expand All @@ -34,8 +33,6 @@
@Configuration
@Import({ExpressionConfig.class})
public class OpenSearchSQLPluginConfig {
@Autowired
private ClusterService clusterService;

@Autowired
private NodeClient nodeClient;
Expand All @@ -48,7 +45,7 @@ public class OpenSearchSQLPluginConfig {

@Bean
public OpenSearchClient client() {
return new OpenSearchNodeClient(clusterService, nodeClient);
return new OpenSearchNodeClient(nodeClient);
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.OK;
import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE;
import static org.opensearch.sql.opensearch.executor.Scheduler.schedule;

import com.alibaba.druid.sql.parser.ParserException;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -147,19 +148,27 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient cli

Format format = SqlRequestParam.getFormat(request.params());

// Route request to new query engine if it's supported already
SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(),
sqlRequest.getSql(), request.path(), request.params());
RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client);
if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) {
LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId());
return result;
}
LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine",
LogUtils.getRequestId(), newSqlRequest);

final QueryAction queryAction = explainRequest(client, sqlRequest, format);
return channel -> executeSqlRequest(request, queryAction, client, channel);
return channel -> schedule(client, () -> {
try {
// Route request to new query engine if it's supported already
SQLQueryRequest newSqlRequest = new SQLQueryRequest(sqlRequest.getJsonContent(),
sqlRequest.getSql(), request.path(), request.params());
RestChannelConsumer result = newSqlQueryHandler.prepareRequest(newSqlRequest, client);
if (result != RestSQLQueryAction.NOT_SUPPORTED_YET) {
LOG.info("[{}] Request is handled by new SQL query engine", LogUtils.getRequestId());
result.accept(channel);
} else {
LOG.debug("[{}] Request {} is not supported and falling back to old SQL engine",
LogUtils.getRequestId(), newSqlRequest);

QueryAction queryAction = explainRequest(client, sqlRequest, format);
executeSqlRequest(request, queryAction, client, channel);
}
} catch (Exception e) {
logAndPublishMetrics(e);
reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
}
});
} catch (Exception e) {
logAndPublishMetrics(e);
return channel -> reportError(channel, e, isClientError(e) ? BAD_REQUEST : SERVICE_UNAVAILABLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

package org.opensearch.sql.opensearch.client;

import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import com.google.common.collect.Streams;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
Expand All @@ -18,45 +17,31 @@
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.logging.log4j.ThreadContext;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.metadata.AliasMetadata;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MappingMetadata;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.collect.ImmutableOpenMap;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.sql.opensearch.mapping.IndexMapping;
import org.opensearch.sql.opensearch.request.OpenSearchRequest;
import org.opensearch.sql.opensearch.response.OpenSearchResponse;
import org.opensearch.threadpool.ThreadPool;

/** OpenSearch connection by node client. */
public class OpenSearchNodeClient implements OpenSearchClient {

public static final Function<String, Predicate<String>> ALL_FIELDS =
(anyIndex -> (anyField -> true));

/** Current cluster state on local node. */
private final ClusterService clusterService;

/** Node client provided by OpenSearch container. */
private final NodeClient client;

/** Index name expression resolver to get concrete index name. */
private final IndexNameExpressionResolver resolver;

private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

/**
* Constructor of ElasticsearchNodeClient.
*/
public OpenSearchNodeClient(ClusterService clusterService,
NodeClient client) {
this.clusterService = clusterService;
public OpenSearchNodeClient(NodeClient client) {
this.client = client;
this.resolver = new IndexNameExpressionResolver(client.threadPool().getThreadContext());
}
Expand All @@ -75,12 +60,14 @@ public OpenSearchNodeClient(ClusterService clusterService,
@Override
public Map<String, IndexMapping> getIndexMappings(String... indexExpression) {
try {
ClusterState state = clusterService.state();
String[] concreteIndices = resolveIndexExpression(state, indexExpression);

return populateIndexMappings(
state.metadata().findMappings(concreteIndices, ALL_FIELDS));
} catch (IOException e) {
GetMappingsResponse mappingsResponse = client.admin().indices()
.prepareGetMappings(indexExpression)
.setLocal(true)
.get();
return Streams.stream(mappingsResponse.mappings().iterator())
.collect(Collectors.toMap(cursor -> cursor.key,
cursor -> new IndexMapping(cursor.value)));
} catch (Exception e) {
throw new IllegalStateException(
"Failed to read mapping in cluster state for index pattern [" + indexExpression + "]", e);
}
Expand Down Expand Up @@ -123,9 +110,8 @@ public List<String> indices() {
*/
@Override
public Map<String, String> meta() {
final ImmutableMap.Builder<String, String> builder = new ImmutableMap.Builder<>();
builder.put(META_CLUSTER_NAME, clusterService.getClusterName().value());
return builder.build();
return ImmutableMap.of(META_CLUSTER_NAME,
client.settings().get("cluster.name", "opensearch"));
}

@Override
Expand All @@ -135,40 +121,12 @@ public void cleanup(OpenSearchRequest request) {

@Override
public void schedule(Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(
withCurrentContext(task),
new TimeValue(0),
SQL_WORKER_THREAD_POOL_NAME
);
// at that time, task already running the sql-worker ThreadPool.
task.run();
}

@Override
public NodeClient getNodeClient() {
return client;
}

private String[] resolveIndexExpression(ClusterState state, String[] indices) {
return resolver.concreteIndexNames(state, IndicesOptions.strictExpandOpen(), true, indices);
}

private Map<String, IndexMapping> populateIndexMappings(
ImmutableOpenMap<String, MappingMetadata> indexMappings) {

ImmutableMap.Builder<String, IndexMapping> result = ImmutableMap.builder();
for (ObjectObjectCursor<String, MappingMetadata> cursor:
indexMappings) {
result.put(cursor.key, new IndexMapping(cursor.value));
}
return result.build();
}

/** Copy from LogUtils. */
private static Runnable withCurrentContext(final Runnable task) {
final Map<String, String> currentContext = ThreadContext.getImmutableContext();
return () -> {
ThreadContext.putAll(currentContext);
task.run();
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.opensearch.executor;

import java.util.Map;
import lombok.experimental.UtilityClass;
import org.apache.logging.log4j.ThreadContext;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.ThreadPool;

/** The scheduler which schedule the task run in sql-worker thread pool. */
@UtilityClass
public class Scheduler {

public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker";

public static void schedule(NodeClient client, Runnable task) {
ThreadPool threadPool = client.threadPool();
threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME);
}

private static Runnable withCurrentContext(final Runnable task) {
final Map<String, String> currentContext = ThreadContext.getImmutableContext();
return () -> {
ThreadContext.putAll(currentContext);
task.run();
};
}
}
Loading

0 comments on commit 0576d96

Please sign in to comment.