Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support dump http content in client for debugging #770

Merged
merged 6 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ build-dfget-man-page:

# Run unittests
test:
@go test -gcflags "all=-l" -race -short ${PKG_LIST}
@go test -v -gcflags "all=-l" -race -short ${PKG_LIST}
.PHONY: test

# Run tests with coverage
test-coverage:
@go test -gcflags "all=-l" -race -short ${PKG_LIST} -coverprofile cover.out -covermode=atomic
@go test -v -gcflags "all=-l" -race -short ${PKG_LIST} -coverprofile cover.out -covermode=atomic
@cat cover.out >> coverage.txt
.PHONY: test-coverage

Expand Down
35 changes: 19 additions & 16 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,15 @@ type DownloadOption struct {

type ProxyOption struct {
// WARNING: when add more option, please update ProxyOption.unmarshal function
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"`
}

func (p *ProxyOption) UnmarshalJSON(b []byte) error {
Expand Down Expand Up @@ -236,14 +237,15 @@ func (p *ProxyOption) UnmarshalYAML(node *yaml.Node) error {

func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err error), b []byte) error {
pt := struct {
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"`
}{}

if err := unmarshal(b, &pt); err != nil {
Expand All @@ -258,6 +260,7 @@ func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err
p.MaxConcurrency = pt.MaxConcurrency
p.DefaultFilter = pt.DefaultFilter
p.BasicAuth = pt.BasicAuth
p.DumpHTTPContent = pt.DumpHTTPContent

return nil
}
Expand Down
33 changes: 27 additions & 6 deletions client/daemon/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Proxy struct {
// cert is the certificate used to hijack https proxy requests
cert *tls.Certificate

// certCache is a in-memory cache store for TLS certs used in HTTPS hijack. Lazy init.
// certCache is an in-memory cache store for TLS certs used in HTTPS hijack. Lazy init.
certCache *lru.Cache

// directHandler are used to handle non-proxy requests
Expand All @@ -95,6 +95,9 @@ type Proxy struct {
tracer trace.Tracer

basicAuth *config.BasicAuth

// dumpHTTPContent indicates to dump http request header and response header
dumpHTTPContent bool
}

// Option is a functional option for configuring the proxy
Expand Down Expand Up @@ -201,6 +204,13 @@ func WithBasicAuth(auth *config.BasicAuth) Option {
}
}

func WithDumpHTTPContent(dump bool) Option {
return func(p *Proxy) *Proxy {
p.dumpHTTPContent = dump
return p
}
}

// NewProxy returns a new transparent proxy from the given options
func NewProxy(options ...Option) (*Proxy, error) {
return NewProxyWithOptions(options...)
Expand Down Expand Up @@ -334,12 +344,14 @@ func (proxy *Proxy) handleHTTP(span trace.Span, w http.ResponseWriter, req *http

func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
if proxy.cert == nil {
logger.Debugf("proxy cert is not configured, tunneling https request for %s", r.Host)
tunnelHTTPS(w, r)
return
}

cConfig := proxy.remoteConfig(r.Host)
if cConfig == nil {
logger.Debugf("hijackHTTPS hosts not match, tunneling https request for %s", r.Host)
tunnelHTTPS(w, r)
return
}
Expand Down Expand Up @@ -394,9 +406,16 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
cConn.Close()

rp := &httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Host = r.Host
r.URL.Scheme = schemaHTTPS
Director: func(req *http.Request) {
req.URL.Host = req.Host
req.URL.Scheme = schemaHTTPS
if proxy.dumpHTTPContent {
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in ReverseProxy: %s", string(out))
} else {
logger.Errorf("dump request in ReverseProxy error: %s", e)
}
}
},
Transport: proxy.newTransport(cConfig),
}
Expand All @@ -405,7 +424,8 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
wg := sync.WaitGroup{}
wg.Add(1)
// NOTE: http.Serve always returns a non-nil error
if err := http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed {
err = http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp)
if err != errServerClosed && err != http.ErrServerClosed {
logger.Errorf("failed to accept incoming HTTP connections: %v", err)
}
wg.Wait()
Expand All @@ -419,6 +439,7 @@ func (proxy *Proxy) newTransport(tlsConfig *tls.Config) http.RoundTripper {
transport.WithCondition(proxy.shouldUseDragonfly),
transport.WithDefaultFilter(proxy.defaultFilter),
transport.WithDefaultBiz(bizTag),
transport.WithDumpHTTPContent(proxy.dumpHTTPContent),
)
return rt
}
Expand All @@ -432,6 +453,7 @@ func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) {
transport.WithCondition(proxy.shouldUseDragonflyForMirror),
transport.WithDefaultFilter(proxy.defaultFilter),
transport.WithDefaultBiz(bizTag),
transport.WithDumpHTTPContent(proxy.dumpHTTPContent),
)
if err != nil {
http.Error(w, fmt.Sprintf("failed to get transport: %v", err), http.StatusInternalServerError)
Expand Down Expand Up @@ -529,7 +551,6 @@ func (proxy *Proxy) shouldUseDragonflyForMirror(req *http.Request) bool {
// tunnelHTTPS handles a CONNECT request and proxy an https request through an
// http tunnel.
func tunnelHTTPS(w http.ResponseWriter, r *http.Request) {
logger.Debugf("Tunneling https request for %s", r.Host)
dst, err := net.DialTimeout("tcp", r.Host, 10*time.Second)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
Expand Down
1 change: 1 addition & 0 deletions client/daemon/proxy/proxy_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskMana
WithMaxConcurrency(opts.MaxConcurrency),
WithDefaultFilter(opts.DefaultFilter),
WithBasicAuth(opts.BasicAuth),
WithDumpHTTPContent(opts.DumpHTTPContent),
}

if registry != nil {
Expand Down
18 changes: 13 additions & 5 deletions client/daemon/proxy/proxy_sni.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,18 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) {
defer tlsConn.Close()

rp := &httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Scheme = schemaHTTPS
r.URL.Host = serverName
Director: func(req *http.Request) {
req.URL.Scheme = schemaHTTPS
req.URL.Host = serverName
if port != portHTTPS {
r.URL.Host = fmt.Sprintf("%s:%d", serverName, port)
req.URL.Host = fmt.Sprintf("%s:%d", serverName, port)
}
if proxy.dumpHTTPContent {
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in SNI ReverseProxy: %s", string(out))
} else {
logger.Errorf("dump request in SNI ReverseProxy error: %s", e)
}
}
},
Transport: proxy.newTransport(proxy.remoteConfig(serverName)),
Expand All @@ -122,7 +129,8 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) {
wg := sync.WaitGroup{}
wg.Add(1)
// NOTE: http.Serve always returns a non-nil error
if err := http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed {
err = http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp)
if err != errServerClosed && err != http.ErrServerClosed {
logger.Errorf("failed to accept incoming HTTPS connections: %v", err)
}
wg.Wait()
Expand Down
9 changes: 5 additions & 4 deletions client/daemon/storage/local_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@ import (
"sync"
"time"

"go.uber.org/atomic"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/util/digestutils"
"go.uber.org/atomic"
)

type localTaskStore struct {
Expand Down Expand Up @@ -342,19 +343,19 @@ func (t *localTaskStore) reclaimData(sLogger *logger.SugaredLoggerOnWith) error
if stat.Mode()&os.ModeSymlink == os.ModeSymlink {
dest, err0 := os.Readlink(data)
if err0 == nil {
if err = os.Remove(dest); err != nil {
if err = os.Remove(dest); err != nil && !os.IsNotExist(err) {
sLogger.Warnf("remove symlink target file %s error: %s", dest, err)
} else {
sLogger.Infof("remove data file %s", dest)
}
}
} else { // remove cache file
if err = os.Remove(t.DataFilePath); err != nil {
if err = os.Remove(t.DataFilePath); err != nil && !os.IsNotExist(err) {
sLogger.Errorf("remove data file %s error: %s", data, err)
return err
}
}
if err = os.Remove(data); err != nil {
if err = os.Remove(data); err != nil && !os.IsNotExist(err) {
sLogger.Errorf("remove data file %s error: %s", data, err)
return err
}
Expand Down
71 changes: 61 additions & 10 deletions client/daemon/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import (
"fmt"
"net"
"net/http"
"net/http/httputil"
"regexp"
"strconv"
"time"

"github.com/go-http-utils/headers"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
Expand Down Expand Up @@ -61,6 +65,9 @@ type transport struct {

// defaultBiz is used when http request without X-Dragonfly-Biz Header
defaultBiz string

// dumpHTTPContent indicates to dump http request header and response header
dumpHTTPContent bool
}

// Option is functional config for transport.
Expand Down Expand Up @@ -114,6 +121,13 @@ func WithDefaultBiz(b string) Option {
}
}

func WithDumpHTTPContent(b bool) Option {
return func(rt *transport) *transport {
rt.dumpHTTPContent = b
return rt
}
}

// New constructs a new instance of a RoundTripper with additional options.
func New(options ...Option) (http.RoundTripper, error) {
rt := &transport{
Expand All @@ -130,19 +144,25 @@ func New(options ...Option) (http.RoundTripper, error) {

// RoundTrip only process first redirect at present
// fix resource release
func (rt *transport) RoundTrip(req *http.Request) (*http.Response, error) {
func (rt *transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
if rt.shouldUseDragonfly(req) {
// delete the Accept-Encoding header to avoid returning the same cached
// result for different requests
req.Header.Del("Accept-Encoding")
logger.Debugf("round trip with dragonfly: %s", req.URL.String())
return rt.download(req)
resp, err = rt.download(req)
} else {
logger.Debugf("round trip directly, method: %s, url: %s", req.Method, req.URL.String())
req.Host = req.URL.Host
req.Header.Set("Host", req.Host)
resp, err = rt.baseRoundTripper.RoundTrip(req)
}
logger.Debugf("round trip directly: %s %s", req.Method, req.URL.String())
req.Host = req.URL.Host
req.Header.Set("Host", req.Host)

return rt.baseRoundTripper.RoundTrip(req)
if err != nil {
logger.With("method", req.Method, "url", req.URL.String()).
Errorf("round trip error: %s", err)
}
rt.processDumpHTTPContent(req, resp)
return resp, err
}

// NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all
Expand Down Expand Up @@ -202,14 +222,45 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) {
hdr := httputils.MapToHeader(attr)
log.Infof("download stream attribute: %v", hdr)

var contentLength int64 = -1
if l, ok := attr[headers.ContentLength]; ok {
if i, e := strconv.ParseInt(l, 10, 64); e == nil {
contentLength = i
}
}

resp := &http.Response{
StatusCode: 200,
Body: body,
Header: hdr,
StatusCode: http.StatusOK,
Body: body,
Header: hdr,
ContentLength: contentLength,

Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
}
return resp, nil
}

func (rt *transport) processDumpHTTPContent(req *http.Request, resp *http.Response) {
if !rt.dumpHTTPContent {
return
}
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in transport: %s", string(out))
} else {
logger.Errorf("dump request in transport error: %s", e)
}
if resp == nil {
return
}
if out, e := httputil.DumpResponse(resp, false); e == nil {
logger.Debugf("dump response in transport: %s", string(out))
} else {
logger.Errorf("dump response in transport error: %s", e)
}
}

func defaultHTTPTransport(cfg *tls.Config) *http.Transport {
if cfg == nil {
cfg = &tls.Config{InsecureSkipVerify: true}
Expand Down
2 changes: 2 additions & 0 deletions test/testdata/charts/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ dfdaemon:
hostPath:
path: /tmp/artifact
config:
scheduler:
disableAutoBackSource: true
proxy:
defaultFilter: "Expires&Signature"
security:
Expand Down