From 1e1c1de0e316eb79c25aac0e2aa2ab49949cfdb3 Mon Sep 17 00:00:00 2001
From: "Harris.Chu" <1726587+HarrisChu@users.noreply.github.com>
Date: Mon, 16 Aug 2021 14:13:41 +0800
Subject: [PATCH] Prepare release (#36)
* delete JMeter-related files
* delete unused code and the nebula python client
* add release docs
---
.github/workflows/build.yaml | 31 ---
.../{import.yaml => nebula-bench.yaml} | 0
README.md | 9 +-
README_cn.md | 11 +-
ldbc/jmx/go_step.jmx | 92 -------
ldbc/scripts/statistics.py | 246 ------------------
ldbc/setup/requirements.txt | 3 -
ldbc/setup/setup.sh | 44 ----
ldbc/sql/perf_metric.sql | 22 --
nebula_bench/cli.py | 24 +-
nebula_bench/common/base.py | 138 ----------
nebula_bench/controller.py | 201 +-------------
nebula_bench/locust_file.py | 56 ----
nebula_bench/scenarios/go.py | 1 +
requirements.txt | 1 -
scripts/env.sh | 4 +-
scripts/setup.sh | 4 +-
util/LdbcGoStep/pom.xml | 89 -------
.../src/main/java/vesoft/LdbcGoStep.java | 181 -------------
.../LdbcGoStep/src/main/resources/package.xml | 23 --
20 files changed, 23 insertions(+), 1157 deletions(-)
delete mode 100644 .github/workflows/build.yaml
rename .github/workflows/{import.yaml => nebula-bench.yaml} (100%)
delete mode 100644 ldbc/jmx/go_step.jmx
delete mode 100644 ldbc/scripts/statistics.py
delete mode 100644 ldbc/setup/requirements.txt
delete mode 100755 ldbc/setup/setup.sh
delete mode 100644 ldbc/sql/perf_metric.sql
delete mode 100644 nebula_bench/locust_file.py
delete mode 100644 util/LdbcGoStep/pom.xml
delete mode 100644 util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java
delete mode 100644 util/LdbcGoStep/src/main/resources/package.xml
diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml
deleted file mode 100644
index ce4dcbb..0000000
--- a/.github/workflows/build.yaml
+++ /dev/null
@@ -1,31 +0,0 @@
-name: build
-on:
- push:
- branches: [master]
- pull_request:
- branches:
- - master
- - "v[0-9]+.*"
-
-jobs:
-
- build:
- name: Build
- runs-on: ubuntu-latest
- steps:
- - uses: actions/checkout@v2
- - name: Set up JDK 1.8
- uses: actions/setup-java@v1
- with:
- java-version: 1.8
-
- - name: Cache the Maven packages to speed up build
- uses: actions/cache@v2
- with:
- path: ~/.m2/repository
- key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}
- restore-keys: ${{ runner.os }}-maven-
-
- - name: Build with Maven
- run: cd util/LdbcGoStep && mvn -B package
-
diff --git a/.github/workflows/import.yaml b/.github/workflows/nebula-bench.yaml
similarity index 100%
rename from .github/workflows/import.yaml
rename to .github/workflows/nebula-bench.yaml
diff --git a/README.md b/README.md
index 4ebfdba..366f1f8 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,14 @@ It only supports nebula graph 2.0+ releases.
The main features:
* Generate the LDBC dataset and then import it into nebula-graph.
-* nebula-graph benchmark. (WIP)
+* Run load tests with k6.
+
+## Dependency
+
+| Nebula Bench | Nebula | Nebula Importer | K6 Plugin | ldbc_snb_datagen | Nebula-go |
+|:-----------------:|:-------------:|:---------------:|:------------:|:-------------------:|:--------------:|
+| v0.2 | v2.0.1 | v2.0.0-ga | v0.0.6 | v0.3.3 | v2.0.0-ga |
+| master | v2.0.1 | v2.0.0-ga | v0.0.6 | v0.3.3 | v2.0.0-ga |
## How to use
diff --git a/README_cn.md b/README_cn.md
index 7c25674..96020f0 100644
--- a/README_cn.md
+++ b/README_cn.md
@@ -7,7 +7,14 @@
Main features:
-* Generate the LDBC dataset and import it into nebula graph.
-* nebula-graph benchmark. (WIP)
+* Run load tests with k6.
+
+## Dependencies
+
+| Nebula Bench | Nebula | Nebula Importer | K6 Plugin | ldbc_snb_datagen | Nebula-go |
+|:-----------------:|:-------------:|:---------------:|:------------:|:-------------------:|:--------------:|
+| v0.2 | v2.0.1 | v2.0.0-ga | v0.0.6 | v0.3.3 | v2.0.0-ga |
+| master | v2.0.1 | v2.0.0-ga | v0.0.6 | v0.3.3 | v2.0.0-ga |
## How to use
@@ -179,8 +186,6 @@ awk -F ',' 'NR>1{print $NF}' output/output_Go1Step.csv |sort|uniq -c
The latency unit is `us`.
-If you use JMeter, there is no automation for it yet; you can test by tuning JMeter manually, see [jmx](ldbc/jmx/go_step.jmx) and [java](util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java) for details.
-
## More
* If a generated data file matches the `aaa_xxYY_bbb` pattern, e.g. `comment_hasTag_tag`, it is treated as an edge type, and the edge type name is `XX_YY`, consistent with ldbc [ldbc_snb_interactive](https://github.com/ldbc/ldbc_snb_interactive/blob/main/cypher/queries/interactive-complex-1.cypher)
diff --git a/ldbc/jmx/go_step.jmx b/ldbc/jmx/go_step.jmx
deleted file mode 100644
index 88cf221..0000000
--- a/ldbc/jmx/go_step.jmx
+++ /dev/null
@@ -1,92 +0,0 @@
-<!-- XML markup lost in extraction. The deleted JMeter test plan defined a
-     thread group of {nums} threads running for {duration} seconds (loop
-     count -1, "stoptestnow" on sampler error); user-defined arguments
-     hosts={hosts}, maxconn=10, user={user}, pwd={pwd}, space={space},
-     var2=${c2}, nGQL="GO 3 STEP FROM var2 OVER knows"; a Java sampler
-     backed by com.vesoft.LdbcGoStep; and a CSV data set reading person.csv
-     with delimiter "|" into variables c1,c2,c3,c4,c5,c6,c7,c8
-     (shareMode.all). -->
\ No newline at end of file
diff --git a/ldbc/scripts/statistics.py b/ldbc/scripts/statistics.py
deleted file mode 100644
index d8476c5..0000000
--- a/ldbc/scripts/statistics.py
+++ /dev/null
@@ -1,246 +0,0 @@
-import sys
-import pandas as pd
-import numpy as np
-import pymysql
-import datetime
-from optparse import OptionParser
-import json
-
-
-def insert_metrics(sql, conf):
- conn = pymysql.connect(
- host=conf["ip"], port=conf["port"], user=conf["user"], passwd=conf["pwd"], db=conf["db"]
- )
- cur = conn.cursor()
- cur.execute(sql)
- conn.commit()
- conn.close()
-
-
-def df_throughput(df):
- df["finish"] = df["timeStamp"] + df["elapsed"] / 1000
-
- start_time = df["timeStamp"].min()
- end_time = df["finish"].max()
- count = len(df)
- throughput = count / ((end_time - start_time) / 1000)
- print("throughput:", throughput)
- dic_throughput = {}
- dic_throughput["start_time"] = datetime.datetime.fromtimestamp(int(start_time / 1000)).strftime(
- "%Y-%m-%d %H:%M:%S"
- )
- dic_throughput["end_time"] = datetime.datetime.fromtimestamp(int(end_time / 1000)).strftime(
- "%Y-%m-%d %H:%M:%S"
- )
- dic_throughput["throughput"] = throughput
- return dic_throughput
-
-
-def df_statistics(df, label, column_name):
- df_sort = df.sort_values(by=column_name, ascending=True).reset_index(drop=True)
- count = df_sort.agg({column_name: ["count"]})[column_name].values[0]
- fail_count = df[df["success"] == False].count()["success"]
-
- p90 = int(len(df_sort) * 0.90)
- p95 = int(len(df_sort) * 0.95)
- p99 = int(len(df_sort) * 0.99)
- if p90 >= count:
- p90 = count - 1
- if p95 >= count:
- p95 = count - 1
- if p99 >= count:
- p99 = count - 1
- # "elapsed" and "Latency" are both recorded in ms in the jtl; convert to seconds
- p90_value = df_sort.loc[p90, column_name] / 1000
- p95_value = df_sort.loc[p95, column_name] / 1000
- p99_value = df_sort.loc[p99, column_name] / 1000
- min_value = df_sort[column_name].min() / 1000
- max_value = df_sort[column_name].max() / 1000
- avg = df_sort[column_name].mean() / 1000
- median = df_sort[column_name].median() / 1000
-
- print(
- "statistics:",
- column_name,
- "label:",
- label,
- "count:",
- count,
- "fail_count:",
- fail_count,
- "min:",
- min_value,
- "max_value:",
- max_value,
- "avg:",
- avg,
- "median",
- median,
- "p90:",
- p90_value,
- "p95:",
- p95_value,
- "p99:",
- p99_value,
- )
- dic_statistics = {}
- dic_statistics["column_name"] = column_name
- dic_statistics["label"] = label
- dic_statistics["count"] = count
- dic_statistics["fail_count"] = fail_count
- dic_statistics["min"] = min_value
- dic_statistics["max"] = max_value
- dic_statistics["avg"] = avg
- dic_statistics["median"] = median
- dic_statistics["p90"] = p90_value
- dic_statistics["p95"] = p95_value
- dic_statistics["p99"] = p99_value
- return dic_statistics
-
-
-if __name__ == "__main__":
- opt_parser = OptionParser()
- opt_parser.add_option("-f", "--filename", dest="filename", default="", help="case1.jtl")
- opt_parser.add_option("-c", "--casename", dest="casename", default="", help="2.1")
- opt_parser.add_option("-v", "--vesion", dest="nebula_version", default="", help="version")
- opt_parser.add_option(
- "-m",
- "--mysqlconf",
- dest="mysqlconf",
- default="",
- help='{"ip":"127.0.0.1","port":3306,"user":"root","pwd":"xxx","db":"xxx"}',
- )
- options, args = opt_parser.parse_args()
-
- if options.filename == "":
- print("please input csv filename")
- sys.exit(1)
-
- if options.mysqlconf != "":
- try:
- print(options.mysqlconf)
- mysql_conf_json = json.loads(options.mysqlconf)
- except:
- print("mysqlconf is invalid, please check!")
- sys.exit(1)
-
- if options.casename == "":
- print("please input casename")
- sys.exit(1)
-
- if options.nebula_version == "":
- print("please input nebula_version")
- sys.exit(1)
-
- # read file
- try:
- df = pd.read_csv(options.filename, header=0, sep=",")
- except:
- print("read csv file failed")
- sys.exit(1)
- else:
- print("read csv to df success")
-
- if len(df) == 0:
- print("there is no data to statistics")
- sys.exit(0)
-
- # drop the first and last 10% of samples (warm-up / cool-down)
- df.drop(df.head((int)(len(df) * 0.1)).index, inplace=True)
- df.drop(df.tail((int)(len(df) * 0.1)).index, inplace=True)
-
- # statistics client time
- try:
- dic_elapsed_statistics = df_statistics(df, "total", "elapsed")
- for label, group in df.groupby(["label"]):
- df_statistics(group, label, "elapsed")
- except:
- print("statistics elapsed failed , please check the data in file")
- sys.exit(1)
- else:
- print("statistics elapsed finished!")
-
- # statistics server time
- try:
- dic_latency_statistics = df_statistics(df, "total", "Latency")
- for label, group in df.groupby(["label"]):
- df_statistics(group, label, "Latency")
- except:
- print("statistics Latency failed , please check the data in file")
- sys.exit(1)
- else:
- print("statistics Latency finished!")
-
- try:
- dic_throughput = df_throughput(df)
- except:
- print("statistics throughput failed , please check the data in file")
- sys.exit(1)
- else:
- print("statistics throughput finished!")
-
- if options.mysqlconf != "":
- client_metrics_sql = "insert perf_metrics(casename,label,nebula_version,starttime,endtime,count,fail_count,min,max,avg,median,p90,p95,p99,throughput) \
- values ('{}','{}','{}','{}','{}',{},{},{},{},{},{},{},{},{},{})".format(
- options.casename,
- "client_time",
- options.nebula_version,
- dic_throughput["start_time"],
- dic_throughput["end_time"],
- dic_elapsed_statistics["count"],
- dic_elapsed_statistics["fail_count"],
- dic_elapsed_statistics["min"],
- dic_elapsed_statistics["max"],
- dic_elapsed_statistics["avg"],
- dic_elapsed_statistics["median"],
- dic_elapsed_statistics["p90"],
- dic_elapsed_statistics["p95"],
- dic_elapsed_statistics["p99"],
- dic_throughput["throughput"],
- )
-
- server_metrics_sql = "insert perf_metrics(casename,label,nebula_version,starttime,endtime,count,fail_count,min,max,avg,median,p90,p95,p99,throughput) \
- values ('{}','{}','{}','{}','{}',{},{},{},{},{},{},{},{},{},{})".format(
- options.casename,
- "server_time",
- options.nebula_version,
- dic_throughput["start_time"],
- dic_throughput["end_time"],
- dic_latency_statistics["count"],
- dic_latency_statistics["fail_count"],
- dic_latency_statistics["min"],
- dic_latency_statistics["max"],
- dic_latency_statistics["avg"],
- dic_latency_statistics["median"],
- dic_latency_statistics["p90"],
- dic_latency_statistics["p95"],
- dic_latency_statistics["p99"],
- dic_throughput["throughput"],
- )
-
- try:
- insert_metrics(client_metrics_sql, mysql_conf_json)
- except:
- print("insert client metrics to mysql failed!")
- sys.exit(1)
- else:
- print("insert client metrics to mysql finished!")
-
- try:
- insert_metrics(server_metrics_sql, mysql_conf_json)
- except:
- print("insert server metrics to mysql failed!")
- sys.exit(1)
- else:
- print("insert server metrics to mysql finished!")
diff --git a/ldbc/setup/requirements.txt b/ldbc/setup/requirements.txt
deleted file mode 100644
index 3b2d239..0000000
--- a/ldbc/setup/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-pandas==1.1.5
-numpy==1.19.5
-pymysql==1.0.2
diff --git a/ldbc/setup/setup.sh b/ldbc/setup/setup.sh
deleted file mode 100755
index 9f4f897..0000000
--- a/ldbc/setup/setup.sh
+++ /dev/null
@@ -1,44 +0,0 @@
-#! /usr/bin/env bash
-curr_path=$(readlink -f "$(dirname "$0")")
-java_prj_path=${curr_path}/../../util/LdbcGoStep
-
-if [ $# -lt 1 ] ; then
- echo "Please input jmeter install path"
- exit 1
-else
- jmeter_install_path=$1
- echo "jmeter install path is $1"
-fi
-
-cd ${java_prj_path}
-mvn package
-if [ $? != 0 ] ; then
- cd -
- echo "mvn package failed!"
- exit 1
-fi
-
-cd -
-
-wget -P $jmeter_install_path https://mirrors.bfsu.edu.cn/apache/jmeter/binaries/apache-jmeter-5.4.zip
-unzip ${jmeter_install_path}/apache-jmeter-5.4.zip -d $jmeter_install_path
-if [ $? != 0 ] ; then
- echo "install jmeter failed!"
- exit 1
-fi
-
-jar_path=${java_prj_path}/target/LdbcGoStep-2-jar-with-dependencies.jar
-jmeter_lib_path=${jmeter_install_path}/apache-jmeter-5.4/lib/ext
-cp -rf ${jar_path} ${jmeter_lib_path}/LdbcGoStep-2-jar-with-dependencies.jar
-if [ $? != 0 ] ; then
- echo "cp -rf ${jar_path} ${jmeter_lib_path} failed"
- exit 1
-fi
-
-
-pip3 install --user -r ${curr_path}/requirements.txt -i https://mirrors.aliyun.com/pypi/simple/
-
-if [ $? != 0 ] ; then
- echo "install python3 pkg failed!"
- exit 1
-fi
diff --git a/ldbc/sql/perf_metric.sql b/ldbc/sql/perf_metric.sql
deleted file mode 100644
index b0fd265..0000000
--- a/ldbc/sql/perf_metric.sql
+++ /dev/null
@@ -1,22 +0,0 @@
-drop database if exists perftest;
-
-create database perftest;
-
-use perftest;
-create table perf_metrics(
-casename varchar(50),
-label varchar(50),
-nebula_version varchar(10),
-starttime timestamp,
-endtime timestamp,
-count int,
-fail_count int,
-min double,
-max double,
-avg double,
-median double,
-p90 double,
-p95 double,
-p99 double,
-throughput double
-);
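The perf_metrics table pairs with the pymysql inserts in statistics.py above; a hedged sketch of querying it back (connection parameters are placeholders):

    import pymysql

    conn = pymysql.connect(
        host="127.0.0.1", port=3306, user="root", passwd="xxx", db="perftest"
    )
    cur = conn.cursor()
    # compare latency and throughput across runs, oldest first
    cur.execute(
        "select casename, label, p99, throughput from perf_metrics order by starttime"
    )
    for row in cur.fetchall():
        print(row)
    conn.close()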
diff --git a/nebula_bench/cli.py b/nebula_bench/cli.py
index 4d6db15..5ee29ce 100644
--- a/nebula_bench/cli.py
+++ b/nebula_bench/cli.py
@@ -97,29 +97,9 @@ def importer(folder, address, user, password, space, vid_type, dry_run):
c = nc.import_space(dry_run)
if c != 0:
exit(c)
- if not dry_run:
- click.echo("begin space compact")
- nc.compact()
- click.echo("end space compact")
- nc.release()
-@nebula.command(help="initialize nebula graph, including creating indexes")
-@common
-def init(folder, address, user, password, space):
- nc = NebulaController(
- data_folder=folder,
- user=user,
- password=password,
- address=address,
- space=space,
- vid_type="int",
- )
-
- nc.init_space()
-
-
-@cli.group()
+@cli.group(help="stress testing")
def stress():
pass
@@ -158,8 +138,6 @@ def run(
)
stress.run()
- pass
-
@stress.command()
def scenarios():
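For readers unfamiliar with the click group/command pattern used in cli.py, a minimal standalone sketch (the names here are illustrative, not the real CLI):

    import click

    @click.group(help="stress testing")
    def stress():
        pass

    @stress.command()
    @click.option("--scenario", default="go.Go2Step", help="scenario class to run")
    def run(scenario):
        click.echo("running {}".format(scenario))

    if __name__ == "__main__":
        stress()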
diff --git a/nebula_bench/common/base.py b/nebula_bench/common/base.py
index 9d376d9..892b78c 100644
--- a/nebula_bench/common/base.py
+++ b/nebula_bench/common/base.py
@@ -1,137 +1,7 @@
# -*- encoding: utf-8 -*-
-import re
-import time
-from collections import deque
-import csv
-from pathlib import Path
-from collections import namedtuple
-
-from nebula2.gclient.net import ConnectionPool
-from nebula2.Config import Config
-from nebula2.data.ResultSet import ResultSet
-
-from nebula_bench import setting
from nebula_bench.utils import logger
-class CSVReader(object):
- def __init__(self, file):
- try:
- file = open(file)
- except TypeError:
- # "file" was already a pre-opened file-like object
- pass
- self.file = file
- self.reader = csv.reader(file)
-
- def __next__(self):
- try:
- return next(self.reader)
- except StopIteration:
- # reuse file on EOF
- self.file.seek(0, 0)
- return next(self.reader)
-
-
-class NebulaClient(object):
- def __init__(
- self,
- max_connection=None,
- user=None,
- password=None,
- address=None,
- space=None,
- timeout=None,
- ):
- self.user = user or setting.NEBULA_USER
- self.password = password or setting.NEBULA_PASSWORD
- self.space = space or setting.NEBULA_SPACE
- self.config = Config()
- self.config.max_connection_pool_size = max_connection or setting.NEBULA_MAX_CONNECTION
-
- if timeout is not None:
- self.config.timeout = timeout
-
- address = address or setting.NEBULA_ADDRESS
- address_list = address.split(",")
- self.address = [(item.split(":")[0], int(item.split(":")[1])) for item in address_list]
-
- self.deque = deque(maxlen=self.config.max_connection_pool_size)
-
- # init connection pool
- self.connection_pool = ConnectionPool()
- # if the given servers are ok, return true, else return false
- ok = self.connection_pool.init(self.address, self.config)
- assert ok, "cannot connect the server, address is {}".format(address)
-
- def add_session(self):
- _session = self.connection_pool.get_session(self.user, self.password)
- _session.execute("USE {}".format(self.space))
- self.deque.append(_session)
-
- def release_session(self):
- _session = None
- try:
- _session = self.deque.pop()
- except IndexError:
- pass
- if _session is not None:
- _session.release()
-
- def execute(self, stmt):
- if len(self.deque) == 0:
- self.add_session()
- _session = self.deque.popleft()
- try:
- r = _session.execute(stmt)
- except Exception as e:
- logger.error("execute stmt error, e is {}".format(e))
- r = None
- finally:
- self.deque.append(_session)
- return r
-
- def release(self):
- for _session in self.deque:
- _session.release()
- self.connection_pool.close()
-
-
-re_pattern = r"\$\{\}"
-
-
-class StmtGenerator(object):
- def __init__(self, stmt_template, parameters, data_folder):
- """
-
- :param stmt_template: statement template
- :param parameters: ((csv_file, index),)
- """
- self.csv_reader_list = []
- self.index_list = []
- self.stmt_template = stmt_template
- self.data_folder = Path(data_folder)
-
- for p in parameters:
- csv_file, index = p
- csv_path = str((self.data_folder / csv_file).absolute())
- csv_reader = CSVReader(csv_path)
- self.csv_reader_list.append(csv_reader)
- self.index_list.append(index)
-
- def __next__(self):
- _stmt = self.stmt_template
- for _index, csv_reader in enumerate(self.csv_reader_list):
- index = self.index_list[_index]
-
- line = ",".join(next(csv_reader))
- value = line.split("|")[index]
-
- _stmt = re.sub(re_pattern, value, _stmt, count=1)
-
- return _stmt
-
-
class ScenarioMeta(type):
def __new__(cls, name, bases, attrs, *args, **kwargs):
# super(ScenarioMeta, cls).__new__(cls, name, bases, attrs, *args, **kwargs)
@@ -151,11 +21,3 @@ class BaseScenario(metaclass=ScenarioMeta):
csv_path: str
csv_index: list
name: str
-
-
-class BaseQuery(object):
- queries: tuple
-
-
-class BaseImport(object):
- pass
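What survives in base.py is the ScenarioMeta/BaseScenario pair that k6 scenarios subclass; a sketch of a hypothetical scenario in that style (the csv_path and csv_index values are assumptions, the attribute names come from BaseScenario and go.py below):

    from nebula_bench.common.base import BaseScenario

    class Go1Step(BaseScenario):
        abstract = False
        nGQL = "GO 1 STEP FROM {} OVER KNOWS"
        csv_path = "social_network/dynamic/person.csv"  # assumed path
        csv_index = [0]  # assumed: column holding the start vertex id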
diff --git a/nebula_bench/controller.py b/nebula_bench/controller.py
index 7cd7a6e..078af1b 100644
--- a/nebula_bench/controller.py
+++ b/nebula_bench/controller.py
@@ -1,12 +1,10 @@
# -*- coding: utf-8 -*-
-import time
from pathlib import Path
-from datetime import datetime
from nebula_bench import parser
from nebula_bench import setting
from nebula_bench.utils import logger
-from nebula_bench.common.base import BaseScenario, BaseQuery, NebulaClient
+from nebula_bench.common.base import BaseScenario
from nebula_bench import utils
@@ -20,75 +18,6 @@ def __init__(self, data_folder=None, space=None, user=None, password=None, addre
self.password = password or setting.NEBULA_PASSWORD
self.address = address or setting.NEBULA_ADDRESS
- self.nebula_client = NebulaClient(max_connection=5, address=self.address)
- self.nebula_session = self.nebula_client.connection_pool.get_session(
- self.user, self.password
- )
- self.nebula_session.execute("USE {}".format(self.space))
-
- def release(self):
- self.nebula_session.release()
- self.nebula_client.release()
-
- def update_config(self, module, config):
- assert module.upper() in ["GRAPH", "STORAGE", "META"]
- self.nebula_session.execute("UPDATE CONFIGS {}:{}".format(module.upper(), config))
-
- def submit_job(self, statement, timeout=1800, retry=5):
- job_id = job_status = None
- submit_succeeded = False
- for i in range(retry):
- r = self.nebula_session.execute(statement)
- if r.is_succeeded():
- row = next(r._data_set_wrapper)
- job_id = row.get_value(0).as_int()
-
- show_r = self.nebula_session.execute("SHOW JOB {}".format(job_id))
- assert show_r.is_succeeded()
- row = next(show_r._data_set_wrapper)
- job_status = row.get_value(2).as_string()
- if job_status != "FAILED":
- submit_succeeded = True
- break
-
- else:
- logger.warning("submit job failed, error message is {}".format(r.error_msg()))
- time.sleep(10)
- assert submit_succeeded, "this job does not submit successfully, job is <{}>".format(
- statement
- )
- logger.info("succeeded job <{}>, job id is <{}>".format(statement, job_id))
- timeout_time = time.time() + timeout
- while time.time() < timeout_time:
- show_r = self.nebula_session.execute("SHOW JOB {}".format(job_id))
- assert show_r.is_succeeded()
- row = next(show_r._data_set_wrapper)
- job_status = row.get_value(2).as_string()
- if job_status == "FINISHED":
- # job_id, command, start_time, stop_time,
- return (
- row.get_value(0).as_int(),
- row.get_value(1).as_string(),
- row.get_value(3).as_datetime(),
- row.get_value(4).as_datetime(),
- )
- elif job_status == "FAILED":
- logger.error("running job failed, job id is <{}>".format(job_id))
- raise Exception("job <{}> failed".format(job_id))
-
- time.sleep(2)
-
- raise Exception("timeout")
-
- def compact(self, timeout=3600):
- """
- TODO should move to another controller to test multiple import scenarios.
- :param timeout:
- :return:
- """
- self.nebula_session.execute("USE {}".format(self.space))
- return self.submit_job("SUBMIT JOB COMPACT", timeout)
-
class NebulaController(BaseController):
def __init__(
@@ -104,33 +33,14 @@ def __init__(
self.vid_type = vid_type or "int"
def import_space(self, dry_run=False):
- self.update_config("graph", "heartbeat_interval_secs=1")
- self.update_config("storage", "heartbeat_interval_secs=1")
result_file = self.dump_nebula_importer()
command = ["scripts/nebula-importer", "--config", result_file]
if not dry_run:
return utils.run_process(command)
return 0
- def init_space(self):
- """
- create index and then rebuild
- :return:
- """
- self.update_config("graph", "heartbeat_interval_secs=1")
- self.update_config("storage", "heartbeat_interval_secs=1")
- r = self.nebula_session.execute(
- "CREATE TAG INDEX IF NOT EXISTS idx_person on Person(firstName(20))"
- )
- assert r.is_succeeded()
- self.submit_job("REBUILD TAG INDEX idx_person")
-
def dump_nebula_importer(self):
_type = "int64" if self.vid_type == "int" else "fixed_string(20)"
- statment = "CREATE SPACE IF NOT EXISTS {}(PARTITION_NUM = 24, REPLICA_FACTOR = 3, vid_type ={} );".format(
- self.space, _type
- )
- self.nebula_session.execute(statement)
p = parser.Parser(parser.NebulaDumper, self.data_folder)
dumper = p.parse()
kwargs = {}
@@ -142,23 +52,6 @@ def dump_nebula_importer(self):
return dumper.dump(**kwargs)
- def clean_spaces(self, keep=None):
- """
- delete the spaces
- """
- if keep is None:
- keep_spaces = []
- else:
- keep_spaces = [item.strip() for item in keep.split(",")]
-
- result = self.nebula_session.execute("show spaces;")
-
- for r in result:
- name = r.get_value(0).as_string()
- if name in keep_spaces:
- continue
- self.nebula_session.execute("drop space {}; ".format(name))
-
class StressController(BaseController):
def __init__(
@@ -217,95 +110,3 @@ def run(self, nebula_scenario):
"{}/{}.html".format(result_folder, _class.result_file_name),
]
utils.run_process(command)
-
-
-class QueryController(BaseController):
- def __init__(
- self,
- alert_class=None,
- report_class=None,
- space=None,
- user=None,
- password=None,
- address=None,
- ):
- BaseController.__init__(self, space=space, user=user, password=password, address=address)
- self.queries = []
- self.alert_class = alert_class
- self.report_class = report_class
- self.alert = []
-
- def load_queries(self):
- package_name = "nebula_bench.queries"
- query_classes = utils.load_class(package_name, load_all=True, base_class=BaseQuery)
- for _class in query_classes:
- for q in _class.queries:
- self.queries.append(q)
-
- def run(self):
- # balance leader
- self.nebula_session.execute("balance leader")
- time.sleep(10)
-
- # prevent leader change error, run some queries to update storage client.
- for _ in range(10):
- self.nebula_session.execute("GO 2 STEPS FROM 2608 OVER KNOWS")
-
- self.load_queries()
- for query in self.queries:
- self.run_statement(query.name, query.stmt)
-
- self.release()
- self.record.end_at = datetime.now()
- self.record.save()
- report_file = self.generate_report()
- utils.dingding_msg(
- "Finish query test, report html is http://{}/{}".format(
- self.record.execute_machine, report_file
- )
- )
-
- self.send_alert()
-
- def run_statement(self, name, stmt):
- logger.info("execute the statement <{}>, stmt is <{}>".format(name, stmt))
- count = 10
- latency_results = []
- response_results = []
- rows_counts = []
- succeeded = True
- latency = response_time = rows_count = 0
- for _ in range(count):
- now = time.monotonic()
- r = self.nebula_session.execute(stmt)
- if r.is_succeeded():
- # us
- response_results.append((time.monotonic() - now) * 1000 * 1000)
- latency_results.append(r.latency())
- rows_counts.append(r.row_size())
- else:
- logger.warning("execution is not succeeded, the err is <{}>".format(r.error_msg()))
- succeeded = False
-
- latency_results.sort()
- response_results.sort()
- if succeeded:
- # drop the minimum and the maximum value
- latency_results.pop(0)
- latency_results.pop(-1)
- response_results.pop(0)
- response_results.pop(-1)
- latency = sum(latency_results) / len(latency_results)
- response_time = sum(response_results) / len(response_results)
- rows_count = sum(rows_counts) / len(rows_counts)
- self.save_statement_result(name, stmt, latency, response_time, rows_count, succeeded)
-
- def add_alert(self, stmt, latency, baseline_latency):
- self.alert.append((stmt, latency, baseline_latency))
-
- def send_alert(self, alter_class=None):
- pass
-
- def generate_report(self):
- if self.report_class is not None:
- return self.report_class(self).report()
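The deleted run_statement sampled each query ten times, sorted, dropped the extremes, and averaged; the same trimmed mean isolated as a self-contained sketch:

    def trimmed_mean(samples):
        # average after dropping the single smallest and largest sample
        if len(samples) < 3:
            raise ValueError("need at least 3 samples")
        ordered = sorted(samples)[1:-1]
        return sum(ordered) / len(ordered)

    assert trimmed_mean([5, 1, 9, 2, 3]) == (2 + 3 + 5) / 3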
diff --git a/nebula_bench/locust_file.py b/nebula_bench/locust_file.py
deleted file mode 100644
index 1a4fdb9..0000000
--- a/nebula_bench/locust_file.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# -*- coding: utf-8 -*-
-import importlib
-
-from locust import events, LoadTestShape
-from locust.argument_parser import parse_options
-
-
-class StepLoadShape(LoadTestShape):
- stages = [
- {"duration": 6, "users": 10, "spawn_rate": 2},
- {"duration": 10, "users": 50, "spawn_rate": 10},
- {"duration": 20, "users": 100, "spawn_rate": 20},
- {"duration": 30, "users": 50, "spawn_rate": 10},
- # {"duration": 230, "users": 10, "spawn_rate": 10},
- # {"duration": 240, "users": 1, "spawn_rate": 1},
- ]
-
- def tick(self):
- run_time = self.get_run_time()
-
- for stage in self.stages:
- if run_time < stage["duration"]:
- tick_data = (stage["users"], stage["spawn_rate"])
- return tick_data
-
- return None
-
-
-@events.init_command_line_parser.add_listener
-def add_arguments(parser):
- group = parser.add_argument_group(
- "customize nebula arguments",
- "",
- )
- group.add_argument(
- "--nebula-scenario",
- type=str,
- help="stress test scenario name, e.g. match.MatchPerson",
- env_var="",
- default="",
- )
-
-
-p = parse_options()
-scenario = p.__dict__["nebula_scenario"]
-
-if scenario == "":
- print("need scenario")
- exit(1)
-else:
- print("run the scenario {}".format(scenario))
- print("\n")
- module, clazz = scenario.split(".")
- _module = importlib.import_module(".".join(["nebula_bench.scenarios", module]))
- User = getattr(_module, clazz)
- print("statement is {}".format(User.statement))
diff --git a/nebula_bench/scenarios/go.py b/nebula_bench/scenarios/go.py
index b625612..d0da279 100644
--- a/nebula_bench/scenarios/go.py
+++ b/nebula_bench/scenarios/go.py
@@ -18,6 +18,7 @@ class Go2Step(BaseGoScenario):
abstract = False
nGQL = "GO 2 STEP FROM {} OVER KNOWS"
+
class Go3Step(BaseGoScenario):
abstract = False
nGQL = "GO 3 STEP FROM {} OVER KNOWS"
diff --git a/requirements.txt b/requirements.txt
index e2c3906..a81d57a 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,3 @@
Jinja2
click
-nebula2-python == 2.0.0
python-dotenv
diff --git a/scripts/env.sh b/scripts/env.sh
index d7757a1..cc4d86f 100755
--- a/scripts/env.sh
+++ b/scripts/env.sh
@@ -5,7 +5,7 @@ HADOOP_VERSION=3.2.1
scaleFactor=${scaleFactor:-1}
-NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-master}
-NEBULA_XK6_VERSION=${NEBULA_XK6_VERSION:-master}
+NEBULA_IMPORTER_VERSION=${NEBULA_IMPORTER_VERSION:-v2.0.0-ga}
+NEBULA_XK6_VERSION=${NEBULA_XK6_VERSION:-v0.0.6}
GOLANG_VERSION=${GOLANG_VERSION:-1.16.6}
\ No newline at end of file
diff --git a/scripts/setup.sh b/scripts/setup.sh
index b2b9c7a..1971c10 100755
--- a/scripts/setup.sh
+++ b/scripts/setup.sh
@@ -33,8 +33,8 @@ function setup_nebula_importer(){
}
function setup_nebula_k6(){
- git clone --branch ${NEBULA_XK6_VERSION} https://github.com/HarrisChu/xk6-nebula ${TEMP_DIR}/xk6-nebula
- cd ${TEMP_DIR}/xk6-nebula
+ git clone --branch ${NEBULA_XK6_VERSION} https://github.com/vesoft-inc/k6-plugin ${TEMP_DIR}/k6-plugin
+ cd ${TEMP_DIR}/k6-plugin
make build
mv k6 ${PROJECT_DIR}/scripts/.
}
diff --git a/util/LdbcGoStep/pom.xml b/util/LdbcGoStep/pom.xml
deleted file mode 100644
index f5eedcc..0000000
--- a/util/LdbcGoStep/pom.xml
+++ /dev/null
@@ -1,89 +0,0 @@
-<!-- XML markup lost in extraction. The deleted pom.xml declared the
-     com.vesoft:LdbcGoStep jar, version 2 ("Maven Quick Start Archetype",
-     http://maven.apache.org), with dependencies junit:junit 4.12,
-     org.slf4j:slf4j-api 1.7.25, com.vesoft:client 2.0.0-SNAPSHOT,
-     org.apache.commons:commons-csv 1.7, and org.apache.jmeter
-     ApacheJMeter_core / ApacheJMeter_java 5.4 (compile scope, with
-     org.apache.logging.log4j:log4j-slf4j-impl excluded); a "snapshots"
-     repository at https://oss.sonatype.org/content/repositories/snapshots/;
-     and maven-assembly-plugin 2.4.1 using src/main/resources/package.xml as
-     descriptor with mainClass com.healchow.consumer.Main, bound to the
-     package phase ("make-assembly" execution, goal "single"). -->
\ No newline at end of file
diff --git a/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java b/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java
deleted file mode 100644
index 9393587..0000000
--- a/util/LdbcGoStep/src/main/java/vesoft/LdbcGoStep.java
+++ /dev/null
@@ -1,181 +0,0 @@
-/* Copyright (c) 2021 vesoft inc. All rights reserved.
- *
- * This source code is licensed under Apache 2.0 License,
- * attached with Common Clause Condition 1.0, found in the LICENSES directory.
- */
-
-package com.vesoft;
-
-import com.vesoft.nebula.client.graph.NebulaPoolConfig;
-import com.vesoft.nebula.client.graph.data.HostAddress;
-import com.vesoft.nebula.client.graph.data.ResultSet;
-import com.vesoft.nebula.client.graph.net.NebulaPool;
-import com.vesoft.nebula.client.graph.net.Session;
-
-import java.util.Arrays;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.jmeter.protocol.java.sampler.AbstractJavaSamplerClient;
-import org.slf4j.Logger;
-import org.apache.jmeter.config.Arguments;
-import org.apache.jmeter.protocol.java.sampler.JavaSamplerContext;
-import org.apache.jmeter.samplers.SampleResult;
-
-/**
- * LDBC Go Step
- */
-public class LdbcGoStep extends AbstractJavaSamplerClient {
-
- private final Logger log = getNewLogger();
- private NebulaPool pool = null;
- private Session session = null;
- private Integer maxVars = 20;
-
-
- @Override
- public Arguments getDefaultParameters() {
- Arguments arguments = new Arguments();
- arguments.addArgument("hosts", "127.0.0.1:9669");
- arguments.addArgument("maxconn", "10");
- arguments.addArgument("user", "root");
- arguments.addArgument("pwd", "nebula");
- arguments.addArgument("space", "");
- arguments.addArgument("nGQL", "yield 1");
- arguments.addArgument("person", "");
- return arguments;
- }
-
-
- public void initNebulaPool(String hosts, int maxconn, int id) {
- pool = new NebulaPool();
- try {
- List<HostAddress> addresses = new ArrayList<>();
- NebulaPoolConfig nebulaPoolConfig = new NebulaPoolConfig();
- nebulaPoolConfig.setMaxConnSize(maxconn);
- List<String> host_list = new ArrayList<>(Arrays.asList(hosts.split(",")));
- if (host_list == null) {
- log.error("host_list is null!");
-
- }
-
- String host = host_list.get(id % host_list.size());
- String[] splits = host.split(":");
- addresses.add(new HostAddress(splits[0], Integer.parseInt(splits[1])));
- boolean init = pool.init(addresses, nebulaPoolConfig);
- if (init != true) {
- if (pool != null) {
- pool.close();
- }
- log.info("pool init failed!");
-
- }
- } catch (Exception e) {
- e.printStackTrace();
- if (pool != null) {
- pool.close();
- }
- log.error("pool init failed, error message is ", e);
-
- } finally {
- log.info(String.format("initNebulaPool success!"));
- }
- }
-
-
- @Override
- public void setupTest(JavaSamplerContext javaSamplerContext) {
- System.out.println("Perf thread start:" + Thread.currentThread().getName());
- String hosts = javaSamplerContext.getParameter("hosts");
- int maxconn = Integer.parseInt(javaSamplerContext.getParameter("maxconn").trim());
- String user = javaSamplerContext.getParameter("user");
- String pwd = javaSamplerContext.getParameter("pwd");
- String space = javaSamplerContext.getParameter("space");
- int id = javaSamplerContext.getJMeterContext().getThreadNum();
- initNebulaPool(hosts, maxconn, id);
- try {
- session = pool.getSession(user, pwd, false);
- if (session != null) {
- String use_space = "use " + space + ";";
- ResultSet resp = null;
- resp = session.execute(use_space);
- if (!resp.isSucceeded()) {
- System.out.println("Switch space failed:" + space + "\nError is " + resp.getErrorMessage());
- System.exit(1);
- }
- } else {
- log.info("getSession failed !");
- pool.close();
-
- }
- } catch (
- Exception e) {
- log.error(e.getMessage());
- if (session != null) {
- session.release();
- }
- if (pool != null) {
- pool.close();
- }
- } finally {
- log.info(String.format("setupTest success!"));
- }
-
- }
-
- @Override
- public SampleResult runTest(JavaSamplerContext javaSamplerContext) {
- String nGQL = javaSamplerContext.getParameter("nGQL");
-        for (int i=0;i
-        // [the rest of runTest and of LdbcGoStep.java was lost in extraction
-        //  where the "<" in the for-loop header began swallowing markup,
-        //  along with the diff header of the next file]
diff --git a/util/LdbcGoStep/src/main/resources/package.xml b/util/LdbcGoStep/src/main/resources/package.xml
deleted file mode 100644
--- a/util/LdbcGoStep/src/main/resources/package.xml
+++ /dev/null
@@ -1,23 +0,0 @@
-<!-- XML markup lost in extraction. The deleted assembly descriptor defined
-     the "jar-with-dependencies" assembly in "jar" format, with a
-     runtime-scoped dependency set written to "/" and the excludes
-     org.apache.logging.log4j:log4j-slf4j-impl and org.slf4j:slf4j-log4j12. -->
\ No newline at end of file