diff --git a/cdnsystem/cdnutil/cdn_util.go b/cdnsystem/cdnutil/cdn_util.go index 7c8f32efb88..9e301e57ebf 100644 --- a/cdnsystem/cdnutil/cdn_util.go +++ b/cdnsystem/cdnutil/cdn_util.go @@ -19,9 +19,27 @@ package cdnutil import ( "fmt" + "d7y.io/dragonfly/v2/cdnsystem/config" "d7y.io/dragonfly/v2/pkg/util/net/iputils" ) func GenCDNPeerID(taskID string) string { return fmt.Sprintf("%s-%s_%s", iputils.HostName, taskID, "CDN") } + +// ComputePieceSize computes the piece size with specified fileLength. +// +// If the fileLength<=0, which means failed to get fileLength +// and then use the DefaultPieceSize. +func ComputePieceSize(length int64) int32 { + if length <= 0 || length <= 200*1024*1024 { + return config.DefaultPieceSize + } + + gapCount := length / int64(100*1024*1024) + mpSize := (gapCount-2)*1024*1024 + config.DefaultPieceSize + if mpSize > config.DefaultPieceSizeLimit { + return config.DefaultPieceSizeLimit + } + return int32(mpSize) +} diff --git a/cdnsystem/daemon/task/manager_util.go b/cdnsystem/daemon/task/manager_util.go index 374137af78d..1883fd0d627 100644 --- a/cdnsystem/daemon/task/manager_util.go +++ b/cdnsystem/daemon/task/manager_util.go @@ -22,7 +22,9 @@ import ( "reflect" "time" - "d7y.io/dragonfly/v2/cdnsystem/config" + "github.com/pkg/errors" + + "d7y.io/dragonfly/v2/cdnsystem/cdnutil" cdnerrors "d7y.io/dragonfly/v2/cdnsystem/errors" "d7y.io/dragonfly/v2/cdnsystem/types" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -30,7 +32,6 @@ import ( "d7y.io/dragonfly/v2/pkg/synclock" "d7y.io/dragonfly/v2/pkg/util/net/urlutils" "d7y.io/dragonfly/v2/pkg/util/stringutils" - "github.com/pkg/errors" ) const ( @@ -108,7 +109,7 @@ func (tm *Manager) addOrUpdateTask(ctx context.Context, request *types.TaskRegis // calculate piece size and update the PieceSize and PieceTotal if task.PieceSize <= 0 { - pieceSize := computePieceSize(task.SourceFileLength) + pieceSize := cdnutil.ComputePieceSize(task.SourceFileLength) task.PieceSize = pieceSize } tm.taskStore.Add(task.TaskID, task) @@ -193,20 +194,3 @@ func isSameTask(task1, task2 *types.SeedTask) bool { return true } - -// computePieceSize computes the piece size with specified fileLength. -// -// If the fileLength<=0, which means failed to get fileLength -// and then use the DefaultPieceSize. -func computePieceSize(length int64) int32 { - if length <= 0 || length <= 200*1024*1024 { - return config.DefaultPieceSize - } - - gapCount := length / int64(100*1024*1024) - mpSize := (gapCount-2)*1024*1024 + config.DefaultPieceSize - if mpSize > config.DefaultPieceSizeLimit { - return config.DefaultPieceSizeLimit - } - return int32(mpSize) -} diff --git a/client/daemon/peer/peertask_file.go b/client/daemon/peer/peertask_file.go index d63d36e611d..f8e6384aa35 100644 --- a/client/daemon/peer/peertask_file.go +++ b/client/daemon/peer/peertask_file.go @@ -29,6 +29,7 @@ import ( "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" @@ -86,8 +87,8 @@ func newFilePeerTask(ctx context.Context, logger.Infof("request overview, url: %s, filter: %s, meta: %s, biz: %s, peer: %s", request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag, request.PeerId) // trace register - _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) - result, err := schedulerClient.RegisterPeerTask(ctx, request) + regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask) + result, err := schedulerClient.RegisterPeerTask(regCtx, request) logger.Infof("step 1: peer %s start to register", request.PeerId) regSpan.RecordError(err) regSpan.End() @@ -104,8 +105,10 @@ func newFilePeerTask(ctx context.Context, needBackSource = true // can not detect source or scheduler error, create a new dummy scheduler client schedulerClient = &dummySchedulerClient{} + result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)} logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId) } + if result == nil { defer span.End() span.RecordError(err) diff --git a/client/daemon/peer/peertask_file_callback.go b/client/daemon/peer/peertask_file_callback.go index f4382aa1d30..2434dfd3652 100644 --- a/client/daemon/peer/peertask_file_callback.go +++ b/client/daemon/peer/peertask_file_callback.go @@ -17,7 +17,6 @@ package peer import ( - "context" "time" "d7y.io/dragonfly/v2/client/daemon/storage" @@ -27,7 +26,6 @@ import ( ) type filePeerTaskCallback struct { - ctx context.Context ptm *peerTaskManager pt *filePeerTask req *FilePeerTaskRequest @@ -42,7 +40,7 @@ func (p *filePeerTaskCallback) GetStartTime() time.Time { func (p *filePeerTaskCallback) Init(pt Task) error { // prepare storage - err := p.ptm.storageManager.RegisterTask(p.ctx, + err := p.ptm.storageManager.RegisterTask(p.pt.ctx, storage.RegisterTaskRequest{ CommonTaskRequest: storage.CommonTaskRequest{ PeerID: pt.GetPeerID(), @@ -60,7 +58,7 @@ func (p *filePeerTaskCallback) Init(pt Task) error { func (p *filePeerTaskCallback) Update(pt Task) error { // update storage - err := p.ptm.storageManager.UpdateTask(p.ctx, + err := p.ptm.storageManager.UpdateTask(p.pt.ctx, &storage.UpdateTaskRequest{ PeerTaskMetaData: storage.PeerTaskMetaData{ PeerID: pt.GetPeerID(), @@ -79,7 +77,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error { var cost = time.Now().Sub(p.start).Milliseconds() pt.Log().Infof("file peer task done, cost: %dms", cost) e := p.ptm.storageManager.Store( - context.Background(), + p.pt.ctx, &storage.StoreRequest{ CommonTaskRequest: storage.CommonTaskRequest{ PeerID: pt.GetPeerID(), @@ -93,7 +91,7 @@ func (p *filePeerTaskCallback) Done(pt Task) error { return e } p.ptm.PeerTaskDone(p.req.PeerId) - err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{ + err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, @@ -118,7 +116,7 @@ func (p *filePeerTaskCallback) Fail(pt Task, code base.Code, reason string) erro p.ptm.PeerTaskDone(p.req.PeerId) var end = time.Now() pt.Log().Errorf("file peer task failed, code: %d, reason: %s", code, reason) - err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{ + err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, diff --git a/client/daemon/peer/peertask_file_test.go b/client/daemon/peer/peertask_file_test.go index 9ad34bacaae..83aed0bb1e6 100644 --- a/client/daemon/peer/peertask_file_test.go +++ b/client/daemon/peer/peertask_file_test.go @@ -26,17 +26,19 @@ import ( "testing" "time" + "d7y.io/dragonfly/v2/cdnsystem/cdnutil" "d7y.io/dragonfly/v2/pkg/rpc/base" rangers "d7y.io/dragonfly/v2/pkg/util/rangeutils" + "github.com/golang/mock/gomock" + testifyassert "github.com/stretchr/testify/assert" + "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/test" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" "d7y.io/dragonfly/v2/pkg/source" sourceMock "d7y.io/dragonfly/v2/pkg/source/mock" - "github.com/golang/mock/gomock" - testifyassert "github.com/stretchr/testify/assert" ) func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) { @@ -100,7 +102,7 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) { pieceManager: &pieceManager{ storageManager: storageManager, pieceDownloader: downloader, - computePieceSize: computePieceSize, + computePieceSize: cdnutil.ComputePieceSize, }, storageManager: storageManager, schedulerClient: schedulerClient, @@ -131,7 +133,6 @@ func TestFilePeerTask_BackSource_WithContentLength(t *testing.T) { pt.needBackSource = true pt.SetCallback(&filePeerTaskCallback{ - ctx: ctx, ptm: ptm, pt: pt, req: req, @@ -219,7 +220,7 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) { pieceManager: &pieceManager{ storageManager: storageManager, pieceDownloader: downloader, - computePieceSize: computePieceSize, + computePieceSize: cdnutil.ComputePieceSize, }, storageManager: storageManager, schedulerClient: schedulerClient, @@ -250,7 +251,6 @@ func TestFilePeerTask_BackSource_WithoutContentLength(t *testing.T) { pt.needBackSource = true pt.SetCallback(&filePeerTaskCallback{ - ctx: ctx, ptm: ptm, pt: pt, req: req, diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 7dafae93357..f10a2cbdc71 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -184,7 +184,6 @@ func (ptm *peerTaskManager) StartFilePeerTask(ctx context.Context, req *FilePeer return nil, tiny, nil } pt.SetCallback(&filePeerTaskCallback{ - ctx: ctx, ptm: ptm, pt: pt, req: req, @@ -224,7 +223,6 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu pt.SetCallback( &streamPeerTaskCallback{ - ctx: ctx, ptm: ptm, pt: pt, req: req, diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 92222f24890..65ed9b5b398 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -35,6 +35,7 @@ import ( testifyassert "github.com/stretchr/testify/assert" "google.golang.org/grpc" + "d7y.io/dragonfly/v2/cdnsystem/cdnutil" "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" @@ -362,7 +363,7 @@ func TestPeerTaskManager_StartStreamPeerTask_BackSource(t *testing.T) { pieceManager: &pieceManager{ storageManager: storageManager, pieceDownloader: NewMockPieceDownloader(ctrl), - computePieceSize: computePieceSize, + computePieceSize: cdnutil.ComputePieceSize, }, storageManager: storageManager, schedulerClient: sched, diff --git a/client/daemon/peer/peertask_reuse.go b/client/daemon/peer/peertask_reuse.go index 91b2b648885..09a656eae5b 100644 --- a/client/daemon/peer/peertask_reuse.go +++ b/client/daemon/peer/peertask_reuse.go @@ -22,15 +22,16 @@ import ( "io" "time" + "github.com/go-http-utils/headers" + "go.opentelemetry.io/otel/semconv" + "go.opentelemetry.io/otel/trace" + "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" - "github.com/go-http-utils/headers" - "go.opentelemetry.io/otel/semconv" - "go.opentelemetry.io/otel/trace" ) var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation diff --git a/client/daemon/peer/peertask_stream.go b/client/daemon/peer/peertask_stream.go index 69e3b5aa94c..6833c0036cc 100644 --- a/client/daemon/peer/peertask_stream.go +++ b/client/daemon/peer/peertask_stream.go @@ -32,6 +32,7 @@ import ( "d7y.io/dragonfly/v2/client/daemon/storage" "d7y.io/dragonfly/v2/internal/dfcodes" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/rpc/scheduler" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" @@ -69,8 +70,8 @@ func newStreamPeerTask(ctx context.Context, logger.Debugf("request overview, pid: %s, url: %s, filter: %s, meta: %s, tag: %s", request.PeerId, request.Url, request.UrlMeta.Filter, request.UrlMeta, request.UrlMeta.Tag) // trace register - _, regSpan := tracer.Start(ctx, config.SpanRegisterTask) - result, err := schedulerClient.RegisterPeerTask(ctx, request) + regCtx, regSpan := tracer.Start(ctx, config.SpanRegisterTask) + result, err := schedulerClient.RegisterPeerTask(regCtx, request) logger.Infof("step 1: peer %s start to register", request.PeerId) regSpan.RecordError(err) regSpan.End() @@ -87,8 +88,10 @@ func newStreamPeerTask(ctx context.Context, needBackSource = true // can not detect source or scheduler error, create a new dummy scheduler client schedulerClient = &dummySchedulerClient{} + result = &scheduler.RegisterResult{TaskId: idgen.TaskID(request.Url, request.UrlMeta)} logger.Warnf("register peer task failed: %s, peer id: %s, try to back source", err, request.PeerId) } + if result == nil { defer span.End() span.RecordError(err) diff --git a/client/daemon/peer/peertask_stream_callback.go b/client/daemon/peer/peertask_stream_callback.go index 7984cb425a4..dd2cd03174a 100644 --- a/client/daemon/peer/peertask_stream_callback.go +++ b/client/daemon/peer/peertask_stream_callback.go @@ -17,7 +17,6 @@ package peer import ( - "context" "time" "d7y.io/dragonfly/v2/client/daemon/storage" @@ -27,7 +26,6 @@ import ( ) type streamPeerTaskCallback struct { - ctx context.Context ptm *peerTaskManager pt *streamPeerTask req *scheduler.PeerTaskRequest @@ -42,7 +40,7 @@ func (p *streamPeerTaskCallback) GetStartTime() time.Time { func (p *streamPeerTaskCallback) Init(pt Task) error { // prepare storage - err := p.ptm.storageManager.RegisterTask(p.ctx, + err := p.ptm.storageManager.RegisterTask(p.pt.ctx, storage.RegisterTaskRequest{ CommonTaskRequest: storage.CommonTaskRequest{ PeerID: pt.GetPeerID(), @@ -59,7 +57,7 @@ func (p *streamPeerTaskCallback) Init(pt Task) error { func (p *streamPeerTaskCallback) Update(pt Task) error { // update storage - err := p.ptm.storageManager.UpdateTask(p.ctx, + err := p.ptm.storageManager.UpdateTask(p.pt.ctx, &storage.UpdateTaskRequest{ PeerTaskMetaData: storage.PeerTaskMetaData{ PeerID: pt.GetPeerID(), @@ -78,7 +76,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { var cost = time.Now().Sub(p.start).Milliseconds() pt.Log().Infof("stream peer task done, cost: %dms", cost) e := p.ptm.storageManager.Store( - context.Background(), + p.pt.ctx, &storage.StoreRequest{ CommonTaskRequest: storage.CommonTaskRequest{ PeerID: pt.GetPeerID(), @@ -91,7 +89,7 @@ func (p *streamPeerTaskCallback) Done(pt Task) error { return e } p.ptm.PeerTaskDone(p.req.PeerId) - err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{ + err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, @@ -116,7 +114,7 @@ func (p *streamPeerTaskCallback) Fail(pt Task, code base.Code, reason string) er p.ptm.PeerTaskDone(p.req.PeerId) var end = time.Now() pt.Log().Errorf("stream peer task failed, code: %d, reason: %s", code, reason) - err := p.pt.schedulerClient.ReportPeerResult(context.Background(), &scheduler.PeerResult{ + err := p.pt.schedulerClient.ReportPeerResult(p.pt.ctx, &scheduler.PeerResult{ TaskId: pt.GetTaskID(), PeerId: pt.GetPeerID(), SrcIp: p.ptm.host.Ip, diff --git a/client/daemon/peer/peertask_stream_test.go b/client/daemon/peer/peertask_stream_test.go index 0ca48289950..5a00ce69329 100644 --- a/client/daemon/peer/peertask_stream_test.go +++ b/client/daemon/peer/peertask_stream_test.go @@ -28,6 +28,7 @@ import ( "github.com/golang/mock/gomock" testifyassert "github.com/stretchr/testify/assert" + "d7y.io/dragonfly/v2/cdnsystem/cdnutil" "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/test" @@ -97,7 +98,7 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) { pieceManager: &pieceManager{ storageManager: storageManager, pieceDownloader: downloader, - computePieceSize: computePieceSize, + computePieceSize: cdnutil.ComputePieceSize, }, storageManager: storageManager, schedulerClient: schedulerClient, @@ -129,7 +130,6 @@ func TestStreamPeerTask_BackSource_WithContentLength(t *testing.T) { 0) assert.Nil(err, "new stream peer task") pt.SetCallback(&streamPeerTaskCallback{ - ctx: ctx, ptm: ptm, pt: pt, req: req, @@ -204,7 +204,7 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) { pieceManager: &pieceManager{ storageManager: storageManager, pieceDownloader: downloader, - computePieceSize: computePieceSize, + computePieceSize: cdnutil.ComputePieceSize, }, storageManager: storageManager, schedulerClient: schedulerClient, @@ -236,7 +236,6 @@ func TestStreamPeerTask_BackSource_WithoutContentLength(t *testing.T) { 0) assert.Nil(err, "new stream peer task") pt.SetCallback(&streamPeerTaskCallback{ - ctx: ctx, ptm: ptm, pt: pt, req: req, diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index da1004d4ff5..fbdbbca906d 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -24,7 +24,7 @@ import ( "golang.org/x/time/rate" - cdnconfig "d7y.io/dragonfly/v2/cdnsystem/config" + "d7y.io/dragonfly/v2/cdnsystem/cdnutil" "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/storage" @@ -56,7 +56,7 @@ var _ PieceManager = (*pieceManager)(nil) func NewPieceManager(s storage.TaskStorageDriver, opts ...func(*pieceManager)) (PieceManager, error) { pm := &pieceManager{ storageManager: s, - computePieceSize: computePieceSize, + computePieceSize: cdnutil.ComputePieceSize, calculateDigest: true, } for _, opt := range opts { @@ -386,21 +386,3 @@ func (pm *pieceManager) DownloadSource(ctx context.Context, pt Task, request *sc log.Infof("download from source ok") return nil } - -// TODO copy from cdnsystem/daemon/mgr/task/manager_util.go -// computePieceSize computes the piece size with specified fileLength. -// -// If the fileLength<=0, which means failed to get fileLength -// and then use the DefaultPieceSize. -func computePieceSize(length int64) int32 { - if length <= 0 || length <= 200*1024*1024 { - return cdnconfig.DefaultPieceSize - } - - gapCount := length / int64(100*1024*1024) - mpSize := (gapCount-2)*1024*1024 + cdnconfig.DefaultPieceSize - if mpSize > cdnconfig.DefaultPieceSizeLimit { - return cdnconfig.DefaultPieceSizeLimit - } - return int32(mpSize) -} diff --git a/client/daemon/proxy/proxy.go b/client/daemon/proxy/proxy.go index 7ec276221b3..841c6a37787 100644 --- a/client/daemon/proxy/proxy.go +++ b/client/daemon/proxy/proxy.go @@ -28,12 +28,6 @@ import ( "sync" "time" - "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/client/daemon/peer" - "d7y.io/dragonfly/v2/client/daemon/transport" - logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/rpc/scheduler" - "d7y.io/dragonfly/v2/pkg/util/stringutils" "github.com/go-http-utils/headers" "github.com/golang/groupcache/lru" "github.com/pkg/errors" @@ -41,6 +35,13 @@ import ( "go.opentelemetry.io/otel/semconv" "go.opentelemetry.io/otel/trace" "golang.org/x/sync/semaphore" + + "d7y.io/dragonfly/v2/client/config" + "d7y.io/dragonfly/v2/client/daemon/peer" + "d7y.io/dragonfly/v2/client/daemon/transport" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler" + "d7y.io/dragonfly/v2/pkg/util/stringutils" ) var ( diff --git a/cmd/dependency/dependency.go b/cmd/dependency/dependency.go index 48d6edf7b14..8f50a6ca697 100644 --- a/cmd/dependency/dependency.go +++ b/cmd/dependency/dependency.go @@ -27,15 +27,6 @@ import ( "syscall" "time" - "d7y.io/dragonfly/v2/client/clientutil" - "d7y.io/dragonfly/v2/client/config" - logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/internal/dflog/logcore" - "d7y.io/dragonfly/v2/internal/dfpath" - "d7y.io/dragonfly/v2/pkg/basic/dfnet" - "d7y.io/dragonfly/v2/pkg/unit" - "d7y.io/dragonfly/v2/pkg/util/net/iputils" - "d7y.io/dragonfly/v2/version" "github.com/go-echarts/statsview" "github.com/go-echarts/statsview/viewer" "github.com/mitchellh/mapstructure" @@ -45,11 +36,22 @@ import ( "github.com/spf13/viper" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/trace/jaeger" + "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" "go.opentelemetry.io/otel/semconv" "go.uber.org/zap/zapcore" "gopkg.in/yaml.v3" + + "d7y.io/dragonfly/v2/client/clientutil" + "d7y.io/dragonfly/v2/client/config" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/dflog/logcore" + "d7y.io/dragonfly/v2/internal/dfpath" + "d7y.io/dragonfly/v2/pkg/basic/dfnet" + "d7y.io/dragonfly/v2/pkg/unit" + "d7y.io/dragonfly/v2/pkg/util/net/iputils" + "d7y.io/dragonfly/v2/version" ) // InitCobra initializes flags binding and common sub cmds. @@ -245,6 +247,7 @@ func initJaegerTracer(url string) (func(), error) { // Register our TracerProvider as the global so any imported // instrumentation in the future will default to using it. otel.SetTracerProvider(tp) + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})) return func() { // Do not make the application hang when it is shutdown. diff --git a/go.mod b/go.mod index fc9b5d21ef9..7ff8672233d 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( github.com/golang/protobuf v1.5.2 github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.7.3 + github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 github.com/jarcoal/httpmock v1.0.8 github.com/klauspost/compress v1.13.1 // indirect github.com/kr/text v0.2.0 // indirect @@ -55,6 +56,7 @@ require ( github.com/spf13/viper v1.7.1 github.com/stretchr/testify v1.7.0 github.com/ugorji/go v1.1.13 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 go.opentelemetry.io/otel v0.20.0 go.opentelemetry.io/otel/exporters/trace/jaeger v0.20.0 go.opentelemetry.io/otel/sdk v0.20.0 @@ -67,7 +69,7 @@ require ( golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 golang.org/x/tools v0.1.4 // indirect gonum.org/v1/gonum v0.9.3 - google.golang.org/grpc v1.36.0 + google.golang.org/grpc v1.39.0 google.golang.org/protobuf v1.26.0 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 diff --git a/go.sum b/go.sum index bfa9b965848..62ede940b49 100644 --- a/go.sum +++ b/go.sum @@ -67,6 +67,7 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/aliyun/aliyun-oss-go-sdk v2.1.6+incompatible h1:Ft+KeWIJxFP76LqgJbvtOA1qBIoC8vGkTV3QeCOeJC4= github.com/aliyun/aliyun-oss-go-sdk v2.1.6+incompatible/go.mod h1:T/Aws4fEfogEE9v+HPhhw+CntffsBHJ8nXQCwKr0/g8= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/appleboy/gin-jwt/v2 v2.6.4 h1:4YlMh3AjCFnuIRiL27b7TXns7nLx8tU/TiSgh40RRUI= github.com/appleboy/gin-jwt/v2 v2.6.4/go.mod h1:CZpq1cRw+kqi0+yD2CwVw7VGXrrx4AqBdeZnwxVmoAs= github.com/appleboy/gofight/v2 v2.1.2 h1:VOy3jow4vIK8BRQJoC/I9muxyYlJ2yb9ht2hZoS3rf4= @@ -106,6 +107,7 @@ github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDk github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/colinmarc/hdfs/v2 v2.2.0 h1:4AaIlTq+/sWmeqYhI0dX8bD4YrMQM990tRjm636FkGM= @@ -146,6 +148,8 @@ github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.m github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5ynNVH9qI8YYLbd1fK2po= github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/envoyproxy/protoc-gen-validate v0.6.1 h1:4CF52PCseTFt4bE+Yk3dIpdVi7XWuPVMhPtm4FaIJPM= github.com/envoyproxy/protoc-gen-validate v0.6.1/go.mod h1:txg5va2Qkip90uYoSKH+nkAAmXrb2j3iq4FLwdrCbXQ= @@ -341,9 +345,11 @@ github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+ github.com/gorilla/sessions v1.2.0 h1:S7P+1Hm5V/AT9cjEcUD5uDaQSX0OE577aCXgoaKpYbQ= github.com/gorilla/sessions v1.2.0/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk= github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q= github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -634,6 +640,7 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.2.2/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -750,6 +757,10 @@ go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.22.6 h1:BdkrbWrzDlV9dnbzoP7sfN+dHheJ4J9JOaYxcUDL+ok= go.opencensus.io v0.22.6/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= +go.opentelemetry.io/contrib v0.20.0 h1:ubFQUn0VCZ0gPwIoJfBJVpeBlyRMxu8Mm/huKWYd9p0= +go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 h1:sO4WKdPAudZGKPcpZT4MJn6JaDmpyLrMPDGGyA1SttE= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E= go.opentelemetry.io/otel v0.11.0/go.mod h1:G8UCk+KooF2HLkgo8RHX9epABH/aRGYET7gQOqBVdB0= go.opentelemetry.io/otel v0.15.0/go.mod h1:e4GKElweB8W2gWUqbghw0B8t5MCTccc9212eNHnOHwA= go.opentelemetry.io/otel v0.17.0/go.mod h1:Oqtdxmf7UtEvL037ohlgnaYa1h7GtMh0NcSd9eqkC9s= @@ -768,10 +779,13 @@ go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m go.opentelemetry.io/otel/trace v0.17.0/go.mod h1:bIujpqg6ZL6xUTubIUgziI1jSaUPthmabA/ygf/6Cfg= go.opentelemetry.io/otel/trace v0.20.0 h1:1DL6EXUdcg95gukhuRRvLDO/4X5THh/5dIV52lqtnbw= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.6.0 h1:Ezj3JGmsOnG1MoRWQkPBsKLe9DwWD9QeXzTRzzldNVk= go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0= +go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/multierr v1.5.0 h1:KCa4XfM8CWFCpxXRGok+Q0SS/0XBhMDbHHGABQLvD2A= go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= @@ -1036,6 +1050,7 @@ golang.org/x/tools v0.0.0-20190927191325-030b2cf1153e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191112195655-aa38f8e97acc/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191113191852-77e3bb0ad9e7/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191115202509-3a792d9c32b2/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= @@ -1144,6 +1159,7 @@ google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfG google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA= @@ -1172,11 +1188,14 @@ google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3Iji google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc= google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8= google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= -google.golang.org/grpc v1.36.0 h1:o1bcQ6imQMIOpdrO3SWf2z5RV72WbDwdXuK0MDlc8As= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM= +google.golang.org/grpc v1.39.0 h1:Klz8I9kdtkIN6EpHHUOMLCYhTn/2WAe5a0s1hcBkdTI= +google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -1210,6 +1229,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/pkg/rpc/client.go b/pkg/rpc/client.go index c83485671aa..85d4d9569fa 100644 --- a/pkg/rpc/client.go +++ b/pkg/rpc/client.go @@ -22,14 +22,17 @@ import ( "sync" "time" - "d7y.io/dragonfly/v2/internal/dfcodes" - "d7y.io/dragonfly/v2/internal/dferrors" - logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/basic/dfnet" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" "github.com/pkg/errors" "github.com/serialx/hashring" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" + + "d7y.io/dragonfly/v2/internal/dfcodes" + "d7y.io/dragonfly/v2/internal/dferrors" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/basic/dfnet" ) const ( @@ -92,8 +95,13 @@ var defaultClientOpts = []grpc.DialOption{ Time: 2 * time.Minute, Timeout: 10 * time.Second, }), - grpc.WithStreamInterceptor(streamClientInterceptor), - grpc.WithUnaryInterceptor(unaryClientInterceptor), + // TODO make grpc interceptor optional + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + otelgrpc.StreamClientInterceptor(), + streamClientInterceptor)), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otelgrpc.UnaryClientInterceptor(), + unaryClientInterceptor)), } type ConnOption interface { diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index fb51f101d2c..58c792f27de 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -20,13 +20,13 @@ import ( "context" "time" - "d7y.io/dragonfly/v2/internal/dfcodes" - "d7y.io/dragonfly/v2/internal/dferrors" - "d7y.io/dragonfly/v2/internal/idgen" "github.com/pkg/errors" "google.golang.org/grpc" + "d7y.io/dragonfly/v2/internal/dfcodes" + "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/internal/idgen" "d7y.io/dragonfly/v2/pkg/basic/dfnet" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/rpc/base" diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go index 84d288d636d..bc6622c8158 100644 --- a/pkg/rpc/server.go +++ b/pkg/rpc/server.go @@ -29,6 +29,8 @@ import ( "syscall" "time" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" @@ -75,8 +77,13 @@ var serverOpts = []grpc.ServerOption{ MaxConnectionIdle: 5 * time.Minute, }), grpc.MaxConcurrentStreams(100), - grpc.StreamInterceptor(streamServerInterceptor), - grpc.UnaryInterceptor(unaryServerInterceptor), + // TODO make grpc interceptor optional + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + otelgrpc.StreamServerInterceptor(), + streamServerInterceptor)), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + otelgrpc.UnaryServerInterceptor(), + unaryServerInterceptor)), } var sp = struct { diff --git a/test/stress/README.md b/test/tools/stress/README.md similarity index 100% rename from test/stress/README.md rename to test/tools/stress/README.md diff --git a/test/stress/main.go b/test/tools/stress/main.go similarity index 100% rename from test/stress/main.go rename to test/tools/stress/main.go index 816e53b1bd8..297e320e04c 100644 --- a/test/stress/main.go +++ b/test/tools/stress/main.go @@ -33,13 +33,13 @@ import ( "syscall" "time" - "d7y.io/dragonfly/v2/pkg/unit" - "d7y.io/dragonfly/v2/pkg/util/net/iputils" "github.com/go-echarts/statsview" "github.com/go-echarts/statsview/viewer" "github.com/montanaflynn/stats" "d7y.io/dragonfly/v2/client/config" + "d7y.io/dragonfly/v2/pkg/unit" + "d7y.io/dragonfly/v2/pkg/util/net/iputils" ) var (