From e6b6cee8cb485cbbf55fca9f44b9ec66a3082bc0 Mon Sep 17 00:00:00 2001 From: Zhongnan Su Date: Tue, 10 May 2022 02:28:16 -0700 Subject: [PATCH] address comments Signed-off-by: Zhongnan Su --- plugin/build.gradle | 2 +- .../org/opensearch/sql/plugin/SQLPlugin.java | 22 ++++--- .../transport/TransportPPLQueryAction.java | 58 +++++++++++++++++++ ...ryHelper.java => TransportPPLService.java} | 38 ++++++------ ...tion.java => TransportSQLQueryAction.java} | 33 +++++------ ...ryHelper.java => TransportSQLService.java} | 37 ++++++------ 6 files changed, 122 insertions(+), 68 deletions(-) create mode 100644 plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java rename plugin/src/main/java/org/opensearch/sql/plugin/transport/{PPLQueryHelper.java => TransportPPLService.java} (75%) rename plugin/src/main/java/org/opensearch/sql/plugin/transport/{TransportQueryAction.java => TransportSQLQueryAction.java} (52%) rename plugin/src/main/java/org/opensearch/sql/plugin/transport/{SQLQueryHelper.java => TransportSQLService.java} (76%) diff --git a/plugin/build.gradle b/plugin/build.gradle index 1341e71a79..343412783a 100644 --- a/plugin/build.gradle +++ b/plugin/build.gradle @@ -116,7 +116,7 @@ dependencyLicenses.enabled = false // enable testingConventions check will cause errors like: "Classes ending with [Tests] must subclass [LuceneTestCase]" testingConventions.enabled = false -// TODO: need to verify the thirdParty +// TODO: need to verify the thirdPartyAudit // currently it complains missing classes like ibatis, mysql etc, should not be a problem thirdPartyAudit.enabled = false diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 1b5e2bea7b..945d755461 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -33,7 +33,8 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.commons.sql.action.SQLActions; -import org.opensearch.commons.sql.action.TransportQueryResponse; +import org.opensearch.commons.sql.action.TransportPPLQueryResponse; +import org.opensearch.commons.sql.action.TransportSQLQueryResponse; import org.opensearch.env.Environment; import org.opensearch.env.NodeEnvironment; import org.opensearch.plugins.ActionPlugin; @@ -60,9 +61,10 @@ import org.opensearch.sql.plugin.rest.RestPPLQueryAction; import org.opensearch.sql.plugin.rest.RestPPLStatsAction; import org.opensearch.sql.plugin.rest.RestQuerySettingsAction; -import org.opensearch.sql.plugin.transport.PPLQueryHelper; -import org.opensearch.sql.plugin.transport.SQLQueryHelper; -import org.opensearch.sql.plugin.transport.TransportQueryAction; +import org.opensearch.sql.plugin.transport.TransportPPLService; +import org.opensearch.sql.plugin.transport.TransportPPLQueryAction; +import org.opensearch.sql.plugin.transport.TransportSQLService; +import org.opensearch.sql.plugin.transport.TransportSQLQueryAction; import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.config.PPLServiceConfig; import org.opensearch.sql.sql.SQLService; @@ -120,8 +122,12 @@ public List getRestHandlers(Settings settings, RestController restC return Arrays .asList( new ActionHandler<>( - new ActionType<>(SQLActions.SEND_SQL_QUERY_NAME, TransportQueryResponse::new), - TransportQueryAction.class + new ActionType<>(SQLActions.SEND_SQL_QUERY_NAME, TransportSQLQueryResponse::new), + TransportSQLQueryAction.class + ), + new ActionHandler<>( + new ActionType<>(SQLActions.SEND_PPL_QUERY_NAME, TransportPPLQueryResponse::new), + TransportPPLQueryAction.class ) ); } @@ -145,8 +151,8 @@ public Collection createComponents(Client client, ClusterService cluster LocalClusterState.state().setClusterService(clusterService); LocalClusterState.state().setPluginSettings((OpenSearchSettings) pluginSettings); - PPLQueryHelper.getInstance().setPplService(createPPLService((NodeClient) client)); - SQLQueryHelper.getInstance().setSqlService(createSQLService((NodeClient) client)); + TransportPPLService.getInstance().setPplService(createPPLService((NodeClient) client)); + TransportSQLService.getInstance().setSqlService(createSQLService((NodeClient) client)); return super .createComponents(client, clusterService, threadPool, resourceWatcherService, scriptService, diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java new file mode 100644 index 0000000000..88fecb937e --- /dev/null +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLQueryAction.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + + +package org.opensearch.sql.plugin.transport; + +import org.opensearch.action.ActionListener; +import org.opensearch.action.ActionRequest; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.client.Client; +import org.opensearch.common.inject.Inject; +import org.opensearch.commons.sql.action.SQLActions; +import org.opensearch.commons.sql.action.TransportPPLQueryRequest; +import org.opensearch.commons.sql.action.TransportPPLQueryResponse; +import org.opensearch.tasks.Task; +import org.opensearch.transport.TransportService; +import org.opensearch.commons.utils.TransportHelpersKt; + +import java.io.IOException; + +/** + * Send PPL query transport action + */ +public class TransportPPLQueryAction extends HandledTransportAction { + private final Client client; + + @Inject + public TransportPPLQueryAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(SQLActions.SEND_PPL_QUERY_NAME, transportService, actionFilters, TransportPPLQueryRequest::new); + this.client = client; + } + + /** + * {@inheritDoc} + * Transform the request and call super.doExecute() to support call from other plugins. + */ + @Override + protected void doExecute(Task task, ActionRequest request, ActionListener listener) { + TransportPPLQueryRequest transformedRequest; + if (request instanceof TransportPPLQueryRequest) { + transformedRequest = (TransportPPLQueryRequest) request; + } else { + transformedRequest = TransportHelpersKt.recreateObject(request, streamInput -> { + try { + return new TransportPPLQueryRequest(streamInput); + } catch (IOException e) { + listener.onFailure(e); + } + return null; + } + ); + } + TransportPPLService.execute(transformedRequest, listener); + } +} \ No newline at end of file diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryHelper.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLService.java similarity index 75% rename from plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryHelper.java rename to plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLService.java index 13fd56eb28..24b0be21ed 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/PPLQueryHelper.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportPPLService.java @@ -8,8 +8,8 @@ import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionListener; -import org.opensearch.commons.sql.action.TransportQueryRequest; -import org.opensearch.commons.sql.action.TransportQueryResponse; +import org.opensearch.commons.sql.action.TransportPPLQueryRequest; +import org.opensearch.commons.sql.action.TransportPPLQueryResponse; import org.opensearch.index.IndexNotFoundException; import org.opensearch.rest.RestStatus; import org.opensearch.sql.common.antlr.SyntaxCheckException; @@ -18,8 +18,6 @@ import org.opensearch.sql.exception.QueryEngineException; import org.opensearch.sql.exception.SemanticCheckException; import org.opensearch.sql.executor.ExecutionEngine; -import org.opensearch.sql.legacy.metrics.MetricName; -import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.ppl.PPLService; import org.opensearch.sql.ppl.domain.PPLQueryRequest; import org.opensearch.sql.protocol.response.QueryResult; @@ -32,25 +30,28 @@ import static org.opensearch.rest.RestStatus.BAD_REQUEST; import static org.opensearch.rest.RestStatus.SERVICE_UNAVAILABLE; +import static org.opensearch.sql.plugin.rest.RestPPLQueryAction.QUERY_API_ENDPOINT; import static org.opensearch.sql.protocol.response.format.JsonResponseFormatter.Style.PRETTY; -public class PPLQueryHelper { +public class TransportPPLService { private static PPLService pplService; - private static PPLQueryHelper INSTANCE; + private static TransportPPLService INSTANCE; - public static synchronized PPLQueryHelper getInstance() { + private TransportPPLService() {} + + public static synchronized TransportPPLService getInstance() { if (INSTANCE == null) { - INSTANCE = new PPLQueryHelper(); + INSTANCE = new TransportPPLService(); } return INSTANCE; } public void setPplService(PPLService pplService) { - PPLQueryHelper.pplService = pplService; + TransportPPLService.pplService = pplService; } - public static void execute(TransportQueryRequest request, ActionListener listener) { - // convert the TransportQueryRequest request to PPLQueryRequest + public static void execute(TransportPPLQueryRequest request, ActionListener listener) { + // convert the TransportPPLQueryRequest request to PPLQueryRequest PPLQueryRequest pplRequest = createPPLQueryRequest(request); // execute by ppl service pplService.execute(pplRequest, createListener(pplRequest, listener)); @@ -58,7 +59,7 @@ public static void execute(TransportQueryRequest request, ActionListener createListener( PPLQueryRequest pplRequest, - ActionListener listener + ActionListener listener ) { Format format = pplRequest.format(); ResponseFormatter formatter; @@ -83,21 +84,19 @@ public void onResponse(ExecutionEngine.QueryResponse response) { @Override public void onFailure(Exception e) { if (isClientError(e)) { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment(); reportError(listener, e, BAD_REQUEST); } else { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment(); reportError(listener, e, SERVICE_UNAVAILABLE); } } }; } - private static void sendResponse(String content, ActionListener listener) { - listener.onResponse(new TransportQueryResponse(content)); + private static void sendResponse(String content, ActionListener listener) { + listener.onResponse(new TransportPPLQueryResponse(content)); } - private static void reportError(ActionListener listener, final Exception exception, final RestStatus status) { + private static void reportError(ActionListener listener, final Exception exception, final RestStatus status) { listener.onFailure(new OpenSearchStatusException(exception.getMessage(), status)); } @@ -112,8 +111,7 @@ private static boolean isClientError(Exception e) { || e instanceof SyntaxCheckException; } - private static PPLQueryRequest createPPLQueryRequest(TransportQueryRequest request) { - return new PPLQueryRequest(request.getQuery(), null,"/_plugins/_ppl"); + private static PPLQueryRequest createPPLQueryRequest(TransportPPLQueryRequest request) { + return new PPLQueryRequest(request.getQuery(), null,QUERY_API_ENDPOINT); } - } diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportQueryAction.java b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportSQLQueryAction.java similarity index 52% rename from plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportQueryAction.java rename to plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportSQLQueryAction.java index e449c57d3d..ab88180abc 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportQueryAction.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/transport/TransportSQLQueryAction.java @@ -12,13 +12,9 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.client.Client; import org.opensearch.common.inject.Inject; -import org.opensearch.common.util.concurrent.ThreadContext; -import org.opensearch.commons.ConfigConstants; -import org.opensearch.commons.authuser.User; import org.opensearch.commons.sql.action.SQLActions; -import org.opensearch.commons.sql.action.TransportQueryRequest; -import org.opensearch.commons.sql.action.TransportQueryResponse; -import org.opensearch.commons.sql.model.QueryType; +import org.opensearch.commons.sql.action.TransportSQLQueryRequest; +import org.opensearch.commons.sql.action.TransportSQLQueryResponse; import org.opensearch.tasks.Task; import org.opensearch.transport.TransportService; import org.opensearch.commons.utils.TransportHelpersKt; @@ -26,14 +22,14 @@ import java.io.IOException; /** - * Send SQL/PPL query transport action + * Send SQL query transport action */ -public class TransportQueryAction extends HandledTransportAction { +public class TransportSQLQueryAction extends HandledTransportAction { private final Client client; @Inject - public TransportQueryAction(TransportService transportService, ActionFilters actionFilters, Client client) { - super(SQLActions.SEND_SQL_QUERY_NAME, transportService, actionFilters, TransportQueryRequest::new); + public TransportSQLQueryAction(TransportService transportService, ActionFilters actionFilters, Client client) { + super(SQLActions.SEND_SQL_QUERY_NAME, transportService, actionFilters, TransportSQLQueryRequest::new); this.client = client; } @@ -42,14 +38,14 @@ public TransportQueryAction(TransportService transportService, ActionFilters act * Transform the request and call super.doExecute() to support call from other plugins. */ @Override - protected void doExecute(Task task, ActionRequest request, ActionListener listener) { - TransportQueryRequest transformedRequest; - if (request instanceof TransportQueryRequest) { - transformedRequest = (TransportQueryRequest) request; + protected void doExecute(Task task, ActionRequest request, ActionListener listener) { + TransportSQLQueryRequest transformedRequest; + if (request instanceof TransportSQLQueryRequest) { + transformedRequest = (TransportSQLQueryRequest) request; } else { transformedRequest = TransportHelpersKt.recreateObject(request, streamInput -> { try { - return new TransportQueryRequest(streamInput); + return new TransportSQLQueryRequest(streamInput); } catch (IOException e) { listener.onFailure(e); } @@ -57,10 +53,7 @@ protected void doExecute(Task task, ActionRequest request, ActionListener listener) { - // convert the TransportQueryRequest request to SQLQueryRequest + public static void execute(TransportSQLQueryRequest request, ActionListener listener) { + // convert the TransportSQLQueryRequest request to SQLQueryRequest SQLQueryRequest sqlRequest = createSQLQueryRequest(request); // execute by sql service sqlService.execute(sqlRequest, createListener(sqlRequest, listener)); @@ -60,7 +61,7 @@ public static void execute(TransportQueryRequest request, ActionListener createListener( SQLQueryRequest sqlRequest, - ActionListener listener + ActionListener listener ) { Format format = sqlRequest.format(); ResponseFormatter formatter; @@ -83,21 +84,19 @@ public void onResponse(ExecutionEngine.QueryResponse response) { @Override public void onFailure(Exception e) { if (isClientError(e)) { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_CUS).increment(); reportError(listener, e, BAD_REQUEST); } else { - Metrics.getInstance().getNumericalMetric(MetricName.PPL_FAILED_REQ_COUNT_SYS).increment(); reportError(listener, e, SERVICE_UNAVAILABLE); } } }; } - private static void sendResponse(String content, ActionListener listener) { - listener.onResponse(new TransportQueryResponse(content)); + private static void sendResponse(String content, ActionListener listener) { + listener.onResponse(new TransportSQLQueryResponse(content)); } - private static void reportError(ActionListener listener, final Exception exception, final RestStatus status) { + private static void reportError(ActionListener listener, final Exception exception, final RestStatus status) { listener.onFailure(new OpenSearchStatusException(exception.getMessage(), status)); } @@ -112,9 +111,9 @@ private static boolean isClientError(Exception e) { || e instanceof SyntaxCheckException; } - private static SQLQueryRequest createSQLQueryRequest(TransportQueryRequest request) { + private static SQLQueryRequest createSQLQueryRequest(TransportSQLQueryRequest request) { String query = request.getQuery(); String jsonContent = "{\"query\": \"" + query + "\"}"; - return new SQLQueryRequest(new JSONObject(jsonContent), query ,"/_plugins/_sql", Collections.emptyMap()); + return new SQLQueryRequest(new JSONObject(jsonContent), query ,QUERY_API_ENDPOINT, Collections.emptyMap()); } } \ No newline at end of file