Skip to content

Commit

Permalink
[#113]: chore: manage pool of workers directly in the plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
rustatian authored Aug 30, 2024
2 parents 0a829f4 + cb9fc70 commit d40870b
Show file tree
Hide file tree
Showing 10 changed files with 97 additions and 26 deletions.
3 changes: 1 addition & 2 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- errcheck # Errcheck is a program for checking for unchecked errors in go programs. These unchecked errors can be critical bugs in some cases
- errorlint # find code that will cause problems with the error wrapping scheme introduced in Go 1.13
- exhaustive # check exhaustiveness of enum switch statements
- exportloopref # checks for pointers to enclosing loop variables
- copyloopvar # checks for pointers to enclosing loop variables
- gochecknoglobals # Checks that no globals are present in Go code
- goconst # Finds repeated strings that could be replaced by a constant
- gocritic # The most opinionated Go source code linter
Expand All @@ -50,7 +50,6 @@ linters: # All available linters list: <https://golangci-lint.run/usage/linters/
- misspell # Finds commonly misspelled English words in comments
- nakedret # Finds naked returns in functions greater than a specified function length
- nestif # Reports deeply nested if statements
- nlreturn # checks for a new line before return and branch statements to increase code clarity
- noctx # finds sending http request without context.Context
- nolintlint # Reports ill-formed or insufficient nolint directives
- prealloc # Finds slice declarations that could potentially be preallocated
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/common v0.57.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/roadrunner-server/events v1.0.1 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
Expand All @@ -46,6 +46,6 @@ require (
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/common v0.57.0 h1:Ro/rKjwdq9mZn1K5QPctzh+MA4Lp0BuYk5ZZEVhoNcY=
github.com/prometheus/common v0.57.0/go.mod h1:7uRPFSUTbfZWsJ7MHY56sqt7hLQu3bxXHDnNhl8E9qI=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/roadrunner-server/api/v4 v4.16.0 h1:UaaKWHelc7bZC4cRdTD802gyIrJFRFTPEk7Bt3U01qI=
Expand Down Expand Up @@ -75,6 +77,8 @@ golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7 h1:2035KHhUv+EpyB+hWgJnaWKJOdX1E95w2S8Rr4uWKTs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
Expand Down
40 changes: 40 additions & 0 deletions go.work.sum

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ func (p *Plugin) Stop(ctx context.Context) error {
stCh := make(chan struct{}, 1)
go func() {
p.mu.Lock()
p.gRPCServer.Stop()
p.gRPCServer.GracefulStop()
p.pool.Destroy(ctx)
p.mu.Unlock()
stCh <- struct{}{}
}()
Expand Down
7 changes: 7 additions & 0 deletions proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ func (p *Proxy) Connect(ctx context.Context, request *centrifugov1.ConnectReques
return nil, err
}

p.log.Debug("finished connect proxy request")
return cr, nil
}

Expand Down Expand Up @@ -87,6 +88,7 @@ func (p *Proxy) Refresh(ctx context.Context, request *centrifugov1.RefreshReques
return nil, err
}

p.log.Debug("finished refresh proxy request")
return rr, nil
}

Expand Down Expand Up @@ -124,6 +126,7 @@ func (p *Proxy) Subscribe(ctx context.Context, request *centrifugov1.SubscribeRe
return nil, err
}

p.log.Debug("finished subscribe proxy request")
return sr, nil
}

Expand Down Expand Up @@ -161,6 +164,7 @@ func (p *Proxy) Publish(ctx context.Context, request *centrifugov1.PublishReques
return nil, err
}

p.log.Debug("finished publish proxy request")
return pr, nil
}

Expand Down Expand Up @@ -198,6 +202,7 @@ func (p *Proxy) RPC(ctx context.Context, request *centrifugov1.RPCRequest) (*cen
return nil, err
}

p.log.Debug("finished RPC proxy request")
return rresp, nil
}

Expand Down Expand Up @@ -235,6 +240,7 @@ func (p *Proxy) SubRefresh(ctx context.Context, request *centrifugov1.SubRefresh
return nil, err
}

p.log.Debug("finished RPC SubRefresh request")
return rresp, nil
}

Expand Down Expand Up @@ -272,6 +278,7 @@ func (p *Proxy) NotifyChannelState(ctx context.Context, request *centrifugov1.No
return nil, err
}

p.log.Debug("finished NotifyChannelState request")
return rresp, nil
}

Expand Down
19 changes: 14 additions & 5 deletions tests/centrifugo_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package centrifugo

