Commit ba30fcd: Update to version v3.15.0

graveart committed Apr 24, 2023
1 parent 5bd9170 commit ba30fcd
Showing 171 changed files with 6,742 additions and 1,824 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/install_gtest.sh
@@ -4,7 +4,7 @@ git clone --branch release-1.12.1 https://github.com/google/googletest.git
 cd googletest
 mkdir build
 cd build
-cmake -DBUILD_GMOCK=OFF -DBUILD_GTEST=ON ..
+cmake -DBUILD_GMOCK=ON -DBUILD_GTEST=ON ..
 make -j4
 sudo make install
 cd ..
3 changes: 3 additions & 0 deletions .gitignore
@@ -19,3 +19,6 @@ __pycache__
 *.so
 .pytest_cache/
 qa_tests/logs/
+
+#ignore directory with compile commands
+.cache
4 changes: 4 additions & 0 deletions bindings.go
@@ -469,6 +469,10 @@ func WithConnPoolSize(connPoolSize int) interface{} {
 	return bindings.OptionConnPoolSize{ConnPoolSize: connPoolSize}
 }
 
+func WithConnPoolLoadBalancing(algorithm bindings.LoadBalancingAlgorithm) interface{} {
+	return bindings.OptionConnPoolLoadBalancing{Algorithm: algorithm}
+}
+
 func WithRetryAttempts(read int, write int) interface{} {
 	return bindings.OptionRetryAttempts{Read: read, Write: write}
 }
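
The new WithConnPoolLoadBalancing option pairs with the cproto changes further down, where the algorithm defaults to bindings.LBRoundRobin. A minimal usage sketch, assuming the usual NewReindexer entry point; the DSN and database name are placeholders, not part of this commit:

package main

import (
	"log"

	"github.com/restream/reindexer/v3"
	"github.com/restream/reindexer/v3/bindings"
	_ "github.com/restream/reindexer/v3/bindings/cproto"
)

func main() {
	// Placeholder DSN; WithConnPoolLoadBalancing is the option added above.
	db := reindexer.NewReindexer("cproto://user:pass@127.0.0.1:6534/testdb",
		reindexer.WithConnPoolSize(8),
		reindexer.WithConnPoolLoadBalancing(bindings.LBRoundRobin),
	)
	if err := db.Status().Err; err != nil {
		log.Fatal(err)
	}
}
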
44 changes: 30 additions & 14 deletions bindings/builtin/ctx_watcher.go
@@ -27,10 +27,17 @@ var resultChPool = sync.Pool{
 	},
 }
 
+// Wrapper of 'chan ctxWatcherCmd' for correct processing of a closed chan when StopWatchOnCtx is called after Finalize
+type watcherTSWrapper struct {
+	ch       chan ctxWatcherCmd
+	isClosed bool
+	mtx      sync.RWMutex
+}
+
 // CtxWatcher performs context canceling on expiration with some delay
 type CtxWatcher struct {
 	resultChPool sync.Pool
-	cmdChArr     []chan ctxWatcherCmd
+	cmdChArr     []watcherTSWrapper
 	watchDelay   time.Duration
 }

Expand All @@ -39,7 +46,7 @@ type CCtxWrapper struct {
cCtx C.reindexer_ctx_info
goCtx context.Context
isCancelable bool
watcherCh chan ctxWatcherCmd
watcherCh *watcherTSWrapper
}

type ctxWatcherCmdCode int
@@ -59,10 +66,10 @@ func NewCtxWatcher(watchersPoolSize int, watchDelay time.Duration) *CtxWatcher {
 	if watchersPoolSize < 1 {
 		watchersPoolSize = defWatchersPoolSize
 	}
-	watcher := CtxWatcher{cmdChArr: make([]chan ctxWatcherCmd, watchersPoolSize), watchDelay: watchDelay}
+	watcher := CtxWatcher{cmdChArr: make([]watcherTSWrapper, watchersPoolSize), watchDelay: watchDelay}
 	for i := 0; i < watchersPoolSize; i++ {
-		watcher.cmdChArr[i] = make(chan ctxWatcherCmd, watcherChSize)
-		go watcher.watchRoutine(watcher.cmdChArr[i])
+		watcher.cmdChArr[i] = watcherTSWrapper{ch: make(chan ctxWatcherCmd, watcherChSize), isClosed: false}
+		go watcher.watchRoutine(watcher.cmdChArr[i].ch)
 	}
 	return &watcher
 }
@@ -129,21 +136,22 @@ func (watcher *CtxWatcher) StartWatchOnCtx(ctx context.Context) (CCtxWrapper, error) {
 	}
 
 	watcherID := rand.Intn(len(watcher.cmdChArr))
+	watcher.cmdChArr[watcherID].mtx.RLock()
+	defer watcher.cmdChArr[watcherID].mtx.RUnlock()
 	ctxInfo := CCtxWrapper{
 		cCtx: C.reindexer_ctx_info{
 			ctx_id:       C.uint64_t(ctxID),
 			exec_timeout: C.int64_t(execTimeout),
 		},
 		goCtx:        ctx,
 		isCancelable: true,
-		watcherCh:    watcher.cmdChArr[watcherID],
+		watcherCh:    &watcher.cmdChArr[watcherID],
 	}
 
-	cmd := ctxWatcherCmd{
+	ctxInfo.watcherCh.ch <- ctxWatcherCmd{
 		ctx:     ctxInfo,
 		cmdCode: cWCCAdd,
 	}
-	ctxInfo.watcherCh <- cmd
 
 	return ctxInfo, nil
 }
@@ -162,11 +170,14 @@ func (watcher *CtxWatcher) StartWatchOnCtx(ctx context.Context) (CCtxWrapper, error) {
 // StopWatchOnCtx removes context from watch queue
 func (watcher *CtxWatcher) StopWatchOnCtx(ctxInfo CCtxWrapper) {
 	if ctxInfo.isCancelable {
-		cmd := ctxWatcherCmd{
-			ctx:     ctxInfo,
-			cmdCode: cWCCRemove,
-		}
-		ctxInfo.watcherCh <- cmd
+		ctxInfo.watcherCh.mtx.RLock()
+		defer ctxInfo.watcherCh.mtx.RUnlock()
+		if !ctxInfo.watcherCh.isClosed {
+			ctxInfo.watcherCh.ch <- ctxWatcherCmd{
+				ctx:     ctxInfo,
+				cmdCode: cWCCRemove,
+			}
+		}
 	}
 }

@@ -219,8 +230,13 @@ func (watcher *CtxWatcher) watchRoutine(watchCh chan ctxWatcherCmd) {
 
 // Finalize CtxWatcher
 func (watcher *CtxWatcher) Finalize() error {
-	for _, ch := range watcher.cmdChArr {
-		close(ch)
+	for idx := range watcher.cmdChArr {
+		chWr := &watcher.cmdChArr[idx]
+
+		chWr.mtx.Lock()
+		defer chWr.mtx.Unlock()
+		chWr.isClosed = true
+		close(chWr.ch)
 	}
 	return nil
 }
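
The guarded-close pattern above, in a self-contained form: senders take the read lock and check isClosed before sending, while the closer takes the write lock, flips the flag, and closes the channel. This is why a StopWatchOnCtx arriving after Finalize no longer panics on a closed channel. All names below are illustrative, not from the commit:

package main

import "sync"

// guardedChan mirrors watcherTSWrapper: a channel plus a closed flag
// protected by an RWMutex.
type guardedChan struct {
	ch       chan int
	isClosed bool
	mtx      sync.RWMutex
}

// trySend sends only while the channel is open; concurrent senders may
// share the read lock.
func (g *guardedChan) trySend(v int) bool {
	g.mtx.RLock()
	defer g.mtx.RUnlock()
	if g.isClosed {
		return false
	}
	g.ch <- v
	return true
}

// closeOnce excludes all senders via the write lock, so no send can race
// with close.
func (g *guardedChan) closeOnce() {
	g.mtx.Lock()
	defer g.mtx.Unlock()
	if !g.isClosed {
		g.isClosed = true
		close(g.ch)
	}
}

func main() {
	g := &guardedChan{ch: make(chan int, 1)}
	g.trySend(1)     // succeeds while open
	g.closeOnce()
	_ = g.trySend(2) // returns false instead of panicking
}
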
4 changes: 3 additions & 1 deletion bindings/consts.go
@@ -2,7 +2,7 @@ package bindings
 
 const CInt32Max = int(^uint32(0) >> 1)
 
-const ReindexerVersion = "v3.14.2"
+const ReindexerVersion = "v3.15.0"
 
 // public go consts from type_consts.h and reindexer_ctypes.h
 const (
@@ -174,4 +174,6 @@ const (
 	ErrAlreadyProxied = 34
 	ErrStrictMode     = 35
 	ErrQrUIDMissmatch = 36
+	ErrSystem         = 37
+	ErrAssert         = 38
 )
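
ErrSystem and ErrAssert extend the public error-code list. A hypothetical check for them; the concrete error type that carries these codes is not shown in this diff, so the codedError interface here is an assumption:

package main

import (
	"errors"
	"fmt"

	"github.com/restream/reindexer/v3/bindings"
)

// codedError is assumed: any connector error exposing its numeric code.
type codedError interface {
	error
	Code() int
}

// isInternalErr reports whether err carries one of the codes added in v3.15.0.
func isInternalErr(err error) bool {
	var ce codedError
	if errors.As(err, &ce) {
		return ce.Code() == bindings.ErrSystem || ce.Code() == bindings.ErrAssert
	}
	return false
}

func main() {
	fmt.Println(isInternalErr(nil)) // false
}
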
66 changes: 33 additions & 33 deletions bindings/cproto/cproto.go
@@ -17,9 +17,10 @@ import (
 )
 
 const (
-	defConnPoolSize  = 8
-	pingerTimeoutSec = 60
-	defAppName       = "Go-connector"
+	defConnPoolSize        = 8
+	defConnPoolLBAlgorithm = bindings.LBRoundRobin
+	pingerTimeoutSec       = 60
+	defAppName             = "Go-connector"
 
 	opRd = 0
 	opWr = 1
@@ -52,31 +53,13 @@ type NetCProto struct {
 	lock sync.RWMutex
 }
 
-type pool struct {
-	conns []*connection
-	next  uint64
-}
-
 type dsn struct {
 	url         []url.URL
 	connVersion int
 	connTry     int
 	active      int
 }
 
-func (p *pool) get() *connection {
-	id := atomic.AddUint64(&p.next, 1)
-
-	for id >= uint64(len(p.conns)) {
-		if atomic.CompareAndSwapUint64(&p.next, id, 0) {
-			id = 0
-		} else {
-			id = atomic.AddUint64(&p.next, 1)
-		}
-	}
-	return p.conns[id]
-}
-
 func (binding *NetCProto) getActiveDSN() *url.URL {
 	return &binding.dsn.url[binding.dsn.active]
 }
@@ -88,24 +71,35 @@ func (binding *NetCProto) nextDSN() {
 
 func (binding *NetCProto) Init(u []url.URL, options ...interface{}) (err error) {
 	connPoolSize := defConnPoolSize
+	connPoolLBAlgorithm := defConnPoolLBAlgorithm
 	binding.appName = defAppName
 
 	for _, option := range options {
 		switch v := option.(type) {
 		case bindings.OptionConnPoolSize:
 			connPoolSize = v.ConnPoolSize
+
+		case bindings.OptionConnPoolLoadBalancing:
+			connPoolLBAlgorithm = v.Algorithm
+
 		case bindings.OptionRetryAttempts:
 			binding.retryAttempts = v
+
 		case bindings.OptionTimeouts:
 			binding.timeouts = v
+
 		case bindings.OptionConnect:
 			binding.connectOpts = v
+
 		case bindings.OptionCompression:
 			binding.compression = v
+
 		case bindings.OptionAppName:
 			binding.appName = v.AppName
+
 		case bindings.OptionDedicatedThreads:
 			binding.dedicatedThreads = v
+
 		default:
 			fmt.Printf("Unknown cproto option: %#v\n", option)
 		}
@@ -123,34 +117,40 @@ func (binding *NetCProto) Init(u []url.URL, options ...interface{}) (err error)
 	}
 
 	binding.dsn.url = u
-	binding.connectDSN(context.Background(), connPoolSize)
+	binding.connectDSN(context.Background(), connPoolSize, connPoolLBAlgorithm)
 	binding.termCh = make(chan struct{})
 	go binding.pinger()
 	return
 }
 
-func (binding *NetCProto) newPool(ctx context.Context, connPoolSize int) error {
+func (binding *NetCProto) newPool(ctx context.Context, connPoolSize int, connPoolLBAlgorithm bindings.LoadBalancingAlgorithm) error {
 	var wg sync.WaitGroup
 	for _, conn := range binding.pool.conns {
 		conn.Close()
 	}
 
 	binding.pool = pool{
-		conns: make([]*connection, connPoolSize),
+		conns:       make([]*connection, connPoolSize),
+		lbAlgorithm: connPoolLBAlgorithm,
 	}
 
 	wg.Add(connPoolSize)
 	for i := 0; i < connPoolSize; i++ {
 		go func(binding *NetCProto, wg *sync.WaitGroup, i int) {
 			defer wg.Done()
+
 			conn, _ := newConnection(ctx, binding)
 			binding.pool.conns[i] = conn
 		}(binding, &wg, i)
 	}
 	wg.Wait()
+
 	for _, conn := range binding.pool.conns {
 		if conn.err != nil {
 			return conn.err
 		}
 	}
+
 	return nil
 }

@@ -427,7 +427,7 @@ func (binding *NetCProto) Status(ctx context.Context) bindings.Status {
 
 	var err error
 	if activeConns == 0 {
-		_, err = binding.getConn(ctx)
+		_, err = binding.getConnection(ctx)
 	}
 
 	return bindings.Status{
@@ -467,10 +467,10 @@ func (binding *NetCProto) logMsg(level int, fmt string, msg ...interface{}) {
 	}
 }
 
-func (binding *NetCProto) getConn(ctx context.Context) (conn *connection, err error) {
+func (binding *NetCProto) getConnection(ctx context.Context) (conn *connection, err error) {
 	for {
 		binding.lock.RLock()
-		conn = binding.pool.get()
+		conn = binding.pool.GetConnection()
 		currVersion := binding.dsn.connVersion
 		binding.lock.RUnlock()
 
@@ -495,7 +495,7 @@ func (binding *NetCProto) getConn(ctx context.Context) (conn *connection, err error) {
 			}
 			return conn, nil
 		} else {
-			conn = binding.pool.get()
+			conn = binding.pool.GetConnection()
 			binding.lock.Unlock()
 			if conn.hasError() {
 				continue
@@ -508,10 +508,10 @@ func (binding *NetCProto) getConn(ctx context.Context) (conn *connection, err error) {
 	}
 }
 
-func (binding *NetCProto) connectDSN(ctx context.Context, connPoolSize int) error {
+func (binding *NetCProto) connectDSN(ctx context.Context, connPoolSize int, connPoolLBAlgorithm bindings.LoadBalancingAlgorithm) error {
 	errWrap := errors.New("failed to connect with provided dsn")
 	for i := 0; i < len(binding.dsn.url); i++ {
-		err := binding.newPool(ctx, connPoolSize)
+		err := binding.newPool(ctx, connPoolSize, connPoolLBAlgorithm)
 		if err != nil {
 			binding.nextDSN()
 			errWrap = fmt.Errorf("%s; %s", errWrap, err)
@@ -530,13 +530,13 @@ func (binding *NetCProto) reconnect(ctx context.Context) (conn *connection, err error) {
 		binding.dsn.connTry++
 	}
 
-	err = binding.connectDSN(ctx, len(binding.pool.conns))
+	err = binding.connectDSN(ctx, len(binding.pool.conns), binding.pool.lbAlgorithm)
 	if err != nil {
 		time.Sleep(time.Duration(binding.dsn.connTry) * time.Millisecond)
 	} else {
 		binding.dsn.connTry = 0
 	}
-	conn = binding.pool.get()
+	conn = binding.pool.GetConnection()
 
 	return conn, err
 }
@@ -551,7 +551,7 @@ func (binding *NetCProto) rpcCall(ctx context.Context, op int, cmd int, args ...
 	}
 	for i := 0; i < attempts; i++ {
 		var conn *connection
-		if conn, err = binding.getConn(ctx); err == nil {
+		if conn, err = binding.getConnection(ctx); err == nil {
 			if buf, err = conn.rpcCall(ctx, cmd, uint32(binding.timeouts.RequestTimeout/time.Second), args...); err == nil {
 				return
 			}
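
The pool type and its GetConnection method now live in a separate file that this page does not show (the remaining changed files are collapsed). A rough, self-contained sketch of a GetConnection consistent with the removed get() and the new lbAlgorithm field; placeholder types throughout, only the selection logic is the point:

package main

import (
	"fmt"
	"math/rand"
	"sync/atomic"
)

// Placeholders standing in for the connector's connection type and
// bindings.LoadBalancingAlgorithm; not the commit's actual definitions.
type connection struct{ id int }

type loadBalancingAlgorithm int

const (
	lbRoundRobin loadBalancingAlgorithm = iota
	lbRandom // hypothetical alternative algorithm
)

type pool struct {
	conns       []*connection
	next        uint64
	lbAlgorithm loadBalancingAlgorithm
}

// GetConnection picks the next connection. Round-robin keeps the
// atomic-counter scheme of the removed get(), simplified with a modulo.
func (p *pool) GetConnection() *connection {
	switch p.lbAlgorithm {
	case lbRandom:
		return p.conns[rand.Intn(len(p.conns))]
	default: // lbRoundRobin
		id := atomic.AddUint64(&p.next, 1)
		return p.conns[id%uint64(len(p.conns))]
	}
}

func main() {
	p := &pool{conns: []*connection{{0}, {1}, {2}}, lbAlgorithm: lbRoundRobin}
	for i := 0; i < 4; i++ {
		fmt.Print(p.GetConnection().id, " ") // 1 2 0 1
	}
}
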
10 changes: 7 additions & 3 deletions bindings/cproto/cproto_test.go
@@ -2,10 +2,12 @@ package cproto
 
 import (
 	"context"
+	"flag"
 	"fmt"
 	"net"
 	"net/url"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -15,6 +17,8 @@ import (
 	"github.com/restream/reindexer/v3/test/helpers"
 )
 
+var benchmarkSeed = flag.Int64("seed", time.Now().Unix(), "seed number for random")
+
 func BenchmarkGetConn(b *testing.B) {
 	srv1 := helpers.TestServer{T: nil, RpcPort: "6651", HttpPort: "9951", DbName: "cproto"}
 	if err := srv1.Run(); err != nil {
@@ -34,7 +38,7 @@ func BenchmarkGetConn(b *testing.B) {
 	var conn *connection
 	ctx := context.Background()
 	for i := 0; i < b.N; i++ {
-		conn, err = binding.getConn(ctx)
+		conn, err = binding.getConnection(ctx)
 		if err != nil {
 			panic(err)
 		}
@@ -75,7 +79,7 @@ func TestCprotoPool(t *testing.T) {
 
 	conns := make(map[*connection]bool)
 	for i := 0; i < defConnPoolSize; i++ {
-		conn, err := c.getConn(context.Background())
+		conn, err := c.getConnection(context.Background())
 		require.NoError(t, err)
 		if _, ok := conns[conn]; ok {
 			t.Fatalf("getConn not rotate conn")
@@ -84,7 +88,7 @@ func TestCprotoPool(t *testing.T) {
 		}
 	}
 
 	// return anew from the pool
-	conn, err := c.getConn(context.Background())
+	conn, err := c.getConnection(context.Background())
 	require.NoError(t, err)
 	if _, ok := conns[conn]; !ok {
 		t.Fatalf("getConn not rotate conn")
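
The new seed flag makes randomized benchmark input reproducible, e.g. go test -bench . -seed 42 ./bindings/cproto. How the tests consume benchmarkSeed is not visible on this page; a hypothetical TestMain (not part of this diff) could wire it into math/rand like so:

// Hypothetical wiring, not part of this diff; assumes the "flag",
// "math/rand", "os", and "testing" imports in cproto_test.go.
func TestMain(m *testing.M) {
	flag.Parse()
	rand.Seed(*benchmarkSeed) // seed the global math/rand source
	os.Exit(m.Run())
}
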
