Skip to content

Commit

Permalink
[IMPROVED] Use faster gha runners and fix leaking goroutines in Servi…
Browse files Browse the repository at this point in the history
…ce API (#1794)

Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio authored Feb 4, 2025
1 parent faec055 commit b1be9bf
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 6 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ on:

jobs:
lint:
runs-on: ubuntu-latest
runs-on: ubuntu-latest-8-cores
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand Down Expand Up @@ -37,7 +37,7 @@ jobs:
golangci-lint run --timeout 5m0s ./jetstream/...
test:
runs-on: ubuntu-latest
runs-on: ubuntu-latest-8-cores

strategy:
matrix:
Expand Down
2 changes: 1 addition & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ golang.org/x/crypto v0.31.0/go.mod h1:kDsLvtWBEx7MV9tJOj9bnXsPbxwJQ6csT/x4KIN4Ss
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ=
22 changes: 19 additions & 3 deletions micro/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ type (

asyncCallbacksHandler struct {
cbQueue chan func()
closed bool
}
)

Expand Down Expand Up @@ -351,6 +352,7 @@ func AddService(nc *nats.Conn, config Config) (Service, error) {
opts = append(opts, WithEndpointQueueGroup(config.QueueGroup))
}
if err := svc.AddEndpoint("default", config.Endpoint.Handler, opts...); err != nil {
svc.asyncDispatcher.close()
return nil, err
}
}
Expand Down Expand Up @@ -462,8 +464,8 @@ func (s *service) AddGroup(name string, opts ...GroupOpt) Group {
// dispatch is responsible for calling any async callbacks
func (ac *asyncCallbacksHandler) run() {
for {
f := <-ac.cbQueue
if f == nil {
f, ok := <-ac.cbQueue
if !ok || f == nil {
return
}
f()
Expand All @@ -476,7 +478,11 @@ func (ac *asyncCallbacksHandler) push(f func()) {
}

func (ac *asyncCallbacksHandler) close() {
if ac.closed {
return
}
close(ac.cbQueue)
ac.closed = true
}

func (c *Config) valid() error {
Expand Down Expand Up @@ -565,6 +571,9 @@ func (s *service) wrapConnectionEventCallbacks() {
}

func unwrapConnectionEventCallbacks(nc *nats.Conn, handlers handlers) {
if nc.IsClosed() {
return
}
nc.SetClosedHandler(handlers.closed)
nc.SetErrorHandler(handlers.asyncErr)
}
Expand Down Expand Up @@ -666,13 +675,18 @@ func (s *service) Stop() error {
}
for _, e := range s.endpoints {
if err := e.stop(); err != nil {
fmt.Println("Error stopping endpoint: ", err)
return err
}
}
var keys []string
for key, sub := range s.verbSubs {
keys = append(keys, key)
if err := sub.Drain(); err != nil {
// connection is closed so draining is not possible
if errors.Is(err, nats.ErrConnectionClosed) {
break
}
return fmt.Errorf("draining subscription for subject %q: %w", sub.Subject, err)
}
}
Expand Down Expand Up @@ -828,7 +842,9 @@ func (g *group) AddGroup(name string, opts ...GroupOpt) Group {
}

func (e *Endpoint) stop() error {
if err := e.subscription.Drain(); err != nil {
// Drain the subscription. If the connection is closed, draining is not possible
// but we should still remove the endpoint from the service.
if err := e.subscription.Drain(); err != nil && !errors.Is(err, nats.ErrConnectionClosed) {
return fmt.Errorf("draining subscription for request handler: %w", err)
}
for i := 0; i < len(e.service.endpoints); i++ {
Expand Down
24 changes: 24 additions & 0 deletions micro/test/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package micro_test

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
6 changes: 6 additions & 0 deletions micro/test/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -444,6 +444,9 @@ func TestAddService(t *testing.T) {
t.Fatalf("Expected to connect to server, got %v", err)
}
defer nc.Close()
// cleanup handlers since we invoke them manually
defer nc.SetClosedHandler(nil)
defer nc.SetErrorHandler(nil)

srv, err := micro.AddService(nc, test.givenConfig)
if test.withError != nil {
Expand Down Expand Up @@ -1389,6 +1392,9 @@ func TestRequestRespond(t *testing.T) {
Header: nats.Header{"key": []string{"value"}},
}, 50*time.Millisecond)
if test.withRespondError != nil {
if err == nil {
t.Fatalf("Expected error when receiving response")
}
return
}
if err != nil {
Expand Down

0 comments on commit b1be9bf

Please sign in to comment.