Skip to content

Commit

Permalink
Merge pull request #10 from HarrisChu/add_ssl
Browse files Browse the repository at this point in the history
add ssl connection pool
  • Loading branch information
darionyaphet authored Dec 28, 2021
2 parents 3233291 + 429e9ef commit c237866
Show file tree
Hide file tree
Showing 8 changed files with 279 additions and 92 deletions.
237 changes: 155 additions & 82 deletions client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nebulagraph

import (
"crypto/tls"
"fmt"
"strconv"
"strings"
Expand All @@ -10,31 +11,78 @@ import (
nebula "github.com/vesoft-inc/nebula-go/v2"
)

type Data []string
type (
// NebulaPool nebula connection pool
NebulaPool struct {
HostList []nebula.HostAddress
Pool *nebula.ConnectionPool
Log nebula.Logger
DataChs []chan Data
OutoptCh chan []string
Version string
csvStrategy csvReaderStrategy
initialized bool
sessions []*nebula.Session
channelBufferSize int
sslconfig *sslConfig
mutex sync.Mutex
}

type Output struct {
TimeStamp int64
NGQL string
Latency int64
ResponseTime int32
IsSucceed bool
Rows int32
ErrorMsg string
}
// NebulaSession a wrapper for nebula session, could read data from DataCh
NebulaSession struct {
Session *nebula.Session
Pool *NebulaPool
DataCh chan Data
}

// Response a wrapper for nebula resultset
Response struct {
*nebula.ResultSet
ResponseTime int32
}

csvReaderStrategy int

sslConfig struct {
rootCAPath string
certPath string
privateKeyPath string
}

// Data data in csv file
Data []string

output struct {
timeStamp int64
nGQL string
latency int64
responseTime int32
isSucceed bool
rows int32
errorMsg string
}
)

const (
// AllInOne all the vus use the same DataCh
AllInOne csvReaderStrategy = iota
// Separate each vu has a seprate DataCh
Separate
)

func formatOutput(o *Output) []string {
func formatOutput(o *output) []string {
return []string{
strconv.FormatInt(o.TimeStamp, 10),
o.NGQL,
strconv.Itoa(int(o.Latency)),
strconv.Itoa(int(o.ResponseTime)),
strconv.FormatBool(o.IsSucceed),
strconv.Itoa(int(o.Rows)),
o.ErrorMsg,
strconv.FormatInt(o.timeStamp, 10),
o.nGQL,
strconv.Itoa(int(o.latency)),
strconv.Itoa(int(o.responseTime)),
strconv.FormatBool(o.isSucceed),
strconv.Itoa(int(o.rows)),
o.errorMsg,
}
}

var OutputHeader []string = []string{
var outputHeader []string = []string{
"timestamp",
"nGQL",
"latency",
Expand All @@ -44,37 +92,7 @@ var OutputHeader []string = []string{
"errorMsg",
}

type Response struct {
*nebula.ResultSet
ResponseTime int32
}
type CSVReaderStrategy int

const (
AllInOne CSVReaderStrategy = iota
Separate
)

type NebulaPool struct {
HostList []nebula.HostAddress
Pool *nebula.ConnectionPool
Log nebula.Logger
DataChs []chan Data
OutoptCh chan []string
Version string
csvStrategy CSVReaderStrategy
initialized bool
sessions []*nebula.Session
channelBufferSize int
mutex sync.Mutex
}

type NebulaSession struct {
Session *nebula.Session
Pool *NebulaPool
DataCh chan Data
}

// New for k6 initialization.
func New() *NebulaPool {
return &NebulaPool{
Log: nebula.DefaultLogger{},
Expand All @@ -83,57 +101,105 @@ func New() *NebulaPool {
}
}

// NewSSLConfig return sslConfig
func (np *NebulaPool) NewSSLConfig(rootCAPath, certPath, privateKeyPath string) {
np.sslconfig = &sslConfig{
rootCAPath: rootCAPath,
certPath: certPath,
privateKeyPath: privateKeyPath,
}
}

// Init init nebula pool with address and concurrent, by default the buffersize is 20000
func (np *NebulaPool) Init(address string, concurrent int) (*NebulaPool, error) {
return np.InitWithSize(address, concurrent, 20000)

}

// InitWithSize init nebula pool with channel buffer size
func (np *NebulaPool) InitWithSize(address string, concurrent int, size int) (*NebulaPool, error) {
if np.initialized {
return np, nil
}
np.mutex.Lock()
defer np.mutex.Unlock()
np.Log.Info("begin init the nebula pool")
np.sessions = make([]*nebula.Session, concurrent)
np.channelBufferSize = size
np.OutoptCh = make(chan []string, np.channelBufferSize)
np.initialized = true
var (
sslConfig *tls.Config
err error
pool *nebula.ConnectionPool
)

if np.sslconfig != nil {
sslConfig, err = nebula.GetDefaultSSLConfig(
np.sslconfig.rootCAPath,
np.sslconfig.certPath,
np.sslconfig.privateKeyPath,
)
if err != nil {
return nil, err
}
// skip insecure verification for stress testing.
sslConfig.InsecureSkipVerify = true
}
err = np.initAndVerifyPool(address, concurrent, size)
if err != nil {
return nil, err
}
conf := np.getDefaultConf(concurrent)
if sslConfig != nil {
pool, err = nebula.NewSslConnectionPool(np.HostList, *conf, sslConfig, np.Log)

} else {
pool, err = nebula.NewConnectionPool(np.HostList, *conf, np.Log)
}

if err != nil {
return nil, err
}
np.Pool = pool
np.Log.Info("finish init the pool")
np.initialized = true
return np, nil
}

func (np *NebulaPool) initAndVerifyPool(address string, concurrent int, size int) error {

addrs := strings.Split(address, ",")
var hosts []nebula.HostAddress
for _, addr := range addrs {
hostPort := strings.Split(addr, ":")
if len(hostPort) != 2 {
return nil, fmt.Errorf("Invalid address: %s", addr)
return fmt.Errorf("Invalid address: %s", addr)
}
port, err := strconv.Atoi(hostPort[1])
if err != nil {
return nil, err
return err
}
host := hostPort[0]
hostAddr := nebula.HostAddress{Host: host, Port: port}
hosts = append(hosts, hostAddr)

np.HostList = hosts
}
np.sessions = make([]*nebula.Session, concurrent)
np.channelBufferSize = size
np.OutoptCh = make(chan []string, np.channelBufferSize)
return nil
}

func (np *NebulaPool) getDefaultConf(concurrent int) *nebula.PoolConfig {
conf := nebula.PoolConfig{
TimeOut: 0,
IdleTime: 0,
MaxConnPoolSize: concurrent,
MinConnPoolSize: 1,
}
pool, err := nebula.NewConnectionPool(hosts, conf, np.Log)
if err != nil {
return nil, err
}

np.Log.Info("finish init the pool")
np.Pool = pool
np.initialized = true
return np, nil
return &conf
}

// ConfigCsvStrategy set csv reader strategy
func (np *NebulaPool) ConfigCsvStrategy(strategy int) {
np.csvStrategy = CSVReaderStrategy(strategy)
np.csvStrategy = csvReaderStrategy(strategy)
}

// ConfigCSV config the csv file to be read
func (np *NebulaPool) ConfigCSV(path, delimiter string, withHeader bool) error {
for _, dataCh := range np.DataChs {
reader := NewCsvReader(path, delimiter, withHeader, dataCh)
Expand All @@ -144,15 +210,19 @@ func (np *NebulaPool) ConfigCSV(path, delimiter string, withHeader bool) error {
return nil
}

// ConfigOutput config the output file, would write the execution outputs
func (np *NebulaPool) ConfigOutput(path string) error {
writer := NewCsvWriter(path, ",", OutputHeader, np.OutoptCh)
writer := NewCsvWriter(path, ",", outputHeader, np.OutoptCh)
if err := writer.WriteForever(); err != nil {
return err
}
return nil
}

// Close close the nebula pool
func (np *NebulaPool) Close() error {
np.mutex.Lock()
defer np.mutex.Unlock()
if !np.initialized {
return nil
}
Expand All @@ -162,11 +232,11 @@ func (np *NebulaPool) Close() error {
s.Release()
}
}
np.Pool.Close()
np.initialized = false
return nil
}

// GetSession get the session from pool
func (np *NebulaPool) GetSession(user, password string) (*NebulaSession, error) {
session, err := np.Pool.GetSession(user, password)
if err != nil {
Expand All @@ -176,12 +246,12 @@ func (np *NebulaPool) GetSession(user, password string) (*NebulaSession, error)
defer np.mutex.Unlock()
np.sessions = append(np.sessions, session)
s := &NebulaSession{Session: session, Pool: np}
s.PrepareCsvReader()
s.prepareCsvReader()

return s, nil
}

func (s *NebulaSession) PrepareCsvReader() error {
func (s *NebulaSession) prepareCsvReader() error {
np := s.Pool
if np.csvStrategy == AllInOne {
if len(np.DataChs) == 0 {
Expand All @@ -197,6 +267,7 @@ func (s *NebulaSession) PrepareCsvReader() error {
return nil
}

// GetData get data from csv reader
func (s *NebulaSession) GetData() (Data, error) {
if s.DataCh != nil && len(s.DataCh) != 0 {
if d, ok := <-s.DataCh; ok {
Expand All @@ -206,6 +277,7 @@ func (s *NebulaSession) GetData() (Data, error) {
return nil, fmt.Errorf("no Data at all")
}

// Execute execute nebula query
func (s *NebulaSession) Execute(stmt string) (*Response, error) {
start := time.Now()
rs, err := s.Session.Execute(stmt)
Expand All @@ -217,14 +289,14 @@ func (s *NebulaSession) Execute(stmt string) (*Response, error) {

// output
if s.Pool.OutoptCh != nil && len(s.Pool.OutoptCh) != cap(s.Pool.OutoptCh) {
o := &Output{
TimeStamp: start.Unix(),
NGQL: stmt,
Latency: rs.GetLatency(),
ResponseTime: responseTime,
IsSucceed: rs.IsSucceed(),
Rows: int32(rs.GetRowSize()),
ErrorMsg: rs.GetErrorMsg(),
o := &output{
timeStamp: start.Unix(),
nGQL: stmt,
latency: rs.GetLatency(),
responseTime: responseTime,
isSucceed: rs.IsSucceed(),
rows: int32(rs.GetRowSize()),
errorMsg: rs.GetErrorMsg(),
}
s.Pool.OutoptCh <- formatOutput(o)

Expand All @@ -233,6 +305,7 @@ func (s *NebulaSession) Execute(stmt string) (*Response, error) {
return &Response{ResultSet: rs, ResponseTime: responseTime}, nil
}

// GetResponseTime GetResponseTime
func (r *Response) GetResponseTime() int32 {
return r.ResponseTime
}
24 changes: 24 additions & 0 deletions example/cert/test.ca.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIIEGzCCAwOgAwIBAgIUDcmZFpL4PcdCXfLRBK8bR2vb39cwDQYJKoZIhvcNAQEL
BQAwgZwxCzAJBgNVBAYTAkNOMREwDwYDVQQIDAhaaGVqaWFuZzERMA8GA1UEBwwI
SGFuZ3pob3UxFDASBgNVBAoMC1Zlc29mdCBJbmMuMRAwDgYDVQQLDAdzZWN0aW9u
MRYwFAYDVQQDDA1zaHlsb2NrIGh1YW5nMScwJQYJKoZIhvcNAQkBFhhzaHlsb2Nr
Lmh1YW5nQHZlc29mdC5jb20wHhcNMjEwODE5MDkyNDQ3WhcNMjUwODE4MDkyNDQ3
WjCBnDELMAkGA1UEBhMCQ04xETAPBgNVBAgMCFpoZWppYW5nMREwDwYDVQQHDAhI
YW5nemhvdTEUMBIGA1UECgwLVmVzb2Z0IEluYy4xEDAOBgNVBAsMB3NlY3Rpb24x
FjAUBgNVBAMMDXNoeWxvY2sgaHVhbmcxJzAlBgkqhkiG9w0BCQEWGHNoeWxvY2su
aHVhbmdAdmVzb2Z0LmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEB
AMEAgpamCQHl+8JnUHI6/VmJHjDLYJLTliN/CwpFrhMqIVjJ8wG57WYLpXpn91Lz
eHu52LkVzcikybIJ2a+LOTvnhNFdbmTbqDtrb+s6wM/sO+nF6tU2Av4e5zhyKoeR
LL+rHMk3nymohbdN4djySFmOOU5A1O/4b0bZz4Ylu995kUawdiaEo13BzxxOC7Ik
Gge5RyDcm0uLXZqTAPy5Sjv/zpOyj0AqL1CJUH7XBN9OMRhVU0ZX9nHWl1vgLRld
J6XT17Y9QbbHhCNEdAmFE5kEFgCvZc+MungUYABlkvoj86TLmC/FMV6fWdxQssyd
hS+ssfJFLaTDaEFz5a/Tr48CAwEAAaNTMFEwHQYDVR0OBBYEFK0GVrQx+wX1GCHy
e+6fl4X+prmYMB8GA1UdIwQYMBaAFK0GVrQx+wX1GCHye+6fl4X+prmYMA8GA1Ud
EwEB/wQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAHqP8P+ZUHmngviHLSSN1ln5
Mx4BCkVeFRUaFx0yFXytV/iLXcG2HpFg3A9rAFoYgCDwi1xpsERnBZ/ShTv/eFOc
IxBY5yggx3/lGi8tAgvUdarhd7mQO67UJ0V4YU3hAkbnZ8grHHXj+4hfgUpY4ok6
yaed6HXwknBb9W8N1jZI8ginhkhjaeRCHdMiF+fBvNCtmeR1bCml1Uz7ailrpcaT
Mf84+5VYuFEnaRZYWFNsWNCOBlJ/6/b3V10vMXzMmYHqz3xgAq0M3fVTFTzopnAX
DLSzorL/dYVdqEDCQi5XI9YAlgWN4VeGzJI+glkLOCNzHxRNP6Qev+YI+7Uxz6I=
-----END CERTIFICATE-----
Loading

0 comments on commit c237866

Please sign in to comment.