Skip to content

Commit

Permalink
[feature](selectdb-cloud) Add test for copy into http data api and us…
Browse files Browse the repository at this point in the history
…erId (apache#1044)

* Add test for copy into http data api and userId
* Add external and internal stage cross use regression case.
  • Loading branch information
deardeng authored Nov 25, 2022
1 parent 40b3bc3 commit 380864b
Show file tree
Hide file tree
Showing 10 changed files with 288 additions and 8 deletions.
2 changes: 0 additions & 2 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4094,8 +4094,6 @@ void MetaServiceImpl::create_stage(::google::protobuf::RpcController* controller
as->add_mysql_user_id(stage.mysql_user_id(0));
as->set_stage_id(stage.stage_id());
} else if (stage.type() == StagePB::EXTERNAL) {
// external stage does not need mysql user_name
stage.clear_mysql_user_name();
instance.add_stages()->CopyFrom(stage);
}
val = instance.SerializeAsString();
Expand Down
2 changes: 1 addition & 1 deletion cloud/src/resource-manager/resource_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,7 @@ std::pair<MetaServiceCode, std::string> ResourceManager::refresh_instance(
for (int i = 0; i < instance.clusters_size(); ++i) {
add_cluster_to_index_no_lock(instance_id, instance.clusters(i));
}
LOG(INFO) << "finish refresing instance, instance_id=" << instance_id << " seq=" << seq;
LOG(INFO) << "finish refreshing instance, instance_id=" << instance_id << " seq=" << seq;
return ret0;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,11 @@ public class CloudLoadAction extends RestBaseController {

private final String external = "external";

// for ut
public static StatementSubmitter getStmtSubmitter() {
return stmtSubmitter;
}

private boolean isIP(String addr) {
if (Strings.isNullOrEmpty(addr)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.doris.analysis;

import org.apache.doris.catalog.Env;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
Expand Down Expand Up @@ -76,9 +75,8 @@ public void analyze(Analyzer analyzer) throws UserException {

public StagePB toStageProto() throws DdlException {
StagePB.Builder stageBuilder = StagePB.newBuilder();
stageBuilder.addMysqlUserName(ClusterNamespace
.getNameFromFullName(ConnectContext.get().getCurrentUserIdentity().getQualifiedUser()))
.setStageId(UUID.randomUUID().toString());
// external stage doesn't need username
stageBuilder.setStageId(UUID.randomUUID().toString());
switch (type) {
case EXTERNAL:
stageBuilder.setName(getStageName()).setType(StagePB.StageType.EXTERNAL)
Expand Down
164 changes: 164 additions & 0 deletions fe/fe-core/src/test/java/com/selectdb/cloud/http/CopyIntoTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package com.selectdb.cloud.http;

import mockit.Expectations;
import okhttp3.Credentials;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.http.DorisHttpTestCase;
import org.apache.doris.httpv2.util.ExecutionResultSet;
import org.apache.doris.httpv2.util.StatementSubmitter.StmtContext;
import org.apache.doris.utframe.MockedMetaServerFactory;
import org.apache.doris.utframe.UtFrameUtils;
import org.json.simple.JSONObject;
import org.json.simple.JSONValue;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CopyIntoTest extends DorisHttpTestCase {
protected static final String runningDir = "fe/mocked/" + CopyIntoTest.class.getSimpleName() + "/" + UUID.randomUUID() + "/";
protected static int port;
private static final String UPDATE_URI = "/copy/upload";
private static final String QUERY_URI = "/copy/query";

protected String rootAuth = Credentials.basic("root", "");

@BeforeClass
public static void beforeClass() throws Exception {
port = UtFrameUtils.createMetaServer(MockedMetaServerFactory.METASERVER_DEFAULT_IP);
}

@Test
public void testUpload() throws IOException {
FeConstants.runningUnitTest = true;
Request request = new Request.Builder()
.put(RequestBody.create("12345".getBytes()))
.addHeader("Authorization", rootAuth)
.addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build();
Response response = networkClient.newCall(request).execute();
Assert.assertNotNull(response.body());
String respStr = response.body().string();
JSONObject jsonObject = (JSONObject) JSONValue.parse(respStr);
Assert.assertEquals(403, (long) jsonObject.get("code"));
String exception = (String) jsonObject.get("data");
Assert.assertTrue(exception.contains("http header must have fileName entry"));

// case 1
request = new Request.Builder()
.put(RequestBody.create("12345".getBytes()))
.addHeader("Authorization", rootAuth)
.addHeader("fileName", "test.csv")
.addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build();

Config.cloud_unique_id = "Internal-MetaServiceCode.OK";
Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port;
response = networkClient.newCall(request).execute();
Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.internal.com/ut-test/test.csv"));

// case 2 add header endpointHeader, __USE_ENDPOINT__
request = new Request.Builder()
.put(RequestBody.create("12345".getBytes()))
.addHeader("Authorization", rootAuth)
.addHeader("fileName", "test.csv")
.addHeader("__USE_ENDPOINT__", "internal")
.addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build();

Config.cloud_unique_id = "Internal-MetaServiceCode.OK";
Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port;
response = networkClient.newCall(request).execute();
Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.internal.com/ut-test/test.csv"));

// case 3 add header endpointHeader, __USE_ENDPOINT__
request = new Request.Builder()
.put(RequestBody.create("12345".getBytes()))
.addHeader("Authorization", rootAuth)
.addHeader("fileName", "test.csv")
.addHeader("__USE_ENDPOINT__", "external")
.addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build();

Config.cloud_unique_id = "Internal-MetaServiceCode.OK";
Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port;
response = networkClient.newCall(request).execute();
Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.com/ut-test/test.csv"));

// case 4 add header endpointHeader, host
request = new Request.Builder()
.put(RequestBody.create("12345".getBytes()))
.addHeader("Authorization", rootAuth)
.addHeader("fileName", "test.csv")
.addHeader("host", "192.168.0.1:7788")
.addHeader("Content-Type", "text/plain; charset=UTF-8").url(CloudURI + UPDATE_URI).build();

Config.cloud_unique_id = "Internal-MetaServiceCode.OK";
Config.meta_service_endpoint = MockedMetaServerFactory.METASERVER_DEFAULT_IP + ":" + port;
response = networkClient.newCall(request).execute();
Assert.assertTrue(response.request().url().toString().contains("http://bucketbucket.cos.ap-beijing.myqcloud.com/ut-test/test.csv"));
}

@Test
public void testQuery() throws IOException, ExecutionException, InterruptedException {
String emptySql = JSONObject.toJSONString(new HashMap(){});
Request request = new Request.Builder().post(RequestBody.create(emptySql.getBytes())).addHeader("Authorization", rootAuth)
.addHeader("Content-Type", "application/json").url(CloudURI + QUERY_URI).build();
Response response = networkClient.newCall(request).execute();
Assert.assertNotNull(response.body());
String respStr = response.body().string();
JSONObject jsonObject = (JSONObject) JSONValue.parse(respStr);
Assert.assertEquals(403, (long) jsonObject.get("code"));
String exception = (String) jsonObject.get("data");
Assert.assertTrue(exception.contains("POST body must contain [sql] root object"));

java.util.concurrent.FutureTask<ExecutionResultSet> ret = new FutureTask<>(() -> new ExecutionResultSet(new HashMap<>()));

HashMap<String, Object> om = new HashMap<>();
HashMap<String, Object> im = new HashMap<>();
im.put("copyId", "copy_1296997def6d4887_9e7ff31a7f3842cc");
im.put("msg", "");
im.put("loadedRows", "");
im.put("state", "CANCELLED");
im.put("type", "LOAD_RUN_FAIL");
im.put("filterRows", "");
im.put("unselectRows", "");
im.put("url", null);
om.put("result", im);
ExecutionResultSet e = new ExecutionResultSet(om);

new Expectations(CloudLoadAction.getStmtSubmitter(), ret) {
{
CloudLoadAction.getStmtSubmitter().submitBlock((StmtContext) any);
minTimes = 0;
result = ret;

ret.get();
minTimes = 0;
result = e;
}
};

Map<String, String> m = new HashMap<>();
m.put("sql", "copy into db1.t2 from @~(\"{t3.dat}\")");
String copyIntoSql = JSONObject.toJSONString(m);
request = new Request.Builder().post(RequestBody.create(copyIntoSql.getBytes())).addHeader("Authorization", rootAuth)
.addHeader("Content-Type", "application/json").url(CloudURI + QUERY_URI).build();
response = networkClient.newCall(request).execute();
respStr = response.body().string();
// {"msg":"success","code":0,"data":{"result":{"copyId":"copy_1296997def6d4887_9e7ff31a7f3842cc","msg":"","loadedRows":"","state":"CANCELLED","type":"LOAD_RUN_FAIL","filterRows":"","unselectRows":"","url":null}},"count":0}
System.out.println(respStr);
jsonObject = (JSONObject) JSONValue.parse(respStr);
Assert.assertEquals(0, (long) jsonObject.get("code"));
JSONObject data = (JSONObject) jsonObject.get("data");
JSONObject result = (JSONObject) data.get("result");
String copyId = (String) result.get("copyId");
Assert.assertEquals(copyId, "copy_1296997def6d4887_9e7ff31a7f3842cc");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,6 @@ public void testStagePB() throws Exception {
StagePB stagePB = createStageStmt.toStageProto();
Assert.assertEquals(StageType.EXTERNAL, stagePB.getType());
Assert.assertEquals("ex_stage_1", stagePB.getName());
Assert.assertEquals(1, stagePB.getMysqlUserNameList().size());
Assert.assertTrue(StringUtils.isNotBlank(stagePB.getStageId()));
ObjectStoreInfoPB objInfo = stagePB.getObjInfo();
Assert.assertEquals("cos.ap-beijing.myqcloud.com", objInfo.getEndpoint());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,10 @@ public abstract class DorisHttpTestCase {
public static int testSchemaHash = 93423942;

public static int HTTP_PORT;
public static int CLOUD_HTTP_PORT;

protected static String URI;
protected static String CloudURI;

protected String rootAuth = Credentials.basic("root", "");

Expand Down Expand Up @@ -330,11 +332,16 @@ private static void assignBackends() {
@BeforeClass
public static void initHttpServer() throws IllegalArgException, InterruptedException {
ServerSocket socket = null;
ServerSocket socketCloud = null;
try {
socket = new ServerSocket(0);
socket.setReuseAddress(true);
HTTP_PORT = socket.getLocalPort();
socketCloud = new ServerSocket(0);
socketCloud.setReuseAddress(true);
CLOUD_HTTP_PORT = socketCloud.getLocalPort();
URI = "http://localhost:" + HTTP_PORT + "/api/" + DB_NAME + "/" + TABLE_NAME;
CloudURI = "http://localhost:" + CLOUD_HTTP_PORT;
} catch (Exception e) {
throw new IllegalStateException("Could not find a free TCP/IP port to start HTTP Server on");
} finally {
Expand All @@ -345,11 +352,19 @@ public static void initHttpServer() throws IllegalArgException, InterruptedExcep
// CHECKSTYLE IGNORE THIS LINE
}
}
if (socketCloud != null) {
try {
socketCloud.close();
} catch (Exception e) {
// CHECKSTYLE IGNORE THIS LINE
}
}
}

FeConstants.runningUnitTest = true;
httpServer = new HttpServer();
httpServer.setPort(HTTP_PORT);
httpServer.setCloudPort(CLOUD_HTTP_PORT);
httpServer.setMaxHttpPostSize(100 * 1024 * 1024);
httpServer.setAcceptors(2);
httpServer.setSelectors(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ public void setUp() throws NoSuchMethodException, SecurityException {
@Test
public void test()
throws IllegalAccessException, IllegalArgumentException, InvocationTargetException, UserException {
new Expectations(auth) {
{
auth.getUserId(anyString);
minTimes = 0;
result = "4802c5dd-38a8-430a-9d8c-a9714690ca9e";
}
};
// 1. create cmy@%
UserIdentity userIdentity = new UserIdentity("cmy", "%");
UserDesc userDesc = new UserDesc(userIdentity, "12345", true);
Expand All @@ -165,10 +172,19 @@ public void test()

try {
auth.createUser(createUserStmt);
String cmyUid = auth.getUserPrivTable().getUserIdByUser("cmy");
Assert.assertEquals(cmyUid, "4802c5dd-38a8-430a-9d8c-a9714690ca9e");
} catch (DdlException e) {
Assert.fail();
}

new Expectations(auth) {
{
auth.getUserId(anyString);
minTimes = 0;
result = "9e804163-db74-408c-8881-5dff1739241c";
}
};
// 1.1 create cmy@% again with IF NOT EXISTS
userIdentity = new UserIdentity("cmy", "%");
userDesc = new UserDesc(userIdentity, "54321", true);
Expand All @@ -182,6 +198,9 @@ public void test()

try {
auth.createUser(createUserStmt);
String cmyUid = auth.getUserPrivTable().getUserIdByUser("cmy");
Assert.assertNotEquals(cmyUid, "9e804163-db74-408c-8881-5dff1739241c");
Assert.assertEquals(cmyUid, "4802c5dd-38a8-430a-9d8c-a9714690ca9e");
} catch (DdlException e) {
Assert.fail();
}
Expand All @@ -192,6 +211,9 @@ public void test()
createUserStmt = new CreateUserStmt(false, userDesc, null);
try {
createUserStmt.analyze(analyzer);
String cmyUid = auth.getUserPrivTable().getUserIdByUser("cmy");
Assert.assertNotEquals(cmyUid, "9e804163-db74-408c-8881-5dff1739241c");
Assert.assertEquals(cmyUid, "4802c5dd-38a8-430a-9d8c-a9714690ca9e");
} catch (UserException e) {
e.printStackTrace();
Assert.fail();
Expand All @@ -217,6 +239,13 @@ public void test()
Assert.assertTrue(currentUser.get(0).equals(userIdentity));

// 3. create another user: zhangsan@"192.%"
new Expectations(auth) {
{
auth.getUserId(anyString);
minTimes = 0;
result = "";
}
};
userIdentity = new UserIdentity("zhangsan", "192.%");
userDesc = new UserDesc(userIdentity, "12345", true);
createUserStmt = new CreateUserStmt(false, userDesc, null);
Expand All @@ -229,6 +258,9 @@ public void test()

try {
auth.createUser(createUserStmt);
String zsUid = auth.getUserPrivTable().getUserIdByUser("zhangsan");
// get a new uuid, not eq ""
Assert.assertNotEquals(zsUid, "");
} catch (DdlException e) {
Assert.fail();
}
Expand Down
Loading

0 comments on commit 380864b

Please sign in to comment.