diff --git a/Makefile b/Makefile index 7247909d582..6060f4df3fa 100644 --- a/Makefile +++ b/Makefile @@ -181,12 +181,12 @@ build-dfget-man-page: # Run unittests test: - @go test -gcflags "all=-l" -race -short ${PKG_LIST} + @go test -v -gcflags "all=-l" -race -short ${PKG_LIST} .PHONY: test # Run tests with coverage test-coverage: - @go test -gcflags "all=-l" -race -short ${PKG_LIST} -coverprofile cover.out -covermode=atomic + @go test -v -gcflags "all=-l" -race -short ${PKG_LIST} -coverprofile cover.out -covermode=atomic @cat cover.out >> coverage.txt .PHONY: test-coverage diff --git a/client/config/peerhost.go b/client/config/peerhost.go index d15cf81874d..700876fca89 100644 --- a/client/config/peerhost.go +++ b/client/config/peerhost.go @@ -152,14 +152,15 @@ type DownloadOption struct { type ProxyOption struct { // WARNING: when add more option, please update ProxyOption.unmarshal function - ListenOption `mapstructure:",squash" yaml:",inline"` - BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"` - DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"` - MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"` - RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"` - WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"` - Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"` - HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"` + ListenOption `mapstructure:",squash" yaml:",inline"` + BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"` + DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"` + MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"` + RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"` + WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"` + Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"` + HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"` + DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"` } func (p *ProxyOption) UnmarshalJSON(b []byte) error { @@ -236,14 +237,15 @@ func (p *ProxyOption) UnmarshalYAML(node *yaml.Node) error { func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err error), b []byte) error { pt := struct { - ListenOption `mapstructure:",squash" yaml:",inline"` - BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"` - DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"` - MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"` - RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"` - WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"` - Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"` - HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"` + ListenOption `mapstructure:",squash" yaml:",inline"` + BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"` + DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"` + MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"` + RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"` + WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"` + Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"` + HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"` + DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"` }{} if err := unmarshal(b, &pt); err != nil { @@ -258,6 +260,7 @@ func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err p.MaxConcurrency = pt.MaxConcurrency p.DefaultFilter = pt.DefaultFilter p.BasicAuth = pt.BasicAuth + p.DumpHTTPContent = pt.DumpHTTPContent return nil } diff --git a/client/daemon/proxy/proxy.go b/client/daemon/proxy/proxy.go index 5f324b5b207..ea3d0b9e7e4 100644 --- a/client/daemon/proxy/proxy.go +++ b/client/daemon/proxy/proxy.go @@ -70,7 +70,7 @@ type Proxy struct { // cert is the certificate used to hijack https proxy requests cert *tls.Certificate - // certCache is a in-memory cache store for TLS certs used in HTTPS hijack. Lazy init. + // certCache is an in-memory cache store for TLS certs used in HTTPS hijack. Lazy init. certCache *lru.Cache // directHandler are used to handle non-proxy requests @@ -95,6 +95,9 @@ type Proxy struct { tracer trace.Tracer basicAuth *config.BasicAuth + + // dumpHTTPContent indicates to dump http request header and response header + dumpHTTPContent bool } // Option is a functional option for configuring the proxy @@ -201,6 +204,13 @@ func WithBasicAuth(auth *config.BasicAuth) Option { } } +func WithDumpHTTPContent(dump bool) Option { + return func(p *Proxy) *Proxy { + p.dumpHTTPContent = dump + return p + } +} + // NewProxy returns a new transparent proxy from the given options func NewProxy(options ...Option) (*Proxy, error) { return NewProxyWithOptions(options...) @@ -334,12 +344,14 @@ func (proxy *Proxy) handleHTTP(span trace.Span, w http.ResponseWriter, req *http func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) { if proxy.cert == nil { + logger.Debugf("proxy cert is not configured, tunneling https request for %s", r.Host) tunnelHTTPS(w, r) return } cConfig := proxy.remoteConfig(r.Host) if cConfig == nil { + logger.Debugf("hijackHTTPS hosts not match, tunneling https request for %s", r.Host) tunnelHTTPS(w, r) return } @@ -394,9 +406,16 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) { cConn.Close() rp := &httputil.ReverseProxy{ - Director: func(r *http.Request) { - r.URL.Host = r.Host - r.URL.Scheme = schemaHTTPS + Director: func(req *http.Request) { + req.URL.Host = req.Host + req.URL.Scheme = schemaHTTPS + if proxy.dumpHTTPContent { + if out, e := httputil.DumpRequest(req, false); e == nil { + logger.Debugf("dump request in ReverseProxy: %s", string(out)) + } else { + logger.Errorf("dump request in ReverseProxy error: %s", e) + } + } }, Transport: proxy.newTransport(cConfig), } @@ -405,7 +424,8 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) { wg := sync.WaitGroup{} wg.Add(1) // NOTE: http.Serve always returns a non-nil error - if err := http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed { + err = http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp) + if err != errServerClosed && err != http.ErrServerClosed { logger.Errorf("failed to accept incoming HTTP connections: %v", err) } wg.Wait() @@ -419,6 +439,7 @@ func (proxy *Proxy) newTransport(tlsConfig *tls.Config) http.RoundTripper { transport.WithCondition(proxy.shouldUseDragonfly), transport.WithDefaultFilter(proxy.defaultFilter), transport.WithDefaultBiz(bizTag), + transport.WithDumpHTTPContent(proxy.dumpHTTPContent), ) return rt } @@ -432,6 +453,7 @@ func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) { transport.WithCondition(proxy.shouldUseDragonflyForMirror), transport.WithDefaultFilter(proxy.defaultFilter), transport.WithDefaultBiz(bizTag), + transport.WithDumpHTTPContent(proxy.dumpHTTPContent), ) if err != nil { http.Error(w, fmt.Sprintf("failed to get transport: %v", err), http.StatusInternalServerError) @@ -529,7 +551,6 @@ func (proxy *Proxy) shouldUseDragonflyForMirror(req *http.Request) bool { // tunnelHTTPS handles a CONNECT request and proxy an https request through an // http tunnel. func tunnelHTTPS(w http.ResponseWriter, r *http.Request) { - logger.Debugf("Tunneling https request for %s", r.Host) dst, err := net.DialTimeout("tcp", r.Host, 10*time.Second) if err != nil { http.Error(w, err.Error(), http.StatusServiceUnavailable) diff --git a/client/daemon/proxy/proxy_manager.go b/client/daemon/proxy/proxy_manager.go index 56ab45d2e4a..5c9cba65dc2 100644 --- a/client/daemon/proxy/proxy_manager.go +++ b/client/daemon/proxy/proxy_manager.go @@ -70,6 +70,7 @@ func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskMana WithMaxConcurrency(opts.MaxConcurrency), WithDefaultFilter(opts.DefaultFilter), WithBasicAuth(opts.BasicAuth), + WithDumpHTTPContent(opts.DumpHTTPContent), } if registry != nil { diff --git a/client/daemon/proxy/proxy_sni.go b/client/daemon/proxy/proxy_sni.go index 8124985ad9e..277903919f5 100644 --- a/client/daemon/proxy/proxy_sni.go +++ b/client/daemon/proxy/proxy_sni.go @@ -108,11 +108,18 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) { defer tlsConn.Close() rp := &httputil.ReverseProxy{ - Director: func(r *http.Request) { - r.URL.Scheme = schemaHTTPS - r.URL.Host = serverName + Director: func(req *http.Request) { + req.URL.Scheme = schemaHTTPS + req.URL.Host = serverName if port != portHTTPS { - r.URL.Host = fmt.Sprintf("%s:%d", serverName, port) + req.URL.Host = fmt.Sprintf("%s:%d", serverName, port) + } + if proxy.dumpHTTPContent { + if out, e := httputil.DumpRequest(req, false); e == nil { + logger.Debugf("dump request in SNI ReverseProxy: %s", string(out)) + } else { + logger.Errorf("dump request in SNI ReverseProxy error: %s", e) + } } }, Transport: proxy.newTransport(proxy.remoteConfig(serverName)), @@ -122,7 +129,8 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) { wg := sync.WaitGroup{} wg.Add(1) // NOTE: http.Serve always returns a non-nil error - if err := http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed { + err = http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp) + if err != errServerClosed && err != http.ErrServerClosed { logger.Errorf("failed to accept incoming HTTPS connections: %v", err) } wg.Wait() diff --git a/client/daemon/storage/local_storage.go b/client/daemon/storage/local_storage.go index b9a11cb1f07..acd85c491a9 100644 --- a/client/daemon/storage/local_storage.go +++ b/client/daemon/storage/local_storage.go @@ -26,12 +26,13 @@ import ( "sync" "time" + "go.uber.org/atomic" + "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/util/digestutils" - "go.uber.org/atomic" ) type localTaskStore struct { @@ -342,19 +343,19 @@ func (t *localTaskStore) reclaimData(sLogger *logger.SugaredLoggerOnWith) error if stat.Mode()&os.ModeSymlink == os.ModeSymlink { dest, err0 := os.Readlink(data) if err0 == nil { - if err = os.Remove(dest); err != nil { + if err = os.Remove(dest); err != nil && !os.IsNotExist(err) { sLogger.Warnf("remove symlink target file %s error: %s", dest, err) } else { sLogger.Infof("remove data file %s", dest) } } } else { // remove cache file - if err = os.Remove(t.DataFilePath); err != nil { + if err = os.Remove(t.DataFilePath); err != nil && !os.IsNotExist(err) { sLogger.Errorf("remove data file %s error: %s", data, err) return err } } - if err = os.Remove(data); err != nil { + if err = os.Remove(data); err != nil && !os.IsNotExist(err) { sLogger.Errorf("remove data file %s error: %s", data, err) return err } diff --git a/client/daemon/transport/transport.go b/client/daemon/transport/transport.go index 300a4986451..323bd74c04b 100644 --- a/client/daemon/transport/transport.go +++ b/client/daemon/transport/transport.go @@ -21,9 +21,13 @@ import ( "fmt" "net" "net/http" + "net/http/httputil" "regexp" + "strconv" "time" + "github.com/go-http-utils/headers" + "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/peer" @@ -61,6 +65,9 @@ type transport struct { // defaultBiz is used when http request without X-Dragonfly-Biz Header defaultBiz string + + // dumpHTTPContent indicates to dump http request header and response header + dumpHTTPContent bool } // Option is functional config for transport. @@ -114,6 +121,13 @@ func WithDefaultBiz(b string) Option { } } +func WithDumpHTTPContent(b bool) Option { + return func(rt *transport) *transport { + rt.dumpHTTPContent = b + return rt + } +} + // New constructs a new instance of a RoundTripper with additional options. func New(options ...Option) (http.RoundTripper, error) { rt := &transport{ @@ -130,19 +144,25 @@ func New(options ...Option) (http.RoundTripper, error) { // RoundTrip only process first redirect at present // fix resource release -func (rt *transport) RoundTrip(req *http.Request) (*http.Response, error) { +func (rt *transport) RoundTrip(req *http.Request) (resp *http.Response, err error) { if rt.shouldUseDragonfly(req) { // delete the Accept-Encoding header to avoid returning the same cached // result for different requests req.Header.Del("Accept-Encoding") logger.Debugf("round trip with dragonfly: %s", req.URL.String()) - return rt.download(req) + resp, err = rt.download(req) + } else { + logger.Debugf("round trip directly, method: %s, url: %s", req.Method, req.URL.String()) + req.Host = req.URL.Host + req.Header.Set("Host", req.Host) + resp, err = rt.baseRoundTripper.RoundTrip(req) } - logger.Debugf("round trip directly: %s %s", req.Method, req.URL.String()) - req.Host = req.URL.Host - req.Header.Set("Host", req.Host) - - return rt.baseRoundTripper.RoundTrip(req) + if err != nil { + logger.With("method", req.Method, "url", req.URL.String()). + Errorf("round trip error: %s", err) + } + rt.processDumpHTTPContent(req, resp) + return resp, err } // NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all @@ -202,14 +222,45 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) { hdr := httputils.MapToHeader(attr) log.Infof("download stream attribute: %v", hdr) + var contentLength int64 = -1 + if l, ok := attr[headers.ContentLength]; ok { + if i, e := strconv.ParseInt(l, 10, 64); e == nil { + contentLength = i + } + } + resp := &http.Response{ - StatusCode: 200, - Body: body, - Header: hdr, + StatusCode: http.StatusOK, + Body: body, + Header: hdr, + ContentLength: contentLength, + + Proto: req.Proto, + ProtoMajor: req.ProtoMajor, + ProtoMinor: req.ProtoMinor, } return resp, nil } +func (rt *transport) processDumpHTTPContent(req *http.Request, resp *http.Response) { + if !rt.dumpHTTPContent { + return + } + if out, e := httputil.DumpRequest(req, false); e == nil { + logger.Debugf("dump request in transport: %s", string(out)) + } else { + logger.Errorf("dump request in transport error: %s", e) + } + if resp == nil { + return + } + if out, e := httputil.DumpResponse(resp, false); e == nil { + logger.Debugf("dump response in transport: %s", string(out)) + } else { + logger.Errorf("dump response in transport error: %s", e) + } +} + func defaultHTTPTransport(cfg *tls.Config) *http.Transport { if cfg == nil { cfg = &tls.Config{InsecureSkipVerify: true} diff --git a/deploy/helm-charts b/deploy/helm-charts index 3a576ac0752..e3d9e72f64b 160000 --- a/deploy/helm-charts +++ b/deploy/helm-charts @@ -1 +1 @@ -Subproject commit 3a576ac0752d4f956bb2feeb62180a703050b43d +Subproject commit e3d9e72f64bb2328edeb574872d873d1336349b1 diff --git a/test/testdata/charts/config.yaml b/test/testdata/charts/config.yaml index 7f862164122..c0dd4ea12c3 100644 --- a/test/testdata/charts/config.yaml +++ b/test/testdata/charts/config.yaml @@ -33,6 +33,8 @@ dfdaemon: hostPath: path: /tmp/artifact config: + scheduler: + disableAutoBackSource: true proxy: defaultFilter: "Expires&Signature" security: