Skip to content

Commit

Permalink
Merge branch 'main' into jaegertracing#5068
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Jan 3, 2024
2 parents ec05c47 + 6c70344 commit 874d426
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 22 deletions.
5 changes: 4 additions & 1 deletion cmd/agent/app/proxy_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package app

import (
"context"

"github.com/jaegertracing/jaeger/cmd/agent/app/reporter/grpc"
)

// GRPCCollectorProxyBuilder creates CollectorProxyBuilder for GRPC reporter
func GRPCCollectorProxyBuilder(builder *grpc.ConnBuilder) CollectorProxyBuilder {
return func(opts ProxyBuilderOptions) (proxy CollectorProxy, err error) {
return grpc.NewCollectorProxy(builder, opts.AgentTags, opts.Metrics, opts.Logger)
ctx := context.Background()
return grpc.NewCollectorProxy(ctx, builder, opts.AgentTags, opts.Metrics, opts.Logger)
}
}
26 changes: 16 additions & 10 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func NewConnBuilder() *ConnBuilder {
}

// CreateConnection creates the gRPC connection
func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) {
func (b *ConnBuilder) CreateConnection(ctx context.Context, logger *zap.Logger, mFactory metrics.Factory) (*grpc.ClientConn, error) {
var dialOptions []grpc.DialOption
var dialTarget string
if b.TLS.Enabled { // user requested a secure connection
Expand Down Expand Up @@ -115,16 +115,22 @@ func (b *ConnBuilder) CreateConnection(logger *zap.Logger, mFactory metrics.Fact
logger.Info("Checking connection to collector")

for {
s := cc.GetState()
if s == connectivity.Ready {
cm.OnConnectionStatusChange(true)
cm.RecordTarget(cc.Target())
} else {
cm.OnConnectionStatusChange(false)
select {
case <-ctx.Done():
logger.Info("Stopping connection")
return
default:
s := cc.GetState()
if s == connectivity.Ready {
cm.OnConnectionStatusChange(true)
cm.RecordTarget(cc.Target())
} else {
cm.OnConnectionStatusChange(false)
}

logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s))
cc.WaitForStateChange(ctx, s)
}

logger.Info("Agent collector connection state change", zap.String("dialTarget", dialTarget), zap.Stringer("status", s))
cc.WaitForStateChange(context.Background(), s)
}
}(conn, connectMetrics)

Expand Down
28 changes: 22 additions & 6 deletions cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,11 @@ func TestBuilderFromConfig(t *testing.T) {
t,
[]string{"127.0.0.1:14268", "127.0.0.1:14269"},
cfg.CollectorHostPorts)
r, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r, err := cfg.CreateConnection(ctx, zap.NewNop(), metrics.NullFactory)
require.NoError(t, err)
defer r.Close()
assert.NotNil(t, r)
}

Expand Down Expand Up @@ -149,9 +152,12 @@ func TestBuilderWithCollectors(t *testing.T) {
cfg.Notifier = test.notifier
cfg.Discoverer = test.discoverer

conn, err := cfg.CreateConnection(zap.NewNop(), metrics.NullFactory)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
conn, err := cfg.CreateConnection(ctx, zap.NewNop(), metrics.NullFactory)
if test.expectedError == "" {
require.NoError(t, err)
defer conn.Close()
require.NotNil(t, conn)
if test.checkConnectionState {
assertConnectionState(t, conn, test.expectedState)
Expand Down Expand Up @@ -207,10 +213,12 @@ func TestProxyBuilder(t *testing.T) {
expectError: false,
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
proxy, err := NewCollectorProxy(test.grpcBuilder, nil, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(ctx, test.grpcBuilder, nil, metrics.NullFactory, zap.NewNop())

if test.expectError {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -333,6 +341,8 @@ func TestProxyClientTLS(t *testing.T) {
expectError: false,
},
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var opts []grpc.ServerOption
Expand All @@ -342,20 +352,23 @@ func TestProxyClientTLS(t *testing.T) {
opts = []grpc.ServerOption{grpc.Creds(credentials.NewTLS(tlsCfg))}
}

defer test.serverTLS.Close()
spanHandler := &mockSpanHandler{}
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, spanHandler)
}, opts...)
defer s.Stop()

mFactory := metricstest.NewFactory(time.Microsecond)
defer mFactory.Stop()
_, port, _ := net.SplitHostPort(addr.String())

grpcBuilder := &ConnBuilder{
CollectorHostPorts: []string{net.JoinHostPort("localhost", port)},
TLS: test.clientTLS,
}
proxy, err := NewCollectorProxy(
ctx,
grpcBuilder,
nil,
mFactory,
Expand All @@ -369,7 +382,7 @@ func TestProxyClientTLS(t *testing.T) {

r := proxy.GetReporter()

err = r.EmitBatch(context.Background(), &jaeger.Batch{Spans: []*jaeger.Span{{OperationName: "op"}}, Process: &jaeger.Process{ServiceName: "service"}})
err = r.EmitBatch(ctx, &jaeger.Batch{Spans: []*jaeger.Span{{OperationName: "op"}}, Process: &jaeger.Process{ServiceName: "service"}})

if test.expectError {
require.Error(t, err)
Expand Down Expand Up @@ -418,8 +431,11 @@ func TestBuilderWithAdditionalDialOptions(t *testing.T) {
AdditionalDialOptions: []grpc.DialOption{grpc.WithUnaryInterceptor(fi.intercept)},
}

r, err := cb.CreateConnection(zap.NewNop(), metrics.NullFactory)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
r, err := cb.CreateConnection(ctx, zap.NewNop(), metrics.NullFactory)
require.NoError(t, err)
defer r.Close()
assert.NotNil(t, r)

err = r.Invoke(context.Background(), "test", map[string]string{}, map[string]string{}, []grpc.CallOption{}...)
Expand Down
5 changes: 3 additions & 2 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package grpc

import (
"context"
"errors"
"io"

Expand All @@ -36,8 +37,8 @@ type ProxyBuilder struct {
}

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
conn, err := builder.CreateConnection(logger, mFactory)
func NewCollectorProxy(ctx context.Context, builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
conn, err := builder.CreateConnection(ctx, logger, mFactory)
if err != nil {
return nil, err
}
Expand Down
5 changes: 4 additions & 1 deletion cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func TestMultipleCollectors(t *testing.T) {
defer s2.Stop()

mFactory := metricstest.NewFactory(time.Microsecond)
proxy, err := NewCollectorProxy(&ConnBuilder{CollectorHostPorts: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop())
defer mFactory.Stop()
ctx, cancel := context.WithCancel(context.Background())
cancel()
proxy, err := NewCollectorProxy(ctx, &ConnBuilder{CollectorHostPorts: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop())
require.NoError(t, err)
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
Expand Down
25 changes: 25 additions & 0 deletions cmd/agent/app/reporter/grpc/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) 2024 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpc

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
5 changes: 3 additions & 2 deletions cmd/agent/app/reporter/grpc/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ func TestReporter_EmitZipkinBatch(t *testing.T) {
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
//nolint:staticcheck // don't care about errors
defer conn.Close()
require.NoError(t, err)
defer conn.Close()

rep := NewReporter(conn, nil, zap.NewNop())

Expand Down Expand Up @@ -104,8 +104,8 @@ func TestReporter_EmitBatch(t *testing.T) {
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
//nolint:staticcheck // don't care about errors
defer conn.Close()
require.NoError(t, err)
defer conn.Close()
rep := NewReporter(conn, nil, zap.NewNop())

tm := time.Unix(158, 0)
Expand Down Expand Up @@ -133,6 +133,7 @@ func TestReporter_EmitBatch(t *testing.T) {
func TestReporter_SendFailure(t *testing.T) {
conn, err := grpc.Dial("invalid-host-name-blah:12345", grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
defer conn.Close()
rep := NewReporter(conn, nil, zap.NewNop())
err = rep.send(context.Background(), nil, nil)
require.Error(t, err)
Expand Down

0 comments on commit 874d426

Please sign in to comment.