Skip to content

Commit

Permalink
feat: support http2 (vesoft-inc#246)
Browse files Browse the repository at this point in the history
* feat: support http2

* fix: use vesoft-inc/fbthrift
  • Loading branch information
veezhang authored Feb 1, 2023
1 parent f38f53c commit c284022
Show file tree
Hide file tree
Showing 15 changed files with 350 additions and 159 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand All @@ -34,7 +34,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand All @@ -59,7 +59,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
goVer: ['1.13', '1.17', '1.18']
goVer: ['1.16', '1.17', '1.18']
steps:
- uses: actions/checkout@v2
- name: Setup go ${{ matrix.goVer }}
Expand Down
10 changes: 5 additions & 5 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func logoutAndClose(conn *connection, sessionID int64) {
func TestConnection(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -122,7 +122,7 @@ func TestConnection(t *testing.T) {
func TestConnectionIPv6(t *testing.T) {
hostAddress := HostAddress{Host: addressIPv6, Port: port}
conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestAuthentication(t *testing.T) {
hostAddress := HostAddress{Host: address, Port: port}

conn := newConnection(hostAddress)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
t.Fatalf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1405,7 +1405,7 @@ func prepareSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down Expand Up @@ -1442,7 +1442,7 @@ func dropSpace(spaceName string) error {
conn := newConnection(hostAddress)
testPoolConfig := GetDefaultConf()

err := conn.open(hostAddress, testPoolConfig.TimeOut, nil)
err := conn.open(hostAddress, testPoolConfig.TimeOut, nil, false)
if err != nil {
return fmt.Errorf("fail to open connection, address: %s, port: %d, %s", address, port, err.Error())
}
Expand Down
11 changes: 11 additions & 0 deletions configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type PoolConfig struct {
MaxConnPoolSize int
// The min connections in pool for all addresses
MinConnPoolSize int
// UseHTTP2 indicates whether to use HTTP2
UseHTTP2 bool
}

// validateConf validates config
Expand Down Expand Up @@ -58,6 +60,7 @@ func GetDefaultConf() PoolConfig {
IdleTime: 0 * time.Millisecond,
MaxConnPoolSize: 10,
MinConnPoolSize: 0,
UseHTTP2: false,
}
}

Expand Down Expand Up @@ -127,6 +130,8 @@ type SessionPoolConf struct {
maxSize int
// The min sessions in pool for all addresses
minSize int
// useHTTP2 indicates whether to use HTTP2
useHTTP2 bool
}

type SessionPoolConfOption func(*SessionPoolConf)
Expand Down Expand Up @@ -190,6 +195,12 @@ func WithMinSize(minSize int) SessionPoolConfOption {
}
}

func WithHTTP2(useHTTP2 bool) SessionPoolConfOption {
return func(conf *SessionPoolConf) {
conf.useHTTP2 = useHTTP2
}
}

func (conf *SessionPoolConf) checkMandatoryFields() error {
// Check mandatory fields
if conf.username == "" {
Expand Down
71 changes: 55 additions & 16 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,23 +9,27 @@
package nebula_go

import (
"context"
"crypto/tls"
"fmt"
"math"
"net"
"net/http"
"strconv"
"time"

"github.com/facebook/fbthrift/thrift/lib/go/thrift"
"github.com/vesoft-inc/nebula-go/v3/nebula"
"github.com/vesoft-inc/nebula-go/v3/nebula/graph"
"golang.org/x/net/http2"
)

type connection struct {
severAddress HostAddress
timeout time.Duration
returnedAt time.Time // the connection was created or returned.
sslConfig *tls.Config
useHTTP2 bool
graph *graph.GraphServiceClient
}

Expand All @@ -41,28 +45,63 @@ func newConnection(severAddress HostAddress) *connection {

// open opens a transport for the connection
// if sslConfig is not nil, an SSL transport will be created
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config) error {
func (cn *connection) open(hostAddress HostAddress, timeout time.Duration, sslConfig *tls.Config, useHTTP2 bool) error {
ip := hostAddress.Host
port := hostAddress.Port
newAdd := net.JoinHostPort(ip, strconv.Itoa(port))
cn.timeout = timeout
bufferSize := 128 << 10
frameMaxLength := uint32(math.MaxUint32)

var err error
var sock thrift.Transport
if sslConfig != nil {
sock, err = thrift.NewSSLSocketTimeout(newAdd, sslConfig, timeout)
cn.useHTTP2 = useHTTP2

var (
err error
transport thrift.Transport
)
if useHTTP2 {
if sslConfig != nil {
transport, err = thrift.NewHTTPPostClientWithOptions("https://"+newAdd, thrift.HTTPClientOptions{
Client: &http.Client{
Transport: &http2.Transport{
TLSClientConfig: sslConfig,
},
},
})
} else {
transport, err = thrift.NewHTTPPostClientWithOptions("http://"+newAdd, thrift.HTTPClientOptions{
Client: &http.Client{
Transport: &http2.Transport{
// So http2.Transport doesn't complain the URL scheme isn't 'https'
AllowHTTP: true,
// Pretend we are dialing a TLS endpoint. (Note, we ignore the passed tls.Config)
DialTLSContext: func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
_ = cfg
var d net.Dialer
return d.DialContext(ctx, network, addr)
},
},
},
})
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
} else {
sock, err = thrift.NewSocket(thrift.SocketAddr(newAdd), thrift.SocketTimeout(timeout))
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
bufferSize := 128 << 10
frameMaxLength := uint32(math.MaxUint32)

var sock thrift.Transport
if sslConfig != nil {
sock, err = thrift.NewSSLSocketTimeout(newAdd, sslConfig, timeout)
} else {
sock, err = thrift.NewSocket(thrift.SocketAddr(newAdd), thrift.SocketTimeout(timeout))
}
if err != nil {
return fmt.Errorf("failed to create a net.Conn-backed Transport,: %s", err.Error())
}
// Set transport buffer
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport = thrift.NewFramedTransportMaxLength(bufferedTranFactory.GetTransport(sock), frameMaxLength)
}

// Set transport buffer
bufferedTranFactory := thrift.NewBufferedTransportFactory(bufferSize)
transport := thrift.NewFramedTransportMaxLength(bufferedTranFactory.GetTransport(sock), frameMaxLength)
pf := thrift.NewBinaryProtocolFactoryDefault()
cn.graph = graph.NewGraphServiceClientFactory(transport, pf)
if err = cn.graph.Open(); err != nil {
Expand Down Expand Up @@ -93,7 +132,7 @@ func (cn *connection) verifyClientVersion() error {
// When the timeout occurs, the connection will be reopened to avoid the impact of the message.
func (cn *connection) reopen() error {
cn.close()
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig)
return cn.open(cn.severAddress, cn.timeout, cn.sslConfig, cn.useHTTP2)
}

// Authenticate
Expand Down
16 changes: 8 additions & 8 deletions connection_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func NewSslConnectionPool(addresses []HostAddress, conf PoolConfig, sslConfig *t

// initPool initializes the connection pool
func (pool *ConnectionPool) initPool() error {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig); err != nil {
if err := checkAddresses(pool.conf.TimeOut, pool.addresses, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
return fmt.Errorf("failed to open connection, error: %s ", err.Error())
}

Expand All @@ -79,7 +79,7 @@ func (pool *ConnectionPool) initPool() error {
newConn := newConnection(pool.addresses[i%len(pool.addresses)])

// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
// If initialization failed, clean idle queue
idleLen := pool.idleConnectionQueue.Len()
for i := 0; i < idleLen; i++ {
Expand Down Expand Up @@ -191,7 +191,7 @@ func (pool *ConnectionPool) release(conn *connection) {

// Ping checks availability of host
func (pool *ConnectionPool) Ping(host HostAddress, timeout time.Duration) error {
return pingAddress(host, timeout, pool.sslConfig)
return pingAddress(host, timeout, pool.sslConfig, pool.conf.UseHTTP2)
}

// Close closes all connection
Expand Down Expand Up @@ -242,7 +242,7 @@ func (pool *ConnectionPool) newConnToHost() (*connection, error) {
host := pool.getHost()
newConn := newConnection(host)
// Open connection to host
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, pool.conf.TimeOut, pool.sslConfig, pool.conf.UseHTTP2); err != nil {
return nil, err
}
// Add connection to active queue
Expand Down Expand Up @@ -349,24 +349,24 @@ func (pool *ConnectionPool) timeoutConnectionList() (closing []*connection) {
// checkAddresses checks addresses availability
// It opens a temporary connection to each address and closes it immediately.
// If no error is returned, the addresses are available.
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config) error {
func checkAddresses(confTimeout time.Duration, addresses []HostAddress, sslConfig *tls.Config, useHTTP2 bool) error {
var timeout = 3 * time.Second
if confTimeout != 0 && confTimeout < timeout {
timeout = confTimeout
}
for _, address := range addresses {
if err := pingAddress(address, timeout, sslConfig); err != nil {
if err := pingAddress(address, timeout, sslConfig, useHTTP2); err != nil {
return err
}
}
return nil
}

func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config) error {
func pingAddress(address HostAddress, timeout time.Duration, sslConfig *tls.Config, useHTTP2 bool) error {
newConn := newConnection(address)
defer newConn.close()
// Open connection to host
if err := newConn.open(newConn.severAddress, timeout, sslConfig); err != nil {
if err := newConn.open(newConn.severAddress, timeout, sslConfig, useHTTP2); err != nil {
return err
}
return nil
Expand Down
20 changes: 19 additions & 1 deletion examples/basic_example/graph_client_basic_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package main

import (
"crypto/tls"
"fmt"

nebula "github.com/vesoft-inc/nebula-go/v3"
Expand All @@ -21,6 +22,8 @@ const (
port = 3699
username = "root"
password = "nebula"
useSSL = false
useHTTP2 = false
)

// Initialize logger
Expand All @@ -31,9 +34,24 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

var sslConfig *tls.Config
if useSSL {
var err error
sslConfig, err = nebula.GetDefaultSSLConfig(
"./nebula-docker-compose/secrets/test.ca.pem",
"./nebula-docker-compose/secrets/test.client.crt",
"./nebula-docker-compose/secrets/test.client.key",
)
if err != nil {
log.Fatal("Fail to create ssl config")
}
sslConfig.InsecureSkipVerify = true
}

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
pool, err := nebula.NewSslConnectionPool(hostList, testPoolConfig, sslConfig, log)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
}
Expand Down
20 changes: 19 additions & 1 deletion examples/gorountines_example/graph_client_goroutines_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package main

import (
"crypto/tls"
"fmt"
"strings"
"sync"
Expand All @@ -24,6 +25,8 @@ const (
port = 3699
username = "root"
password = "nebula"
useSSL = false
useHTTP2 = false
)

// Initialize logger
Expand All @@ -34,9 +37,24 @@ func main() {
hostList := []nebula.HostAddress{hostAddress}
// Create configs for connection pool using default values
testPoolConfig := nebula.GetDefaultConf()
testPoolConfig.UseHTTP2 = useHTTP2

var sslConfig *tls.Config
if useSSL {
var err error
sslConfig, err = nebula.GetDefaultSSLConfig(
"./nebula-docker-compose/secrets/test.ca.pem",
"./nebula-docker-compose/secrets/test.client.crt",
"./nebula-docker-compose/secrets/test.client.key",
)
if err != nil {
log.Fatal("Fail to create ssl config")
}
sslConfig.InsecureSkipVerify = true
}

// Initialize connection pool
pool, err := nebula.NewConnectionPool(hostList, testPoolConfig, log)
pool, err := nebula.NewSslConnectionPool(hostList, testPoolConfig, sslConfig, log)
if err != nil {
log.Fatal(fmt.Sprintf("Fail to initialize the connection pool, host: %s, port: %d, %s", address, port, err.Error()))
}
Expand Down
Loading

0 comments on commit c284022

Please sign in to comment.