import (
"context"
"io"
"log/slog"
"net/http"
Expand Down Expand Up @@ -34,10 +35,15 @@ func TestCentrifugoPluginInit(t *testing.T) {
cfg := &config.Plugin{
Version: "2023.3.0",
Path: "configs/.rr-centrifugo-init.yaml",
Prefix: "rr",
}

cmd := exec.Command("env/centrifugo", "--config", "env/config.json", "--admin")
var cmd *exec.Cmd
if runtime.GOOS == "darwin" {
cmd = exec.Command("env/centrifugo_mac", "--config", "env/config.json", "--admin")
} else {
cmd = exec.Command("env/centrifugo", "--config", "env/config.json", "--admin")
}

err := cmd.Start()
require.NoError(t, err)

Expand Down Expand Up @@ -89,13 +95,15 @@ func TestCentrifugoPluginInit(t *testing.T) {
if err != nil {
assert.FailNow(t, "error", err.Error())
}

return
case <-stopCh:
// timeout
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}

return
}
}
Expand Down Expand Up @@ -139,7 +147,6 @@ func TestCentrifugoStatusChecks(t *testing.T) {
cfg := &config.Plugin{
Version: "2023.3.0",
Path: "configs/.rr-centrifugo-status.yaml",
Prefix: "rr",
}

var cmd *exec.Cmd
Expand Down Expand Up @@ -200,13 +207,15 @@ func TestCentrifugoStatusChecks(t *testing.T) {
if err != nil {
assert.FailNow(t, "error", err.Error())
}

return
case <-stopCh:
// timeout
err = cont.Stop()
if err != nil {
assert.FailNow(t, "error", err.Error())
}

return
}
}
Expand All @@ -216,7 +225,7 @@ func TestCentrifugoStatusChecks(t *testing.T) {
client := &http.Client{
Timeout: time.Second * 10,
}
req, err := http.NewRequest("GET", "http://127.0.0.1:35544/health?plugin=centrifuge", nil)
req, err := http.NewRequestWithContext(context.Background(), http.MethodGet, "http://127.0.0.1:35544/health?plugin=centrifuge", nil)
require.NoError(t, err)

resp, err := client.Do(req)
Expand All @@ -228,7 +237,7 @@ func TestCentrifugoStatusChecks(t *testing.T) {
assert.Equal(t, http.StatusOK, resp.StatusCode)
_ = resp.Body.Close()

req, err = http.NewRequest("GET", "http://127.0.0.1:35544/ready?plugin=centrifuge", nil)
req, err = http.NewRequestWithContext(context.Background(), http.MethodGet, "http://127.0.0.1:35544/ready?plugin=centrifuge", nil)
require.NoError(t, err)

resp, err = client.Do(req)
Expand Down
14 changes: 8 additions & 6 deletions tests/go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
module tests

go 1.23.0
go 1.23

toolchain go1.23.0

require (
github.com/centrifugal/centrifuge-go v0.10.2
Expand All @@ -9,7 +11,7 @@ require (
github.com/roadrunner-server/endure/v2 v2.5.0
github.com/roadrunner-server/logger/v5 v5.0.2
github.com/roadrunner-server/rpc/v5 v5.0.2
github.com/roadrunner-server/server/v5 v5.0.2
github.com/roadrunner-server/server/v5 v5.1.0
github.com/roadrunner-server/status/v5 v5.0.2
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -40,11 +42,11 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml/v2 v2.2.3 // indirect
github.com/planetscale/vtprotobuf v0.6.0 // indirect
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_golang v1.20.2 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.55.0 // indirect
github.com/prometheus/common v0.57.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/roadrunner-server/api/v4 v4.16.0 // indirect
github.com/roadrunner-server/errors v1.4.1 // indirect
Expand Down Expand Up @@ -75,8 +77,8 @@ require (
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.24.0 // indirect
golang.org/x/text v0.17.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7 // indirect
google.golang.org/grpc v1.65.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect
google.golang.org/grpc v1.66.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.2.1 // indirect
Expand Down
20 changes: 10 additions & 10 deletions tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,16 +57,16 @@ github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pelletier/go-toml/v2 v2.2.3 h1:YmeHyLY8mFWbdkNWwpr+qIL2bEqT0o95WSdkNHvL12M=
github.com/pelletier/go-toml/v2 v2.2.3/go.mod h1:MfCQTFTvCcUyyvvwm1+G6H/jORL20Xlb6rzQu9GuUkc=
github.com/planetscale/vtprotobuf v0.6.0 h1:nBeETjudeJ5ZgBHUz1fVHvbqUKnYOXNhsIEabROxmNA=
github.com/planetscale/vtprotobuf v0.6.0/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v1.20.2 h1:5ctymQzZlyOON1666svgwn3s6IKWgfbjsejTMiXIyjg=
github.com/prometheus/client_golang v1.20.2/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E=
github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY=
github.com/prometheus/common v0.55.0 h1:KEi6DK7lXW/m7Ig5i47x0vRzuBsHuvJdi5ee6Y3G1dc=
github.com/prometheus/common v0.55.0/go.mod h1:2SECS4xJG1kd8XF9IcM1gMX6510RAEL65zxzNImwdc8=
github.com/prometheus/common v0.57.0 h1:Ro/rKjwdq9mZn1K5QPctzh+MA4Lp0BuYk5ZZEVhoNcY=
github.com/prometheus/common v0.57.0/go.mod h1:7uRPFSUTbfZWsJ7MHY56sqt7hLQu3bxXHDnNhl8E9qI=
github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/roadrunner-server/api/v4 v4.16.0 h1:UaaKWHelc7bZC4cRdTD802gyIrJFRFTPEk7Bt3U01qI=
Expand All @@ -87,8 +87,8 @@ github.com/roadrunner-server/pool v1.0.1 h1:biseQtnu1I/s0m2QolPQwH/9FhWIb2UgBy1O
github.com/roadrunner-server/pool v1.0.1/go.mod h1:4/fbpcesO2+W9oBbE0CRTE55FUdi8zZA28GvG1F1PWE=
github.com/roadrunner-server/rpc/v5 v5.0.2 h1:0tGgt/W6PMpkcTCrbZaddTL7ROd6DbnZ5BZ0pruXX/s=
github.com/roadrunner-server/rpc/v5 v5.0.2/go.mod h1:Wu73T8+sQnzU+KX+0ywC4vRPnQeM+rbOB4q3amqikd4=
github.com/roadrunner-server/server/v5 v5.0.2 h1:HNvd7rYLgM2s6fEBG7XFnygtEfIIJRgW27+afmbGfrU=
github.com/roadrunner-server/server/v5 v5.0.2/go.mod h1:pB2DwMsMElTw63KYH82kFTC7Vwcmy+PP8XfUcmBEcR4=
github.com/roadrunner-server/server/v5 v5.1.0 h1:3iMipJKvwQaxKwmdW1Zgb5G37eQ0egu1k9Lt4UyPXwI=
github.com/roadrunner-server/server/v5 v5.1.0/go.mod h1:/md/RxY/i0Waw7JOIxZCx2xcvORmLGwnZyyXPkqpNYU=
github.com/roadrunner-server/status/v5 v5.0.2 h1:mXdlMTwWZ819DtaOW0L/fAcDYhGCCk7yT5lLHBQ1OQk=
github.com/roadrunner-server/status/v5 v5.0.2/go.mod h1:+7gbGKtx+hpLtxu/aX8w2AfRh9GMOjCWWlQwpSF0Abw=
github.com/roadrunner-server/tcplisten v1.5.1 h1:CESLjVtnL0LEVaazzwEKUyQx84IcAgOQkdomA2+YHnY=
Expand Down Expand Up @@ -151,10 +151,10 @@ golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg=
golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7 h1:2035KHhUv+EpyB+hWgJnaWKJOdX1E95w2S8Rr4uWKTs=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240826202546-f6391c0de4c7/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.65.0 h1:bs/cUb4lp1G5iImFFd3u5ixQzweKizoZJAwBNLR42lc=
google.golang.org/grpc v1.65.0/go.mod h1:WgYC2ypjlB0EiQi6wdKixMqukr6lBc0Vo+oOgjrM5ZQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed h1:J6izYgfBXAI3xTKLgxzTmUltdYaLsuBxFCgDHWJ/eXg=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed/go.mod h1:UqMtugtsSgubUsoxbuAoiCXvqvErP7Gf0so0mK9tHxU=
google.golang.org/grpc v1.66.0 h1:DibZuoBznOxbDQxRINckZcUvnCEvrW9pcWIE2yF9r1c=
google.golang.org/grpc v1.66.0/go.mod h1:s3/l6xSSCURdVfAnL+TqCNMyTDAGN6+lZeVxnZR128Y=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
9 changes: 9 additions & 0 deletions tests/php_test_files/centrifuge_connect.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@

continue;
}
if ($request instanceof Request\Subscribe) {
try {
$request->respond(new Payload\SubscribeResponse());
} catch (\Throwable $e) {
$request->error($e->getCode(), $e->getMessage());
}

continue;
}


if ($request instanceof Request\Connect) {
Expand Down

0 comments on commit d40870b

Please sign in to comment.