diff --git a/cloud/src/meta-service/meta_service.cpp b/cloud/src/meta-service/meta_service.cpp index 44cab97b07637c..20483ad519456d 100644 --- a/cloud/src/meta-service/meta_service.cpp +++ b/cloud/src/meta-service/meta_service.cpp @@ -4094,8 +4094,6 @@ void MetaServiceImpl::create_stage(::google::protobuf::RpcController* controller as->add_mysql_user_id(stage.mysql_user_id(0)); as->set_stage_id(stage.stage_id()); } else if (stage.type() == StagePB::EXTERNAL) { - // external stage does not need mysql user_name - stage.clear_mysql_user_name(); instance.add_stages()->CopyFrom(stage); } val = instance.SerializeAsString(); diff --git a/cloud/src/resource-manager/resource_manager.cpp b/cloud/src/resource-manager/resource_manager.cpp index dff3564531d8f5..46ff9cadb0e7db 100644 --- a/cloud/src/resource-manager/resource_manager.cpp +++ b/cloud/src/resource-manager/resource_manager.cpp @@ -870,7 +870,7 @@ std::pair ResourceManager::refresh_instance( for (int i = 0; i < instance.clusters_size(); ++i) { add_cluster_to_index_no_lock(instance_id, instance.clusters(i)); } - LOG(INFO) << "finish refresing instance, instance_id=" << instance_id << " seq=" << seq; + LOG(INFO) << "finish refreshing instance, instance_id=" << instance_id << " seq=" << seq; return ret0; } diff --git a/fe/fe-core/src/main/java/com/selectdb/cloud/http/CloudLoadAction.java b/fe/fe-core/src/main/java/com/selectdb/cloud/http/CloudLoadAction.java index 5bf56afc375e33..d3bfbfe09a3df9 100644 --- a/fe/fe-core/src/main/java/com/selectdb/cloud/http/CloudLoadAction.java +++ b/fe/fe-core/src/main/java/com/selectdb/cloud/http/CloudLoadAction.java @@ -57,6 +57,11 @@ public class CloudLoadAction extends RestBaseController { private final String external = "external"; + // for ut + public static StatementSubmitter getStmtSubmitter() { + return stmtSubmitter; + } + private boolean isIP(String addr) { if (Strings.isNullOrEmpty(addr)) { return false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateStageStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateStageStmt.java index b32cc912bfa276..9d8e6a3815277c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateStageStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateStageStmt.java @@ -18,7 +18,6 @@ package org.apache.doris.analysis; import org.apache.doris.catalog.Env; -import org.apache.doris.cluster.ClusterNamespace; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.ErrorCode; @@ -76,9 +75,8 @@ public void analyze(Analyzer analyzer) throws UserException { public StagePB toStageProto() throws DdlException { StagePB.Builder stageBuilder = StagePB.newBuilder(); - stageBuilder.addMysqlUserName(ClusterNamespace - .getNameFromFullName(ConnectContext.get().getCurrentUserIdentity().getQualifiedUser())) - .setStageId(UUID.randomUUID().toString()); + // external stage doesn't need username + stageBuilder.setStageId(UUID.randomUUID().toString()); switch (type) { case EXTERNAL: stageBuilder.setName(getStageName()).setType(StagePB.StageType.EXTERNAL) diff --git a/fe/fe-core/src/test/java/com/selectdb/cloud/http/CopyIntoTest.java b/fe/fe-core/src/test/java/com/selectdb/cloud/http/CopyIntoTest.java new file mode 100644 index 00000000000000..086109973f1c66 --- /dev/null +++ b/fe/fe-core/src/test/java/com/selectdb/cloud/http/CopyIntoTest.java @@ -0,0 +1,164 @@ +package com.selectdb.cloud.http; + +import mockit.Expectations; +import okhttp3.Credentials; +import okhttp3.Request; +import okhttp3.RequestBody; +import okhttp3.Response; +import org.apache.doris.common.Config; +import org.apache.doris.common.FeConstants; +import org.apache.doris.http.DorisHttpTestCase; +import org.apache.doris.httpv2.util.ExecutionResultSet; +import org.apache.doris.httpv2.util.StatementSubmitter.StmtContext; +import org.apache.doris.utframe.MockedMetaServerFactory; +import org.apache.doris.utframe.UtFrameUtils; +import org.json.simple.JSONObject; +import org.json.simple.JSONValue; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +public class CopyIntoTest extends DorisHttpTestCase { + protected static final String runningDir = "fe/mocked/" + CopyIntoTest.class.getSimpleName() + "/" + UUID.randomUUID() + "/"; + protected static int port; + private static final String UPDATE_URI = "/copy/upload"; + private static final String QUERY_URI = "/copy/query"; + + protected String rootAuth = Credentials.basic("root", ""); + + @BeforeClass + public static void beforeClass() throws Exception { + port = UtFrameUtils.createMetaServer(MockedMetaServerFactory.METASERVER_DEFAULT_IP); + } + + @Test + public void testUpload() throws IOException { + FeConstants.runningUnitTest = true; + Request request = new Request.Builder() + .put(RequestBody.create("12345".getBytes())) + .addHeader("Authorization", rootAuth) + .addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build(); + Response response = networkClient.newCall(request).execute(); + Assert.assertNotNull(response.body()); + String respStr = response.body().string(); + JSONObject jsonObject = (JSONObject) JSONValue.parse(respStr); + Assert.assertEquals(403, (long) jsonObject.get("code")); + String exception = (String) jsonObject.get("data"); + Assert.assertTrue(exception.contains("http header must have fileName entry")); + + // case 1 + request = new Request.Builder() + .put(RequestBody.create("12345".getBytes())) + .addHeader("Authorization", rootAuth) + .addHeader("fileName", "test.csv") + .addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build(); + + Config.cloud_unique_id = "Internal-MetaServiceCode.OK"; + Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port; + response = networkClient.newCall(request).execute(); + Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.internal.com/ut-test/test.csv")); + + // case 2 add header endpointHeader, __USE_ENDPOINT__ + request = new Request.Builder() + .put(RequestBody.create("12345".getBytes())) + .addHeader("Authorization", rootAuth) + .addHeader("fileName", "test.csv") + .addHeader("__USE_ENDPOINT__", "internal") + .addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build(); + + Config.cloud_unique_id = "Internal-MetaServiceCode.OK"; + Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port; + response = networkClient.newCall(request).execute(); + Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.internal.com/ut-test/test.csv")); + + // case 3 add header endpointHeader, __USE_ENDPOINT__ + request = new Request.Builder() + .put(RequestBody.create("12345".getBytes())) + .addHeader("Authorization", rootAuth) + .addHeader("fileName", "test.csv") + .addHeader("__USE_ENDPOINT__", "external") + .addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build(); + + Config.cloud_unique_id = "Internal-MetaServiceCode.OK"; + Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port; + response = networkClient.newCall(request).execute(); + Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.com/ut-test/test.csv")); + + // case 4 add header endpointHeader, host + request = new Request.Builder() + .put(RequestBody.create("12345".getBytes())) + .addHeader("Authorization", rootAuth) + .addHeader("fileName", "test.csv") + .addHeader("host", "192.168.0.1:7788") + .addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build(); + + Config.cloud_unique_id = "Internal-MetaServiceCode.OK"; + Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port; + response = networkClient.newCall(request).execute(); + Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.com/ut-test/test.csv")); + } + + @Test + public void testQuery() throws IOException, ExecutionException, InterruptedException { + String emptySql = JSONObject.toJSONString(new HashMap(){}); + Request request = new Request.Builder().post(RequestBody.create(emptySql.getBytes())).addHeader("Authorization", rootAuth) + .addHeader("Content-Type", "application/json").url(CloudURI + QUERY_URI).build(); + Response response = networkClient.newCall(request).execute(); + Assert.assertNotNull(response.body()); + String respStr = response.body().string(); + JSONObject jsonObject = (JSONObject) JSONValue.parse(respStr); + Assert.assertEquals(403, (long) jsonObject.get("code")); + String exception = (String) jsonObject.get("data"); + Assert.assertTrue(exception.contains("POST body must contain [sql] root object")); + + java.util.concurrent.FutureTask ret = new FutureTask<>(() -> new ExecutionResultSet(new HashMap<>())); + + HashMap om = new HashMap<>(); + HashMap im = new HashMap<>(); + im.put("copyId", "copy_1296997def6d4887_9e7ff31a7f3842cc"); + im.put("msg", ""); + im.put("loadedRows", ""); + im.put("state", "CANCELLED"); + im.put("type", "LOAD_RUN_FAIL"); + im.put("filterRows", ""); + im.put("unselectRows", ""); + im.put("url", null); + om.put("result", im); + ExecutionResultSet e = new ExecutionResultSet(om); + + new Expectations(CloudLoadAction.getStmtSubmitter(), ret) { + { + CloudLoadAction.getStmtSubmitter().submitBlock((StmtContext) any); + minTimes = 0; + result = ret; + + ret.get(); + minTimes = 0; + result = e; + } + }; + + Map m = new HashMap<>(); + m.put("sql", "copy into db1.t2 from @~(\"{t3.dat}\")"); + String copyIntoSql = JSONObject.toJSONString(m); + request = new Request.Builder().post(RequestBody.create(copyIntoSql.getBytes())).addHeader("Authorization", rootAuth) + .addHeader("Content-Type", "application/json").url(CloudURI + QUERY_URI).build(); + response = networkClient.newCall(request).execute(); + respStr = response.body().string(); + // {"msg":"success","code":0,"data":{"result":{"copyId":"copy_1296997def6d4887_9e7ff31a7f3842cc","msg":"","loadedRows":"","state":"CANCELLED","type":"LOAD_RUN_FAIL","filterRows":"","unselectRows":"","url":null}},"count":0} + System.out.println(respStr); + jsonObject = (JSONObject) JSONValue.parse(respStr); + Assert.assertEquals(0, (long) jsonObject.get("code")); + JSONObject data = (JSONObject) jsonObject.get("data"); + JSONObject result = (JSONObject) data.get("result"); + String copyId = (String) result.get("copyId"); + Assert.assertEquals(copyId, "copy_1296997def6d4887_9e7ff31a7f3842cc"); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/StageTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/StageTest.java index 51ca00b9ff8733..d5ca6c1814f076 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/StageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/StageTest.java @@ -298,7 +298,6 @@ public void testStagePB() throws Exception { StagePB stagePB = createStageStmt.toStageProto(); Assert.assertEquals(StageType.EXTERNAL, stagePB.getType()); Assert.assertEquals("ex_stage_1", stagePB.getName()); - Assert.assertEquals(1, stagePB.getMysqlUserNameList().size()); Assert.assertTrue(StringUtils.isNotBlank(stagePB.getStageId())); ObjectStoreInfoPB objInfo = stagePB.getObjInfo(); Assert.assertEquals("cos.ap-beijing.myqcloud.com", objInfo.getEndpoint()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java index ca336cb46887a7..6f380645ee808b 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java +++ b/fe/fe-core/src/test/java/org/apache/doris/http/DorisHttpTestCase.java @@ -113,8 +113,10 @@ public abstract class DorisHttpTestCase { public static int testSchemaHash = 93423942; public static int HTTP_PORT; + public static int CLOUD_HTTP_PORT; protected static String URI; + protected static String CloudURI; protected String rootAuth = Credentials.basic("root", ""); @@ -330,11 +332,16 @@ private static void assignBackends() { @BeforeClass public static void initHttpServer() throws IllegalArgException, InterruptedException { ServerSocket socket = null; + ServerSocket socketCloud = null; try { socket = new ServerSocket(0); socket.setReuseAddress(true); HTTP_PORT = socket.getLocalPort(); + socketCloud = new ServerSocket(0); + socketCloud.setReuseAddress(true); + CLOUD_HTTP_PORT = socketCloud.getLocalPort(); URI = "http://localhost:" + HTTP_PORT + "/api/" + DB_NAME + "/" + TABLE_NAME; + CloudURI = "http://localhost:" + CLOUD_HTTP_PORT; } catch (Exception e) { throw new IllegalStateException("Could not find a free TCP/IP port to start HTTP Server on"); } finally { @@ -345,11 +352,19 @@ public static void initHttpServer() throws IllegalArgException, InterruptedExcep // CHECKSTYLE IGNORE THIS LINE } } + if (socketCloud != null) { + try { + socketCloud.close(); + } catch (Exception e) { + // CHECKSTYLE IGNORE THIS LINE + } + } } FeConstants.runningUnitTest = true; httpServer = new HttpServer(); httpServer.setPort(HTTP_PORT); + httpServer.setCloudPort(CLOUD_HTTP_PORT); httpServer.setMaxHttpPostSize(100 * 1024 * 1024); httpServer.setAcceptors(2); httpServer.setSelectors(4); diff --git a/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java b/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java index 85b09c50638644..472b0f62152c44 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/mysql/privilege/AuthTest.java @@ -152,6 +152,13 @@ public void setUp() throws NoSuchMethodException, SecurityException { @Test public void test() throws IllegalAccessException, IllegalArgumentException, InvocationTargetException, UserException { + new Expectations(auth) { + { + auth.getUserId(anyString); + minTimes = 0; + result = "4802c5dd-38a8-430a-9d8c-a9714690ca9e"; + } + }; // 1. create cmy@% UserIdentity userIdentity = new UserIdentity("cmy", "%"); UserDesc userDesc = new UserDesc(userIdentity, "12345", true); @@ -165,10 +172,19 @@ public void test() try { auth.createUser(createUserStmt); + String cmyUid = auth.getUserPrivTable().getUserIdByUser("cmy"); + Assert.assertEquals(cmyUid, "4802c5dd-38a8-430a-9d8c-a9714690ca9e"); } catch (DdlException e) { Assert.fail(); } + new Expectations(auth) { + { + auth.getUserId(anyString); + minTimes = 0; + result = "9e804163-db74-408c-8881-5dff1739241c"; + } + }; // 1.1 create cmy@% again with IF NOT EXISTS userIdentity = new UserIdentity("cmy", "%"); userDesc = new UserDesc(userIdentity, "54321", true); @@ -182,6 +198,9 @@ public void test() try { auth.createUser(createUserStmt); + String cmyUid = auth.getUserPrivTable().getUserIdByUser("cmy"); + Assert.assertNotEquals(cmyUid, "9e804163-db74-408c-8881-5dff1739241c"); + Assert.assertEquals(cmyUid, "4802c5dd-38a8-430a-9d8c-a9714690ca9e"); } catch (DdlException e) { Assert.fail(); } @@ -192,6 +211,9 @@ public void test() createUserStmt = new CreateUserStmt(false, userDesc, null); try { createUserStmt.analyze(analyzer); + String cmyUid = auth.getUserPrivTable().getUserIdByUser("cmy"); + Assert.assertNotEquals(cmyUid, "9e804163-db74-408c-8881-5dff1739241c"); + Assert.assertEquals(cmyUid, "4802c5dd-38a8-430a-9d8c-a9714690ca9e"); } catch (UserException e) { e.printStackTrace(); Assert.fail(); @@ -217,6 +239,13 @@ public void test() Assert.assertTrue(currentUser.get(0).equals(userIdentity)); // 3. create another user: zhangsan@"192.%" + new Expectations(auth) { + { + auth.getUserId(anyString); + minTimes = 0; + result = ""; + } + }; userIdentity = new UserIdentity("zhangsan", "192.%"); userDesc = new UserDesc(userIdentity, "12345", true); createUserStmt = new CreateUserStmt(false, userDesc, null); @@ -229,6 +258,9 @@ public void test() try { auth.createUser(createUserStmt); + String zsUid = auth.getUserPrivTable().getUserIdByUser("zhangsan"); + // get a new uuid, not eq "" + Assert.assertNotEquals(zsUid, ""); } catch (DdlException e) { Assert.fail(); } diff --git a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedMetaServerFactory.java b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedMetaServerFactory.java index 991d181b728954..4b0769eb588194 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedMetaServerFactory.java +++ b/fe/fe-core/src/test/java/org/apache/doris/utframe/MockedMetaServerFactory.java @@ -19,8 +19,15 @@ import com.selectdb.cloud.proto.MetaServiceGrpc; import com.selectdb.cloud.proto.SelectdbCloud; +import com.selectdb.cloud.proto.SelectdbCloud.GetStageResponse; import com.selectdb.cloud.proto.SelectdbCloud.MetaServiceCode; +import com.selectdb.cloud.proto.SelectdbCloud.MetaServiceResponseStatus; +import com.selectdb.cloud.proto.SelectdbCloud.ObjectStoreInfoPB; +import com.selectdb.cloud.proto.SelectdbCloud.ObjectStoreInfoPB.Provider; +import com.selectdb.cloud.proto.SelectdbCloud.StagePB; import io.grpc.stub.StreamObserver; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.io.IOException; @@ -33,6 +40,7 @@ public class MockedMetaServerFactory { public static final String METASERVER_DEFAULT_IP = "127.0.0.100"; public static final int METASERVER_DEFAULT_BRPC_PORT = 5001; + private static final Logger LOG = LogManager.getLogger(MockedMetaServerFactory.class); // create a mocked meta server with customize parameters public static MockedMetaServer createMetaServer(String host, int brpcPort, @@ -52,5 +60,31 @@ public void getVersion(SelectdbCloud.GetVersionRequest request, .setVersion(1).build()); responseObserver.onCompleted(); } + + @Override + public void getStage(SelectdbCloud.GetStageRequest request, + StreamObserver responseObserver) { + if (request.hasCloudUniqueId()) { + // reuse uniqueId for mock ut response + switch (request.getCloudUniqueId()) { + case "Internal-MetaServiceCode.OK": + ObjectStoreInfoPB obj = ObjectStoreInfoPB.newBuilder() + .setEndpoint("cos.ap-beijing.myqcloud.internal.com") + .setAk("akak").setSk("sksk").setRegion("ap-beijing") + .setBucket("bucketbucket").setExternalEndpoint("cos.ap-beijing.myqcloud.com") + .setPrefix("ut-test").setProvider(Provider.OSS).build(); + StagePB stage = StagePB.newBuilder().setObjInfo(obj).build(); + GetStageResponse resp = GetStageResponse.newBuilder() + .setStatus(MetaServiceResponseStatus.newBuilder().setCode(MetaServiceCode.OK).setMsg("OK")) + .addStage(stage).build(); + responseObserver.onNext(resp); + responseObserver.onCompleted(); + LOG.info("mock get Stage request: {}, response: {}", request, resp); + return; + default: + return; + } + } + } } } diff --git a/regression-test/suites/cloud/copy_into/test_internal_stage.groovy b/regression-test/suites/cloud/copy_into/test_internal_stage.groovy index 9a6b9222ce4a29..9687b4d1f49c86 100644 --- a/regression-test/suites/cloud/copy_into/test_internal_stage.groovy +++ b/regression-test/suites/cloud/copy_into/test_internal_stage.groovy @@ -1,6 +1,41 @@ import org.codehaus.groovy.runtime.IOGroovyMethods suite("test_internal_stage") { + // Internal and external stage cross use + def tableNamExternal = "customer_external_stage" + def externalStageName = "internal_external_stage_cross_use" + try { + sql """ DROP TABLE IF EXISTS ${tableNamExternal}; """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableNamExternal} ( + C_CUSTKEY INTEGER NOT NULL, + C_NAME VARCHAR(25) NOT NULL, + C_ADDRESS VARCHAR(40) NOT NULL, + C_NATIONKEY INTEGER NOT NULL, + C_PHONE CHAR(15) NOT NULL, + C_ACCTBAL DECIMAL(15,2) NOT NULL, + C_MKTSEGMENT CHAR(10) NOT NULL, + C_COMMENT VARCHAR(117) NOT NULL + ) + UNIQUE KEY(C_CUSTKEY) + DISTRIBUTED BY HASH(C_CUSTKEY) BUCKETS 1 + """ + + sql """ + create stage if not exists ${externalStageName} + properties ('endpoint' = '${getS3Endpoint()}' , + 'region' = '${getS3Region()}' , + 'bucket' = '${getS3BucketName()}' , + 'prefix' = 'regression' , + 'ak' = '${getS3AK()}' , + 'sk' = '${getS3SK()}' , + 'provider' = '${getProvider()}', + 'default.file.column_separator' = "|"); + """ + } finally { + try_sql("DROP TABLE IF EXISTS ${tableNamExternal}") + } + def tableName = "customer_internal_stage" def fileName = "internal_customer.csv" def filePath = "${context.config.dataPath}/cloud/copy_into/" + fileName