Skip to content

Commit

Permalink
feat: support dump http content in client for debugging
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma committed Oct 29, 2021
1 parent 0d2ae1f commit 58951ff
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 37 deletions.
35 changes: 19 additions & 16 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,15 @@ type DownloadOption struct {

type ProxyOption struct {
// WARNING: when add more option, please update ProxyOption.unmarshal function
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"`
}

func (p *ProxyOption) UnmarshalJSON(b []byte) error {
Expand Down Expand Up @@ -236,14 +237,15 @@ func (p *ProxyOption) UnmarshalYAML(node *yaml.Node) error {

func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err error), b []byte) error {
pt := struct {
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
ListenOption `mapstructure:",squash" yaml:",inline"`
BasicAuth *BasicAuth `mapstructure:"basicAuth" yaml:"basicAuth"`
DefaultFilter string `mapstructure:"defaultFilter" yaml:"defaultFilter"`
MaxConcurrency int64 `mapstructure:"maxConcurrency" yaml:"maxConcurrency"`
RegistryMirror *RegistryMirror `mapstructure:"registryMirror" yaml:"registryMirror"`
WhiteList []*WhiteList `mapstructure:"whiteList" yaml:"whiteList"`
Proxies []*Proxy `mapstructure:"proxies" yaml:"proxies"`
HijackHTTPS *HijackConfig `mapstructure:"hijackHTTPS" yaml:"hijackHTTPS"`
DumpHTTPContent bool `mapstructure:"dumpHTTPContent" yaml:"dumpHTTPContent"`
}{}

if err := unmarshal(b, &pt); err != nil {
Expand All @@ -258,6 +260,7 @@ func (p *ProxyOption) unmarshal(unmarshal func(in []byte, out interface{}) (err
p.MaxConcurrency = pt.MaxConcurrency
p.DefaultFilter = pt.DefaultFilter
p.BasicAuth = pt.BasicAuth
p.DumpHTTPContent = pt.DumpHTTPContent

return nil
}
Expand Down
33 changes: 27 additions & 6 deletions client/daemon/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type Proxy struct {
// cert is the certificate used to hijack https proxy requests
cert *tls.Certificate

// certCache is a in-memory cache store for TLS certs used in HTTPS hijack. Lazy init.
// certCache is an in-memory cache store for TLS certs used in HTTPS hijack. Lazy init.
certCache *lru.Cache

// directHandler are used to handle non-proxy requests
Expand All @@ -95,6 +95,9 @@ type Proxy struct {
tracer trace.Tracer

basicAuth *config.BasicAuth

// dumpHTTPContent indicates to dump http request header and response header
dumpHTTPContent bool
}

// Option is a functional option for configuring the proxy
Expand Down Expand Up @@ -201,6 +204,13 @@ func WithBasicAuth(auth *config.BasicAuth) Option {
}
}

func WithDumpHTTPContent(dump bool) Option {
return func(p *Proxy) *Proxy {
p.dumpHTTPContent = dump
return p
}
}

// NewProxy returns a new transparent proxy from the given options
func NewProxy(options ...Option) (*Proxy, error) {
return NewProxyWithOptions(options...)
Expand Down Expand Up @@ -334,12 +344,14 @@ func (proxy *Proxy) handleHTTP(span trace.Span, w http.ResponseWriter, req *http

func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
if proxy.cert == nil {
logger.Debugf("proxy cert is not configured, tunneling https request for %s", r.Host)
tunnelHTTPS(w, r)
return
}

cConfig := proxy.remoteConfig(r.Host)
if cConfig == nil {
logger.Debugf("hijackHTTPS hosts not match, tunneling https request for %s", r.Host)
tunnelHTTPS(w, r)
return
}
Expand Down Expand Up @@ -394,9 +406,16 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
cConn.Close()

rp := &httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Host = r.Host
r.URL.Scheme = schemaHTTPS
Director: func(req *http.Request) {
req.URL.Host = req.Host
req.URL.Scheme = schemaHTTPS
if proxy.dumpHTTPContent {
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in ReverseProxy: %s", string(out))
} else {
logger.Errorf("dump request in ReverseProxy error: %s", e)
}
}
},
Transport: proxy.newTransport(cConfig),
}
Expand All @@ -405,7 +424,8 @@ func (proxy *Proxy) handleHTTPS(w http.ResponseWriter, r *http.Request) {
wg := sync.WaitGroup{}
wg.Add(1)
// NOTE: http.Serve always returns a non-nil error
if err := http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed {
err = http.Serve(&singleUseListener{&customCloseConn{sConn, wg.Done}}, rp)
if err != errServerClosed && err != http.ErrServerClosed {
logger.Errorf("failed to accept incoming HTTP connections: %v", err)
}
wg.Wait()
Expand All @@ -419,6 +439,7 @@ func (proxy *Proxy) newTransport(tlsConfig *tls.Config) http.RoundTripper {
transport.WithCondition(proxy.shouldUseDragonfly),
transport.WithDefaultFilter(proxy.defaultFilter),
transport.WithDefaultBiz(bizTag),
transport.WithDumpHTTPContent(proxy.dumpHTTPContent),
)
return rt
}
Expand All @@ -432,6 +453,7 @@ func (proxy *Proxy) mirrorRegistry(w http.ResponseWriter, r *http.Request) {
transport.WithCondition(proxy.shouldUseDragonflyForMirror),
transport.WithDefaultFilter(proxy.defaultFilter),
transport.WithDefaultBiz(bizTag),
transport.WithDumpHTTPContent(proxy.dumpHTTPContent),
)
if err != nil {
http.Error(w, fmt.Sprintf("failed to get transport: %v", err), http.StatusInternalServerError)
Expand Down Expand Up @@ -529,7 +551,6 @@ func (proxy *Proxy) shouldUseDragonflyForMirror(req *http.Request) bool {
// tunnelHTTPS handles a CONNECT request and proxy an https request through an
// http tunnel.
func tunnelHTTPS(w http.ResponseWriter, r *http.Request) {
logger.Debugf("Tunneling https request for %s", r.Host)
dst, err := net.DialTimeout("tcp", r.Host, 10*time.Second)
if err != nil {
http.Error(w, err.Error(), http.StatusServiceUnavailable)
Expand Down
1 change: 1 addition & 0 deletions client/daemon/proxy/proxy_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func NewProxyManager(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskMana
WithMaxConcurrency(opts.MaxConcurrency),
WithDefaultFilter(opts.DefaultFilter),
WithBasicAuth(opts.BasicAuth),
WithDumpHTTPContent(opts.DumpHTTPContent),
}

if registry != nil {
Expand Down
18 changes: 13 additions & 5 deletions client/daemon/proxy/proxy_sni.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,18 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) {
defer tlsConn.Close()

rp := &httputil.ReverseProxy{
Director: func(r *http.Request) {
r.URL.Scheme = schemaHTTPS
r.URL.Host = serverName
Director: func(req *http.Request) {
req.URL.Scheme = schemaHTTPS
req.URL.Host = serverName
if port != portHTTPS {
r.URL.Host = fmt.Sprintf("%s:%d", serverName, port)
req.URL.Host = fmt.Sprintf("%s:%d", serverName, port)
}
if proxy.dumpHTTPContent {
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in SNI ReverseProxy: %s", string(out))
} else {
logger.Errorf("dump request in SNI ReverseProxy error: %s", e)
}
}
},
Transport: proxy.newTransport(proxy.remoteConfig(serverName)),
Expand All @@ -122,7 +129,8 @@ func (proxy *Proxy) handleTLSConn(clientConn net.Conn, port int) {
wg := sync.WaitGroup{}
wg.Add(1)
// NOTE: http.Serve always returns a non-nil error
if err := http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp); err != errServerClosed && err != http.ErrServerClosed {
err = http.Serve(&singleUseListener{&customCloseConn{tlsConn, wg.Done}}, rp)
if err != errServerClosed && err != http.ErrServerClosed {
logger.Errorf("failed to accept incoming HTTPS connections: %v", err)
}
wg.Wait()
Expand Down
68 changes: 58 additions & 10 deletions client/daemon/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,13 @@ import (
"fmt"
"net"
"net/http"
"net/http/httputil"
"regexp"
"strconv"
"time"

"github.com/go-http-utils/headers"

"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/client/daemon/peer"
Expand Down Expand Up @@ -61,6 +65,9 @@ type transport struct {

// defaultBiz is used when http request without X-Dragonfly-Biz Header
defaultBiz string

// dumpHTTPContent indicates to dump http request header and response header
dumpHTTPContent bool
}

// Option is functional config for transport.
Expand Down Expand Up @@ -114,6 +121,13 @@ func WithDefaultBiz(b string) Option {
}
}

func WithDumpHTTPContent(b bool) Option {
return func(rt *transport) *transport {
rt.dumpHTTPContent = b
return rt
}
}

// New constructs a new instance of a RoundTripper with additional options.
func New(options ...Option) (http.RoundTripper, error) {
rt := &transport{
Expand All @@ -130,19 +144,25 @@ func New(options ...Option) (http.RoundTripper, error) {

// RoundTrip only process first redirect at present
// fix resource release
func (rt *transport) RoundTrip(req *http.Request) (*http.Response, error) {
func (rt *transport) RoundTrip(req *http.Request) (resp *http.Response, err error) {
if rt.shouldUseDragonfly(req) {
// delete the Accept-Encoding header to avoid returning the same cached
// result for different requests
req.Header.Del("Accept-Encoding")
logger.Debugf("round trip with dragonfly: %s", req.URL.String())
return rt.download(req)
resp, err = rt.download(req)
} else {
logger.Debugf("round trip directly, method: %s, url: %s", req.Method, req.URL.String())
req.Host = req.URL.Host
req.Header.Set("Host", req.Host)
resp, err = rt.baseRoundTripper.RoundTrip(req)
}
logger.Debugf("round trip directly: %s %s", req.Method, req.URL.String())
req.Host = req.URL.Host
req.Header.Set("Host", req.Host)

return rt.baseRoundTripper.RoundTrip(req)
if err != nil {
logger.With("method", req.Method, "url", req.URL.String()).
Errorf("round trip error: %s", err)
}
rt.processDumpHTTPContent(req, resp)
return resp, err
}

// NeedUseDragonfly is the default value for shouldUseDragonfly, which downloads all
Expand Down Expand Up @@ -202,14 +222,42 @@ func (rt *transport) download(req *http.Request) (*http.Response, error) {
hdr := httputils.MapToHeader(attr)
log.Infof("download stream attribute: %v", hdr)

var contentLength int64 = -1
if l, ok := attr[headers.ContentLength]; ok {
if i, e := strconv.ParseInt(l, 10, 64); e == nil {
contentLength = i
}
}

resp := &http.Response{
StatusCode: 200,
Body: body,
Header: hdr,
StatusCode: 200,
Body: body,
Header: hdr,
ContentLength: contentLength,

Proto: req.Proto,
ProtoMajor: req.ProtoMajor,
ProtoMinor: req.ProtoMinor,
}
return resp, nil
}

func (rt *transport) processDumpHTTPContent(req *http.Request, resp *http.Response) {
if !rt.dumpHTTPContent {
return
}
if out, e := httputil.DumpRequest(req, false); e == nil {
logger.Debugf("dump request in transport: %s", string(out))
} else {
logger.Errorf("dump request in transport error: %s", e)
}
if out, e := httputil.DumpResponse(resp, false); e == nil {
logger.Debugf("dump response in transport: %s", string(out))
} else {
logger.Errorf("dump response in transport error: %s", e)
}
}

func defaultHTTPTransport(cfg *tls.Config) *http.Transport {
if cfg == nil {
cfg = &tls.Config{InsecureSkipVerify: true}
Expand Down

0 comments on commit 58951ff

Please sign in to comment.