diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
deleted file mode 100644
index ce4dcbb..0000000
--- a/.github/workflows/build.yaml
+++ /dev/null
@@ -1,31 +0,0 @@
-name: build
-on:
-  push:
-    branches: [master]
-  pull_request:
-    branches:
-      - master
-      - "v[0-9]+.*"
-
-jobs:
-
-  build:
-    name: Build
-    runs-on: ubuntu-latest
-    steps:
-      - uses: actions/checkout@v2
-      - name: Set up JDK 1.8
-        uses: actions/setup-java@v1
-        with:
-          java-version: 1.8
-
-      - name: Cache the Maven packages to speed up build
-        uses: actions/cache@v2
-        with:
-          path: ~/.m2/repository
-          key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
-          restore-keys: ${{ runner.os }}-maven-
-
-      - name: Build with Maven
-        run: cd util/LdbcGoStep && mvn -B package
-
diff --git a/.github/workflows/import.yaml b/.github/workflows/nebula-bench.yaml
similarity index 100%
rename from .github/workflows/import.yaml
rename to .github/workflows/nebula-bench.yaml
diff --git a/README.md b/README.md
index 4ebfdba..366f1f8 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,14 @@ It only supports nebula graph 2.0+ releases.
 The main features:
 
 * Generate the LDBC dataset and then import it into nebula-graph.
-* nebula-graph benchmark. (WIP)
+* Run load tests with k6.
+
+## Dependency
+
+| Nebula Bench | Nebula | Nebula Importer | K6 Plugin | ldbc_snb_datagen | Nebula-go |
+|:------------:|:------:|:---------------:|:---------:|:----------------:|:---------:|
+|     v0.2     | v2.0.1 |    v2.0.0-ga    |  v0.0.6   |      v0.3.3      | v2.0.0-ga |
+|    master    | v2.0.1 |    v2.0.0-ga    |  v0.0.6   |      v0.3.3      | v2.0.0-ga |
 
 ## How to use
 
diff --git a/README_cn.md b/README_cn.md
index 7c25674..96020f0 100644
--- a/README_cn.md
+++ b/README_cn.md
@@ -7,7 +7,14 @@ The main features:
 
 * Generate the LDBC dataset and import it into nebula graph.
-* nebula-graph benchmark. (WIP)
+* Run load tests with k6.
+
+## Tool dependencies
+
+| Nebula Bench | Nebula | Nebula Importer | K6 Plugin | ldbc_snb_datagen | Nebula-go |
+|:------------:|:------:|:---------------:|:---------:|:----------------:|:---------:|
+|     v0.2     | v2.0.1 |    v2.0.0-ga    |  v0.0.6   |      v0.3.3      | v2.0.0-ga |
+|    master    | v2.0.1 |    v2.0.0-ga    |  v0.0.6   |      v0.3.3      | v2.0.0-ga |
 
 ## How to use
 
@@ -179,8 +186,6 @@ awk -F ',' 'NR>1{print $NF}' output/output_Go1Step.csv |sort|uniq -c
 
 The latency unit is `us`.
 
-If you use Jmeter, there is no automation for now; you can test by tuning Jmeter manually, see [jmx](ldbc/jmx/go_step.jmx) and [java](util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java) for details.
-
 ## More
 
 * If a generated data file is named like `aaa_xxYY_bbb`, e.g. `comment_hasTag_tag`, it is treated as an edge type, and the edge type is `XX_YY`, consistent with ldbc [ldbc_snb_interactive](https://github.com/ldbc/ldbc_snb_interactive/blob/main/cypher/queries/interactive-complex-1.cypher)
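With this change both READMEs steer users to the k6-based stress workflow behind the project's click CLI, which is touched later in this diff. As a minimal sketch of poking at that CLI in-process — the `cli` group name and the `stress scenarios` subcommand come from the `nebula_bench/cli.py` hunk below, while the invocation pattern and output handling are assumptions:

```python
# Sketch: exercise the "stress" command group with click's test runner.
# `cli` and the `stress scenarios` subcommand appear in the cli.py hunk
# later in this diff; everything else here is illustrative.
from click.testing import CliRunner

from nebula_bench.cli import cli

runner = CliRunner()
result = runner.invoke(cli, ["stress", "scenarios"])  # list registered scenarios
print(result.output)
```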
diff --git a/ldbc/jmx/go_step.jmx b/ldbc/jmx/go_step.jmx
deleted file mode 100644
index 88cf221..0000000
--- a/ldbc/jmx/go_step.jmx
+++ /dev/null
@@ -1,92 +0,0 @@
[JMeter test-plan XML; the element tags were lost in extraction. Recoverable settings: a thread group driven by {nums} threads and a {duration} scheduler with a stoptestnow action; a Java sampler for class com.vesoft.LdbcGoStep with arguments hosts={hosts}, maxconn=10, user={user}, pwd={pwd}, space={space}; a variable var2=${c2}; nGQL "GO 3 STEP FROM var2 OVER knows"; and a CSV Data Set on person.csv, "|"-delimited, shareMode.all, binding columns c1,c2,c3,c4,c5,c6,c7,c8.]
\ No newline at end of file
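The deleted test plan bound CSV columns (c1–c8, `|`-delimited, from person.csv) into the `GO 3 STEP FROM var2 OVER knows` statement through JMeter variables. A minimal Python sketch of that substitution, mirroring the `StmtGenerator` removed from `common/base.py` further down — the file name and column choice are taken from the jmx, the loop itself is illustrative:

```python
# Sketch: the parameter binding the deleted .jmx performed — fill a GO
# statement template from a pipe-delimited CSV. person.csv, the "|"
# delimiter, and the c2 column come from the jmx above.
import csv

template = "GO 3 STEP FROM {} OVER knows"

with open("person.csv") as f:
    for row in csv.reader(f, delimiter="|"):
        stmt = template.format(row[1])  # c2 held the start vertex id
        print(stmt)
        break
```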
dest="filename", default="", help="case1.jtl") - opt_parser.add_option("-c", "--casename", dest="casename", default="", help="2.1") - opt_parser.add_option("-v", "--vesion", dest="nebula_version", default="", help="version") - opt_parser.add_option( - "-m", - "--mysqlconf", - dest="mysqlconf", - default="", - help='{"ip":"127.0.0.1","port":3306,"user":"root","pwd":"xxx","db":"xxx"}', - ) - options, args = opt_parser.parse_args() - - if options.filename == "": - print("please input csv filename") - sys.exit(1) - - if options.mysqlconf != "": - try: - print(options.mysqlconf) - mysql_conf_json = json.loads(options.mysqlconf) - except: - print("mysqlconf is invalid, please check!") - sys.exit(1) - - if options.casename == "": - print("please input casename") - sys.exit(1) - - if options.nebula_version == "": - print("please input nebula_version") - sys.exit(1) - - # read file - try: - df = pd.read_csv(options.filename, header=0, sep=",") - except: - print("read csv file failed") - sys.exit(1) - else: - print("read csv to df success") - - if len(df) == 0: - print("there is no data to statistics") - sys.exit(0) - - # delete somedata - df.drop(df.head((int)(len(df) * 0.1)).index, inplace=True) - df.drop(df.tail((int)(len(df) * 0.1)).index, inplace=True) - - # statistics client time - try: - dic_elapsed_statistics = df_statistics(df, "total", "elapsed") - for label, group in df.groupby(["label"]): - df_statistics(group, label, "elapsed") - except: - print("statistics elapsed failed , please check the data in file") - sys.exit(1) - else: - print("statistics elapsed finished!") - - # statistics server time - try: - dic_latency_statistics = df_statistics(df, "total", "Latency") - for label, group in df.groupby(["label"]): - df_statistics(group, label, "Latency") - except: - print("statistics Latency failed , please check the data in file") - sys.exit(1) - else: - print("statistics Latency finished!") - - try: - dic_throughput = df_throughput(df) - except: - print("statistics throughput failed , please check the data in file") - sys.exit(1) - else: - print("statistics throughput finished!") - - if options.mysqlconf != "": - client_metrics_sql = "insert perf_metrics(casename,label,nebula_version,starttime,endtime,count,fail_count,min,max,avg,median,p90,p95,p99,throughput) \ - values ('{}','{}','{}','{}','{}',{},{},{},{},{},{},{},{},{},{})".format( - options.casename, - "client_time", - options.nebula_version, - dic_throughput["start_time"], - dic_throughput["end_time"], - dic_elapsed_statistics["count"], - dic_elapsed_statistics["fail_count"], - dic_elapsed_statistics["min"], - dic_elapsed_statistics["max"], - dic_elapsed_statistics["avg"], - dic_elapsed_statistics["median"], - dic_elapsed_statistics["p90"], - dic_elapsed_statistics["p95"], - dic_elapsed_statistics["p99"], - dic_throughput["throughput"], - ) - - server_metrics_sql = "insert perf_metrics(casename,label,nebula_version,starttime,endtime,count,fail_count,min,max,avg,median,p90,p95,p99,throughput) \ - values ('{}','{}','{}','{}','{}',{},{},{},{},{},{},{},{},{},{})".format( - options.casename, - "server_time", - options.nebula_version, - dic_throughput["start_time"], - dic_throughput["end_time"], - dic_latency_statistics["count"], - dic_latency_statistics["fail_count"], - dic_latency_statistics["min"], - dic_latency_statistics["max"], - dic_latency_statistics["avg"], - dic_latency_statistics["median"], - dic_latency_statistics["p90"], - dic_latency_statistics["p95"], - dic_latency_statistics["p99"], - dic_throughput["throughput"], - ) - - 
diff --git a/ldbc/setup/requirements.txt b/ldbc/setup/requirements.txt
deleted file mode 100644
index 3b2d239..0000000
--- a/ldbc/setup/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-pandas==1.1.5
-numpy==1.19.5
-pymysql==1.0.2
diff --git a/ldbc/setup/setup.sh b/ldbc/setup/setup.sh
deleted file mode 100755
index 9f4f897..0000000
--- a/ldbc/setup/setup.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#! /usr/bin/env bash
-curr_path=$(readlink -f "$(dirname "$0")")
-java_prj_path=${curr_path}/../../util/LdbcGoStep
-
-if [ $# -lt 1 ] ; then
-    echo "Please input jmeter install path"
-    exit
-else
-    jmeter_install_path=$1
-    echo "jmeter install path is $1"
-fi
-
-cd ${java_prj_path}
-mvn package
-if [ $? != 0 ] ; then
-    cd -
-    echo "mvn package failed!"
-    exit
-fi
-
-cd -
-
-wget -P $jmeter_install_path https://mirrors.bfsu.edu.cn/apache/jmeter/binaries/apache-jmeter-5.4.zip
-unzip ${jmeter_install_path}/apache-jmeter-5.4.zip -d $jmeter_install_path
-if [ $? != 0 ] ; then
-    echo "install jmeter failed!"
-    exit
-fi
-
-jar_path=${java_prj_path}/target/LdbcGoStep-2-jar-with-dependencies.jar
-jmeter_lib_path=${jmeter_install_path}/apache-jmeter-5.4/lib/ext
-cp -rf ${jar_path} ${jmeter_lib_path}/LdbcGoStep-2-jar-with-dependencies.jar
-if [ $? != 0 ] ; then
-    echo "cp -rf ${jar_path} ${jmeter_lib_path} failed"
-    exit
-fi
-
-
-pip3 install --user -r ${curr_path}/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
-
-if [ $? != 0 ] ; then
-    echo "install python3 pkg failed!"
-    exit
-fi
diff --git a/ldbc/sql/perf_metric.sql b/ldbc/sql/perf_metric.sql
deleted file mode 100644
index b0fd265..0000000
--- a/ldbc/sql/perf_metric.sql
+++ /dev/null
@@ -1,22 +0,0 @@
-drop database if exists perftest;
-
-create database perftest;
-
-use perftest1;
-create table perf_metrics(
-casename varchar(50),
-label varchar(50),
-nebula_version varchar(10),
-starttime timestamp,
-endtime timestamp,
-count int,
-fail_count int,
-min double,
-max double,
-avg double,
-median double,
-p90 double,
-p95 double,
-p99 double,
-throughput double
-);
diff --git a/nebula_bench/cli.py b/nebula_bench/cli.py
index 4d6db15..5ee29ce 100644
--- a/nebula_bench/cli.py
+++ b/nebula_bench/cli.py
@@ -97,29 +97,9 @@ def importer(folder, address, user, password, space, vid_type, dry_run):
     c = nc.import_space(dry_run)
     if c != 0:
         exit(c)
-    if not dry_run:
-        click.echo("begin space compact")
-        nc.compact()
-        click.echo("end space compact")
-    nc.release()
 
 
-@nebula.command(help="initial nebula graph, including create indexes")
-@common
-def init(folder, address, user, password, space):
-    nc = NebulaController(
-        data_folder=folder,
-        user=user,
-        password=password,
-        address=address,
-        space=space,
-        vid_type="int",
-    )
-
-    nc.init_space()
-
-
-@cli.group()
+@cli.group(help="stress testing")
 def stress():
     pass
 
@@ -158,8 +138,6 @@ def run(
     )
     stress.run()
 
-    pass
-
 
 @stress.command()
 def scenarios():
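After this hunk, `importer` is a thin wrapper over `NebulaController.import_space` (see the controller.py hunk below): no config updates, no compact, no index bootstrap. A sketch of driving the same path from Python — the keyword arguments mirror the removed `init` command above, while the concrete values are placeholders:

```python
# Sketch: what `importer --dry-run` reduces to after this change. The
# constructor kwargs come from this diff; folder and address values are
# placeholders.
from nebula_bench.controller import NebulaController

nc = NebulaController(
    data_folder="target/data/test_data",  # placeholder path
    user="root",
    password="nebula",
    address="127.0.0.1:9669",
    space="stress_test",
    vid_type="int",
)

# dry_run=True renders the nebula-importer YAML config but skips running
# `scripts/nebula-importer --config <file>`.
exit_code = nc.import_space(dry_run=True)
print("importer exit code:", exit_code)
```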
diff --git a/nebula_bench/common/base.py b/nebula_bench/common/base.py
index 9d376d9..892b78c 100644
--- a/nebula_bench/common/base.py
+++ b/nebula_bench/common/base.py
@@ -1,137 +1,7 @@
 # -*- encoding: utf-8 -*-
-import re
-import time
-from collections import deque
-import csv
-from pathlib import Path
-from collections import namedtuple
-
-from nebula2.gclient.net import ConnectionPool
-from nebula2.Config import Config
-from nebula2.data.ResultSet import ResultSet
-
-from nebula_bench import setting
 from nebula_bench.utils import logger
 
 
-class CSVReader(object):
-    def __init__(self, file):
-        try:
-            file = open(file)
-        except TypeError:
-            # "file" was already a pre-opened file-like object
-            pass
-        self.file = file
-        self.reader = csv.reader(file)
-
-    def __next__(self):
-        try:
-            return next(self.reader)
-        except StopIteration:
-            # reuse file on EOF
-            self.file.seek(0, 0)
-            return next(self.reader)
-
-
-class NebulaClient(object):
-    def __init__(
-        self,
-        max_connection=None,
-        user=None,
-        password=None,
-        address=None,
-        space=None,
-        timeout=None,
-    ):
-        self.user = user or setting.NEBULA_USER
-        self.password = password or setting.NEBULA_PASSWORD
-        self.space = space or setting.NEBULA_SPACE
-        self.config = Config()
-        self.config.max_connection_pool_size = max_connection or setting.NEBULA_MAX_CONNECTION
-
-        if timeout is not None:
-            self.config.timeout = timeout
-
-        address = address or setting.NEBULA_ADDRESS
-        address_list = address.split(",")
-        self.address = [(item.split(":")[0], int(item.split(":")[1])) for item in address_list]
-
-        self.deque = deque(maxlen=self.config.max_connection_pool_size)
-
-        # init connection pool
-        self.connection_pool = ConnectionPool()
-        # if the given servers are ok, return true, else return false
-        ok = self.connection_pool.init(self.address, self.config)
-        assert ok, "cannot connect the server, address is {}".format(address)
-
-    def add_session(self):
-        _session = self.connection_pool.get_session(self.user, self.password)
-        _session.execute("USE {}".format(self.space))
-        self.deque.append(_session)
-
-    def release_session(self):
-        _session = None
-        try:
-            _session = self.deque.pop()
-        except:
-            pass
-        if _session is not None:
-            _session.release()
-
-    def execute(self, stmt):
-        if len(self.deque) == 0:
-            self.add_session()
-        _session = self.deque.popleft()
-        try:
-            r = _session.execute(stmt)
-        except Exception as e:
-            logger.error("execute stmt error, e is {}".format(e))
-            r = None
-        finally:
-            self.deque.append(_session)
-        return r
-
-    def release(self):
-        for _session in self.deque:
-            _session.release()
-        self.connection_pool.close()
-
-
-re_pattern = r"\$\{\}"
-
-
-class StmtGenerator(object):
-    def __init__(self, stmt_template, parameters, data_folder):
-        """
-
-        :param stmt_template: statement template
-        :param parameters: ((csv_file, index),)
-        """
-        self.csv_reader_list = []
-        self.index_list = []
-        self.stmt_template = stmt_template
-        self.data_folder = Path(data_folder)
-
-        for p in parameters:
-            csv_file, index = p
-            csv_path = str((self.data_folder / csv_file).absolute())
-            csv_reader = CSVReader(csv_path)
-            self.csv_reader_list.append(csv_reader)
-            self.index_list.append(index)
-
-    def __next__(self):
-        _stmt = self.stmt_template
-        for _index, csv_reader in enumerate(self.csv_reader_list):
-            index = self.index_list[_index]
-
-            line = ",".join(next(csv_reader))
-            value = line.split("|")[index]
-
-            _stmt = re.sub(re_pattern, value, _stmt, count=1)
-
-        return _stmt
-
-
 class ScenarioMeta(type):
     def __new__(cls, name, bases, attrs, *args, **kwargs):
         # super(ScenarioMeta, cls).__new__(cls, name, bases, attrs, *args, **kwargs)
@@ -151,11 +21,3 @@ class BaseScenario(metaclass=ScenarioMeta):
     csv_path: str
     csv_index: list
     name: str
-
-
-class BaseQuery(object):
-    queries: tuple
-
-
-class BaseImport(object):
-    pass
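base.py now keeps only the scenario plumbing: `ScenarioMeta` plus the `BaseScenario` field contract that each k6 scenario fills in. A minimal sketch of a concrete scenario, modeled on `scenarios/go.py` later in this diff — the attribute values are illustrative, not taken from the repo:

```python
# Sketch: a concrete scenario against the surviving BaseScenario contract.
# The class shape mirrors Go2Step/Go3Step in scenarios/go.py; csv_path and
# csv_index values are illustrative.
from nebula_bench.common.base import BaseScenario


class GoOneStep(BaseScenario):
    abstract = False  # concrete scenarios opt in; BaseScenario itself stays abstract
    nGQL = "GO 1 STEP FROM {} OVER KNOWS"  # the {} slot is filled from the CSV column
    csv_path = "social_network/dynamic/person.csv"  # illustrative path
    csv_index = [0]  # which column(s) feed the statement template
```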
diff --git a/nebula_bench/controller.py b/nebula_bench/controller.py
index 7cd7a6e..078af1b 100644
--- a/nebula_bench/controller.py
+++ b/nebula_bench/controller.py
@@ -1,12 +1,10 @@
 # -*- coding: utf-8 -*-
-import time
 from pathlib import Path
-from datetime import datetime
 
 from nebula_bench import parser
 from nebula_bench import setting
 from nebula_bench.utils import logger
-from nebula_bench.common.base import BaseScenario, BaseQuery, NebulaClient
+from nebula_bench.common.base import BaseScenario
 from nebula_bench import utils
 
 
@@ -20,75 +18,6 @@ def __init__(self, data_folder=None, space=None, user=None, password=None, addre
         self.password = password or setting.NEBULA_PASSWORD
         self.address = address or setting.NEBULA_ADDRESS
 
-        self.nebula_client = NebulaClient(max_connection=5, address=self.address)
-        self.nebula_session = self.nebula_client.connection_pool.get_session(
-            self.user, self.password
-        )
-        self.nebula_session.execute("USE {}".format(self.space))
-
-    def release(self):
-        self.nebula_session.release()
-        self.nebula_client.release()
-
-    def update_config(self, module, config):
-        assert module.upper() in ["GRAPH", "STORAGE", "META"]
-        self.nebula_session.execute("UPDATE CONFIGS {}:{}".format(module.upper(), config))
-
-    def submit_job(self, statement, timeout=1800, retry=5):
-        job_id = job_status = None
-        submit_succeeded = False
-        for i in range(retry):
-            r = self.nebula_session.execute(statement)
-            if r.is_succeeded():
-                row = next(r._data_set_wrapper)
-                job_id = row.get_value(0).as_int()
-
-                show_r = self.nebula_session.execute("SHOW JOB {}".format(job_id))
-                assert show_r.is_succeeded()
-                row = next(show_r._data_set_wrapper)
-                job_status = row.get_value(2).as_string()
-                if job_status != "FAILED":
-                    submit_succeeded = True
-                    break
-
-            else:
-                logger.warning("submit job failed, error message is {}".format(r.error_msg()))
-                time.sleep(10)
-        assert submit_succeeded, "this job does not submit successfully, job is <{}>".format(
-            statement
-        )
-        logger.info("succeeded job <{}>, job id is <{}>".format(statement, job_id))
-        timeout_time = time.time() + timeout
-        while time.time() < timeout_time:
-            show_r = self.nebula_session.execute("SHOW JOB {}".format(job_id))
-            assert show_r.is_succeeded()
-            row = next(show_r._data_set_wrapper)
-            job_status = row.get_value(2).as_string()
-            if job_status == "FINISHED":
-                # job_id, command, start_time, stop_time,
-                return (
-                    row.get_value(0).as_int(),
-                    row.get_value(1).as_string(),
-                    row.get_value(3).as_datetime(),
-                    row.get_value(4).as_datetime(),
-                )
-            elif job_status == "FAILED":
-                logger.error("running job failed, job id is <{}>".format(job_id))
-                raise Exception("job <{}> failed".format(job_id))
-
-            time.sleep(2)
-
-        raise Exception("timeout")
-
-    def compact(self, timeout=3600):
-        """
-        TODO should move to another controller to test multiple import scenarios.
-        :param timeout:
-        :return:
-        """
-        self.nebula_session.execute("USE {}".format(self.space))
-        return self.submit_job("SUBMIT JOB COMPACT", timeout)
-
 
 class NebulaController(BaseController):
     def __init__(
@@ -104,33 +33,14 @@
         self.vid_type = vid_type or "int"
 
     def import_space(self, dry_run=False):
-        self.update_config("graph", "heartbeat_interval_secs=1")
-        self.update_config("storage", "heartbeat_interval_secs=1")
         result_file = self.dump_nebula_importer()
         command = ["scripts/nebula-importer", "--config", result_file]
         if not dry_run:
             return utils.run_process(command)
         return 0
 
-    def init_space(self):
-        """
-        create index and then rebuild
-        :return:
-        """
-        self.update_config("graph", "heartbeat_interval_secs=1")
-        self.update_config("storage", "heartbeat_interval_secs=1")
-        r = self.nebula_session.execute(
-            "CREATE TAG INDEX IF NOT EXISTS idx_person on Person(firstName(20))"
-        )
-        assert r.is_succeeded()
-        self.submit_job("REBUILD TAG INDEX idx_person")
-
     def dump_nebula_importer(self):
         _type = "int64" if self.vid_type == "int" else "fixed_string(20)"
-        statment = "CREATE SPACE IF NOT EXISTS {}(PARTITION_NUM = 24, REPLICA_FACTOR = 3, vid_type ={} );".format(
-            self.space, _type
-        )
-        self.nebula_session.execute(statment)
         p = parser.Parser(parser.NebulaDumper, self.data_folder)
         dumper = p.parse()
         kwargs = {}
@@ -142,23 +52,6 @@
 
         return dumper.dump(**kwargs)
 
-    def clean_spaces(self, keep=None):
-        """
-        delete the spaces
-        """
-        if keep is None:
-            keep_spaces = []
-        else:
-            keep_spaces = [item.strip() for item in keep.split(",")]
-
-        result = self.nebula_session.execute("show spaces;")
-
-        for r in result:
-            name = r.get_value(0).as_string()
-            if name in keep_spaces:
-                continue
-            self.nebula_session.execute("drop space {}; ".format(name))
-
 
 class StressController(BaseController):
     def __init__(
@@ -217,95 +110,3 @@
             "{}/{}.html".format(result_folder, _class.result_file_name),
         ]
         utils.run_process(command)
-
-
-class QueryController(BaseController):
-    def __init__(
-        self,
-        alert_class=None,
-        report_class=None,
-        space=None,
-        user=None,
-        password=None,
-        address=None,
-    ):
-        BaseController.__init__(self, space=space, user=user, password=password, address=address)
-        self.queries = []
-        self.alert_class = alert_class
-        self.report_class = report_class
-        self.alert = []
-
-    def load_queries(self):
-        package_name = "nebula_bench.queries"
-        query_classes = utils.load_class(package_name, load_all=True, base_class=BaseQuery)
-        for _class in query_classes:
-            for q in _class.queries:
-                self.queries.append(q)
-
-    def run(self):
-        # balance leader
-        self.nebula_session.execute("balance leader")
-        time.sleep(10)
-
-        # prevent leader change error, run some queries to update storage client.
-        for _ in range(10):
-            self.nebula_session.execute("GO 2 STEPS FROM 2608 OVER KNOWS")
-
-        self.load_queries()
-        for query in self.queries:
-            self.run_statement(query.name, query.stmt)
-
-        self.release()
-        self.record.end_at = datetime.now()
-        self.record.save()
-        report_file = self.generate_report()
-        utils.dingding_msg(
-            "Finish query test, report html is http://{}/{}".format(
-                self.record.execute_machine, report_file
-            )
-        )
-
-        self.send_alert()
-
-    def run_statement(self, name, stmt):
-        logger.info("execute the statement <{}>, stmt is <{}>".format(name, stmt))
-        count = 10
-        latency_results = []
-        response_results = []
-        rows_counts = []
-        succeeded = True
-        latency = response_time = rows_count = 0
-        for _ in range(count):
-            now = time.monotonic()
-            r = self.nebula_session.execute(stmt)
-            if r.is_succeeded():
-                # us
-                response_results.append((time.monotonic() - now) * 1000 * 1000)
-                latency_results.append(r.latency())
-                rows_counts.append(r.row_size())
-            else:
-                logger.warning("execution is not succeeded, the err is <{}>".format(r.error_msg()))
-                succeeded = False
-
-        latency_results.sort()
-        response_results.sort()
-        if succeeded:
-            # pop minimum and maximum value
-            latency_results.pop(0)
-            latency_results.pop(1)
-            response_results.pop(0)
-            response_results.pop(1)
-            latency = sum(latency_results) / len(latency_results)
-            response_time = sum(response_results) / len(latency_results)
-            rows_count = sum(rows_counts) / len(rows_counts)
-        self.save_statement_result(name, stmt, latency, response_time, rows_count, succeeded)
-
-    def add_alert(self, stmt, latency, baseline_latency):
-        self.alert.append((stmt, latency, baseline_latency))
-
-    def send_alert(self, alter_class=None):
-        pass
-
-    def generate_report(self):
-        if self.report_class is not None:
-            return self.report_class(self).report()
diff --git a/nebula_bench/locust_file.py b/nebula_bench/locust_file.py
deleted file mode 100644
index 1a4fdb9..0000000
--- a/nebula_bench/locust_file.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-import importlib
-
-from locust import events, LoadTestShape
-from locust.argument_parser import parse_options
-
-
-class StepLoadShape(LoadTestShape):
-    stages = [
-        {"duration": 6, "users": 10, "spawn_rate": 2},
-        {"duration": 10, "users": 50, "spawn_rate": 10},
-        {"duration": 20, "users": 100, "spawn_rate": 20},
-        {"duration": 30, "users": 50, "spawn_rate": 10},
-        # {"duration": 230, "users": 10, "spawn_rate": 10},
-        # {"duration": 240, "users": 1, "spawn_rate": 1},
-    ]
-
-    def tick(self):
-        run_time = self.get_run_time()
-
-        for stage in self.stages:
-            if run_time < stage["duration"]:
-                tick_data = (stage["users"], stage["spawn_rate"])
-                return tick_data
-
-        return None
-
-
-@events.init_command_line_parser.add_listener
-def add_arguments(parser):
-    group = parser.add_argument_group(
-        "customize nebula arguments",
-        "",
-    )
-    group.add_argument(
-        "--nebula-scenario",
-        type=str,
-        help="stress test scenario name, e.g. match.MatchPerson",
-        env_var="",
-        default="",
-    ),
-
-
-p = parse_options()
-scenario = p.__dict__["nebula_scenario"]
-
-if scenario == "":
-    print("need scenario")
-    exit
-else:
-    print("run the scenario {}".format(scenario))
-    print("\n")
-    module, clazz = scenario.split(".")
-    _module = importlib.import_module(".".join(["nebula_bench.scenarios", module]))
-    User = getattr(_module, clazz)
-    print("statement is {}".format(User.statement))
match.MatchPerson", - env_var="", - default="", - ), - - -p = parse_options() -scenario = p.__dict__["nebula_scenario"] - -if scenario == "": - print("need scenario") - exit -else: - print("run the scenario {}".format(scenario)) - print("\n") - module, clazz = scenario.split(".") - _module = importlib.import_module(".".join(["nebula_bench.scenarios", module])) - User = getattr(_module, clazz) - print("statement is {}".format(User.statement)) diff --git a/nebula_bench/scenarios/go.py b/nebula_bench/scenarios/go.py index b625612..d0da279 100644 --- a/nebula_bench/scenarios/go.py +++ b/nebula_bench/scenarios/go.py @@ -18,6 +18,7 @@ class Go2Step(BaseGoScenario): abstract = False nGQL = "GO 2 STEP FROM {} OVER KNOWS" + class Go3Step(BaseGoScenario): abstract = False nGQL = "GO 3 STEP FROM {} OVER KNOWS" diff --git a/requirements.txt b/requirements.txt index e2c3906..a81d57a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,3 @@ Jinja2 click -nebula2-python == 2.0.0 python-dotenv diff --git a/scripts/env.sh b/scripts/env.sh index d7757a1..cc4d86f 100755 --- a/scripts/env.sh +++ b/scripts/env.sh @@ -5,7 +5,7 @@ HADOOP_VERSION=3.2.1 scaleFactor=${scaleFactor:-1} -NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-master} -NEBULA_XK6_VERSION=${NEBULA_XK6_VERSION:-master} +NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-v2.0.0-ga} +NEBULA_XK6_VERSION=${NEBULA_XK6_VERSION:-v0.0.6} GOLANG_VERSION=${GOLANG_VERSION:-1.16.6} \ No newline at end of file diff --git a/scripts/setup.sh b/scripts/setup.sh index b2b9c7a..1971c10 100755 --- a/scripts/setup.sh +++ b/scripts/setup.sh @@ -33,8 +33,8 @@ function setup_nebula_importer(){ } function setup_nebula_k6(){ - git clone --branch ${NEBULA_XK6_VERSION} https://github.com/HarrisChu/xk6-nebula ${TEMP_DIR}/xk6-nebula - cd ${TEMP_DIR}/xk6-nebula + git clone --branch ${NEBULA_XK6_VERSION} https://github.com/vesoft-inc/k6-plugin ${TEMP_DIR}/k6-plugin + cd ${TEMP_DIR}/k6-plugin make build mv k6 ${PROJECT_DIR}/scripts/. } diff --git a/util/LdbcGoStep/pom.xml b/util/LdbcGoStep/pom.xml deleted file mode 100644 index f5eedcc..0000000 --- a/util/LdbcGoStep/pom.xml +++ /dev/null @@ -1,89 +0,0 @@ - - 4.0.0 - com.vesoft - LdbcGoStep - jar - 2 - Maven Quick Start Archetype - http://maven.apache.org - - - junit - junit - 4.12 - - - org.slf4j - slf4j-api - 1.7.25 - - - com.vesoft - client - 2.0.0-SNAPSHOT - - - - org.apache.commons - commons-csv - 1.7 - - - org.apache.jmeter - ApacheJMeter_core - - - org.apache.logging.log4j - log4j-slf4j-impl - - - 5.4 - compile - - - - org.apache.jmeter - ApacheJMeter_java - 5.4 - compile - - - - - - snapshots - https://oss.sonatype.org/content/repositories/snapshots/ - - - - - - - org.apache.maven.plugins - maven-assembly-plugin - 2.4.1 - - - - src/main/resources/package.xml - - - - - com.healchow.consumer.Main - - - - - - make-assembly - package - - single - - - - - - - \ No newline at end of file diff --git a/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java b/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java deleted file mode 100644 index 9393587..0000000 --- a/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java +++ /dev/null @@ -1,181 +0,0 @@ -/* Copyright (c) 2021 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License, - * attached with Common Clause Condition 1.0, found in the LICENSES directory. 
diff --git a/util/LdbcGoStep/pom.xml b/util/LdbcGoStep/pom.xml
deleted file mode 100644
index f5eedcc..0000000
--- a/util/LdbcGoStep/pom.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0">
-    <modelVersion>4.0.0</modelVersion>
-    <groupId>com.vesoft</groupId>
-    <artifactId>LdbcGoStep</artifactId>
-    <packaging>jar</packaging>
-    <version>2</version>
-    <name>Maven Quick Start Archetype</name>
-    <url>http://maven.apache.org</url>
-    <dependencies>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <version>4.12</version>
-        </dependency>
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <version>1.7.25</version>
-        </dependency>
-        <dependency>
-            <groupId>com.vesoft</groupId>
-            <artifactId>client</artifactId>
-            <version>2.0.0-SNAPSHOT</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-csv</artifactId>
-            <version>1.7</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.jmeter</groupId>
-            <artifactId>ApacheJMeter_core</artifactId>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.logging.log4j</groupId>
-                    <artifactId>log4j-slf4j-impl</artifactId>
-                </exclusion>
-            </exclusions>
-            <version>5.4</version>
-            <scope>compile</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.jmeter</groupId>
-            <artifactId>ApacheJMeter_java</artifactId>
-            <version>5.4</version>
-            <scope>compile</scope>
-        </dependency>
-    </dependencies>
-    <repositories>
-        <repository>
-            <id>snapshots</id>
-            <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
-        </repository>
-    </repositories>
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4.1</version>
-                <configuration>
-                    <descriptors>
-                        <descriptor>src/main/resources/package.xml</descriptor>
-                    </descriptors>
-                    <archive>
-                        <manifest>
-                            <mainClass>com.healchow.consumer.Main</mainClass>
-                        </manifest>
-                    </archive>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>
\ No newline at end of file
diff --git a/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java b/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java
deleted file mode 100644
index 9393587..0000000
--- a/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/* Copyright (c) 2021 vesoft inc. All rights reserved.
- *
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
- */
-
-package com.vesoft;
-
-import com.vesoft.nebula.client.graph.NebulaPoolConfig;
-import com.vesoft.nebula.client.graph.data.HostAddress;
-import com.vesoft.nebula.client.graph.data.ResultSet;
-import com.vesoft.nebula.client.graph.net.NebulaPool;
-import com.vesoft.nebula.client.graph.net.Session;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
-import org.slf4j.Logger;
-import org.apache.jmeter.config.Arguments;
-import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
-import org.apache.jmeter.samplers.SampleResult;
-
-/**
- * LDBC Go Step
- */
-public class LdbcGoStep extends AbstractJavaSamplerClient {
-
-    private final Logger log = getNewLogger();
-    private NebulaPool pool = null;
-    private Session session = null;
-    private Integer maxVars = 20;
-
-
-    @Override
-    public Arguments getDefaultParameters() {
-        Arguments arguments = new Arguments();
-        arguments.addArgument("hosts", "127.0.0.1:9669");
-        arguments.addArgument("maxconn", "10");
-        arguments.addArgument("user", "root");
-        arguments.addArgument("pwd", "nebula");
-        arguments.addArgument("space", "");
-        arguments.addArgument("nGQL", "yield 1");
-        arguments.addArgument("person", "");
-        return arguments;
-    }
-
-
-    public void initNebulaPool(String hosts, int maxconn, int id) {
-        pool = new NebulaPool();
-        try {
-            List<HostAddress> addresses = new ArrayList<>();
-            NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
-            nebulaPoolConfig.setMaxConnSize(maxconn);
-            List<String> host_list = new ArrayList<>(Arrays.asList(hosts.split(",")));
-            if (host_list == null) {
-                log.error("host_list is null!");
-
-            }
-
-            String host = host_list.get(id % host_list.size());
-            String[] splits = host.split(":");
-            addresses.add(new HostAddress(splits[0], Integer.parseInt(splits[1])));
-            boolean init = pool.init(addresses, nebulaPoolConfig);
-            if (init != true) {
-                if (pool != null) {
-                    pool.close();
-                }
-                log.info("pool init failed!");
-
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-            if (pool != null) {
-                pool.close();
-            }
-            log.error("pool init failed, error message is ", e);
-
-        } finally {
-            log.info(String.format("initNebulaPool success!"));
-        }
-    }
-
-
-    @Override
-    public void setupTest(JavaSamplerContext javaSamplerContext) {
-        System.out.println("Perf thread start:" + Thread.currentThread().getName());
-        String hosts = javaSamplerContext.getParameter("hosts");
-        int maxconn = Integer.parseInt(javaSamplerContext.getParameter("maxconn").trim());
-        String user = javaSamplerContext.getParameter("user");
-        String pwd = javaSamplerContext.getParameter("pwd");
-        String space = javaSamplerContext.getParameter("space");
-        int id = javaSamplerContext.getJMeterContext().getThreadNum();
-        initNebulaPool(hosts, maxconn, id);
-        try {
-            session = pool.getSession(user, pwd, false);
-            if (session != null) {
-                String use_space = "use " + space + ";";
-                ResultSet resp = null;
-                resp = session.execute(use_space);
-                if (!resp.isSucceeded()) {
-                    System.out.println("Switch space failed:" + space + "\nError is " + resp.getErrorMessage());
-                    System.exit(1);
-                }
-            } else {
-                log.info("getSession failed !");
-                pool.close();
-
-            }
-        } catch (Exception e) {
-            log.error(e.getMessage());
-            if (session != null) {
-                session.release();
-            }
-            if (pool != null) {
-                pool.close();
-            }
-        } finally {
-            log.info(String.format("setupTest success!"));
-        }
-
-    }
-
-    @Override
-    public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
javaSamplerContext) { - String nGQL = javaSamplerContext.getParameter("nGQL"); - for (int i=0;i - - jar-with-dependencies - - jar - - false - - - / - true - true - runtime - - org.apache.logging.log4j:log4j-slf4j-impl - org.slf4j:slf4j-log4j12 - - - - - \ No newline at end of file