From 4799f7f307896a56823975d83c6e551359183cba Mon Sep 17 00:00:00 2001 From: "craig.hazen" Date: Tue, 12 Mar 2019 11:17:39 -0500 Subject: [PATCH 1/8] moved tcp dynamic --- .gitignore | 1 + Makefile | 32 +++++++++++++ config/config.go | 1 + config/load.go | 10 +++- config/load_test.go | 7 +++ fabio.properties | 7 +++ main.go | 66 +++++++++++++++++++++++++ proxy/tcp/tcp_dynamic_proxy.go | 82 ++++++++++++++++++++++++++++++++ proxy/tcp_integration_test.go | 32 +++++++++++++ registry/consul/routecmd.go | 3 ++ registry/consul/routecmd_test.go | 5 +- 11 files changed, 242 insertions(+), 4 deletions(-) create mode 100644 proxy/tcp/tcp_dynamic_proxy.go diff --git a/.gitignore b/.gitignore index b214b1379..3599ae810 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ fabio.sublime-* demo/cert/ /pkg/ dist/ +fabio.properties.test \ No newline at end of file diff --git a/Makefile b/Makefile index ef34c1106..3b5ea0f9f 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,10 @@ GOVERSION = $(shell go version | awk '{print $$3;}') # GORELEASER is the path to the goreleaser binary. GORELEASER = $(shell which goreleaser) +UNAME := $(shell uname) +VAULTCHECK = $(shell which vault) +CONSULCHECK = $(shell which consul) + # pin versions for CI builds CI_CONSUL_VERSION=1.3.0 CI_VAULT_VERSION=0.11.4 @@ -44,6 +48,34 @@ build: gofmt # test runs the tests test: build + +ifeq ($(UNAME),Darwin) +ifeq ($(VAULTCHECK),/usr/local/bin/vault) + @echo "Vault found, skipping install..." +else + brew install vault +endif + +ifeq ($(CONSULCHECK),/usr/local/bin/consul) + @echo "Consul found, skipping install..." + go test -p 1 ${GOFILES} --cover + +else + brew install consul + go test -p 1 ${GOFILES} --cover +endif +endif + +ifeq ($(UNAME),Linux) + yum install unzip -y + wget https://releases.hashicorp.com/consul/1.4.0/consul_1.4.0_linux_amd64.zip + unzip consul_1.4.0_linux_amd64.zip + cp consul /usr/local/bin/ + wget https://releases.hashicorp.com/vault/1.0.2/vault_1.0.2_linux_amd64.zip + unzip vault_1.0.2_linux_amd64.zip + cp vault /usr/local/bin + go test ${GOFILES} --cover +endif go test -v -test.timeout 15s `go list ./... | grep -v '/vendor/'` # gofmt runs gofmt on the code diff --git a/config/config.go b/config/config.go index 5c84c8ad5..7fd656235 100644 --- a/config/config.go +++ b/config/config.go @@ -44,6 +44,7 @@ type Listen struct { TLSCiphers []uint16 ProxyProto bool ProxyHeaderTimeout time.Duration + Refresh time.Duration } type UI struct { diff --git a/config/load.go b/config/load.go index 3f0a71f75..2f91e8838 100644 --- a/config/load.go +++ b/config/load.go @@ -345,7 +345,7 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w case "proto": l.Proto = v switch l.Proto { - case "tcp", "tcp+sni", "http", "https", "grpc", "grpcs": + case "tcp", "tcp+sni", "tcp-dynamic", "http", "https", "grpc", "grpcs": // ok default: return Listen{}, fmt.Errorf("unknown protocol %q", v) @@ -400,6 +400,12 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w return Listen{}, err } l.ProxyHeaderTimeout = d + case "refresh": + d, err := time.ParseDuration(v) + if err != nil { + return Listen{}, err + } + l.Refresh = d } } @@ -409,7 +415,7 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w if l.Addr == "" { return Listen{}, fmt.Errorf("need listening host:port") } - if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "grpcs" { + if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "tcp-dynamic" && l.Proto != "grpcs" { return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp' or 'grpcs'") } if csName == "" && l.Proto == "https" { diff --git a/config/load_test.go b/config/load_test.go index 869fbb041..d1cd0eefb 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -105,6 +105,13 @@ func TestLoad(t *testing.T) { return cfg }, }, + { + args: []string{"-proxy.addr", ":5555;proto=tcp-dynamic"}, + cfg: func(cfg *Config) *Config { + cfg.Listen = []Listen{{Addr: ":5555", Proto: "tcp-dynamic"}} + return cfg + }, + }, { desc: "-proxy.addr with tls configs", args: []string{"-proxy.addr", `:5555;rt=1s;wt=2s;tlsmin=0x0300;tlsmax=0x305;tlsciphers="0x123,0x456"`}, diff --git a/fabio.properties b/fabio.properties index 52e2e8d0a..e38035732 100644 --- a/fabio.properties +++ b/fabio.properties @@ -199,6 +199,7 @@ # * https for HTTPS based protocols # * tcp for a raw TCP proxy with or witout TLS support # * tcp+sni for an SNI aware TCP proxy +# * tcp-dynamic for a consul driven TCP proxy # # If no 'proto' option is specified then the protocol # is either 'http' or 'https' depending on whether a @@ -233,6 +234,9 @@ # pxytimeout: Sets PROXY protocol header read timeout as a duration (e.g. '250ms'). # This defaults to 250ms if not set when 'pxyproto' is enabled. # +# refresh: Sets the refresh interval to check the route table for updates. +# Used when 'tcp-dynamic' is enabled. +# # TLS options: # # tlsmin: Sets the minimum TLS version for the handshake. This value @@ -273,6 +277,9 @@ # # TCP listener on port 443 with SNI routing # proxy.addr = :443;proto=tcp+sni # +# # TCP listeners using consul for config with 5 second refresh interval +# proxy.addr = 0.0.0.0:0;proto=tcp-dynamic;refresh=5s +# # The default is # # proxy.addr = :9999 diff --git a/main.go b/main.go index 9a79dbb5c..cb9e99fc4 100644 --- a/main.go +++ b/main.go @@ -338,6 +338,50 @@ func startServers(cfg *config.Config) { exit.Fatal("[FATAL] ", err) } }() + case "tcp-dynamic": + go func() { + var buffer strings.Builder + for { + time.Sleep(l.Refresh) + table := route.GetTable() + ports := []string{} + for target, rts := range table { + buffer.WriteString(":") + buffer.WriteString(strings.Split(target, ":")[1]) + + schemes := tableSchemes(rts) + if len(schemes) == 1 && schemes[0] == "tcp" { + ports = append(ports, buffer.String()) + } + buffer.Reset() + } + ports = unique(ports) + for _, port := range ports { + l := l + port := port + conn, err := net.Listen("tcp", port) + if err != nil { + log.Printf("[DEBUG] Dynamic TCP port %s in use", port) + continue + } + conn.Close() + log.Printf("[INFO] Starting dynamic TCP listener on port %s ", port) + go func() { + h := &tcp.DynamicProxy{ + DialTimeout: cfg.Proxy.DialTimeout, + Lookup: lookupHostFn(cfg), + Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"), + ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"), + Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"), + } + l.Addr = port + if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil { + exit.Fatal("[FATAL] ", err) + } + }() + } + } + }() default: exit.Fatal("[FATAL] Invalid protocol ", l.Proto) } @@ -527,3 +571,25 @@ func toJSON(v interface{}) string { } return string(data) } + +func unique(strSlice []string) []string { + keys := make(map[string]bool) + list := []string{} + for _, entry := range strSlice { + if _, value := keys[entry]; !value { + keys[entry] = true + list = append(list, entry) + } + } + return list +} + +func tableSchemes(r route.Routes) []string { + schemes := []string{} + for _, rt := range r { + for _, target := range rt.Targets { + schemes = append(schemes, target.URL.Scheme) + } + } + return unique(schemes) +} \ No newline at end of file diff --git a/proxy/tcp/tcp_dynamic_proxy.go b/proxy/tcp/tcp_dynamic_proxy.go new file mode 100644 index 000000000..e4de3eaaf --- /dev/null +++ b/proxy/tcp/tcp_dynamic_proxy.go @@ -0,0 +1,82 @@ +package tcp + +import ( + "io" + "log" + "net" + "time" + + "github.com/fabiolb/fabio/metrics" + "github.com/fabiolb/fabio/route" +) + +// Proxy implements a generic TCP proxying handler. +type DynamicProxy struct { + // DialTimeout sets the timeout for establishing the outbound + // connection. + DialTimeout time.Duration + + // Lookup returns a target host for the given request. + // The proxy will panic if this value is nil. + Lookup func(host string) *route.Target + + // Conn counts the number of connections. + Conn metrics.Counter + + // ConnFail counts the failed upstream connection attempts. + ConnFail metrics.Counter + + // Noroute counts the failed Lookup() calls. + Noroute metrics.Counter +} + +func (p *DynamicProxy) ServeTCP(in net.Conn) error { + defer in.Close() + + if p.Conn != nil { + p.Conn.Inc(1) + } + target := in.LocalAddr().String() + t := p.Lookup(target) + if t == nil { + if p.Noroute != nil { + p.Noroute.Inc(1) + } + return nil + } + addr := t.URL.Host + log.Printf("[DEBUG] Connection: %s incoming %s to %s: ", in.RemoteAddr(), target, addr) + + if t.AccessDeniedTCP(in) { + return nil + } + + out, err := net.DialTimeout("tcp", addr, p.DialTimeout) + defer out.Close() + if err != nil { + log.Print("[WARN] tcp: cannot connect to upstream ", addr) + if p.ConnFail != nil { + p.ConnFail.Inc(1) + } + return err + } + + errc := make(chan error, 2) + cp := func(dst io.Writer, src io.Reader, c metrics.Counter) { + errc <- copyBuffer(dst, src, c) + } + + // rx measures the traffic to the upstream server (in <- out) + // tx measures the traffic from the upstream server (out <- in) + rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx") + tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx") + + go cp(in, out, rx) + go cp(out, in, tx) + err = <-errc + if err != nil && err != io.EOF { + log.Print("[WARN]: tcp: ", err) + return err + } + return nil +} diff --git a/proxy/tcp_integration_test.go b/proxy/tcp_integration_test.go index 425e43758..598d742ea 100644 --- a/proxy/tcp_integration_test.go +++ b/proxy/tcp_integration_test.go @@ -32,6 +32,38 @@ var echoHandler tcp.HandlerFunc = func(c net.Conn) error { return err } +// TestTCPDynamicProxy tests proxying an unencrypted TCP connection +// to a TCP upstream server. +func TestTCPDyanmicProxy(t *testing.T) { + srv := tcptest.NewServer(echoHandler) + defer srv.Close() + + // start proxy + proxyAddr := "127.0.0.1:57778" + go func() { + h := &tcp.DynamicProxy{ + Lookup: func(h string) *route.Target { + tbl, _ := route.NewTable("route add srv 127.0.0.1:57778 tcp://" + srv.Addr) + return tbl.LookupHost(h, route.Picker["rr"]) + }, + } + l := config.Listen{Addr: proxyAddr} + if err := ListenAndServeTCP(l, h, nil); err != nil { + t.Log("ListenAndServeTCP: ", err) + } + }() + defer Close() + + // connect to proxy + out, err := tcptest.NewRetryDialer().Dial("tcp", proxyAddr) + if err != nil { + t.Fatalf("net.Dial: %#v", err) + } + defer out.Close() + + testRoundtrip(t, out) +} + // TestTCPProxy tests proxying an unencrypted TCP connection // to a TCP upstream server. func TestTCPProxy(t *testing.T) { diff --git a/registry/consul/routecmd.go b/registry/consul/routecmd.go index 2f2707b7a..7a17ef0be 100644 --- a/registry/consul/routecmd.go +++ b/registry/consul/routecmd.go @@ -134,6 +134,9 @@ func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts str return s, opts, true } + if !strings.Contains(s, "/") { + return s, opts, true + } // prefix is "host/path" p = strings.SplitN(s, "/", 2) if len(p) == 1 { diff --git a/registry/consul/routecmd_test.go b/registry/consul/routecmd_test.go index 2c906e476..d6b19b031 100644 --- a/registry/consul/routecmd_test.go +++ b/registry/consul/routecmd_test.go @@ -64,14 +64,15 @@ func TestParseTag(t *testing.T) { ok bool }{ {tag: "p", route: "", ok: false}, - {tag: "p-", route: "", ok: false}, - {tag: "p- ", route: "", ok: false}, + {tag: "p-", route: "", ok: true}, + {tag: "p- ", route: "", ok: true}, {tag: "p-/", route: "/", ok: true}, {tag: " p-/", route: "/", ok: true}, {tag: "p-/ ", route: "/", ok: true}, {tag: "p- / ", route: "/", ok: true}, {tag: "p-/foo", route: "/foo", ok: true}, {tag: "p- /foo", route: "/foo", ok: true}, + {tag: "p-1.1.1.1:999", route: "1.1.1.1:999", ok: true}, {tag: "p-bar/foo", route: "bar/foo", ok: true}, {tag: "p-bar/foo/foo", route: "bar/foo/foo", ok: true}, {tag: "p-www.bar.com/foo/foo", route: "www.bar.com/foo/foo", ok: true}, From c33f95d2ebc5ab9e017e3d9a6ee72d64233bee9c Mon Sep 17 00:00:00 2001 From: "Michael.Murphy" Date: Thu, 21 Mar 2019 11:27:20 -0500 Subject: [PATCH 2/8] setting .gitignore and Makefile back to master --- .gitignore | 1 - Makefile | 32 -------------------------------- 2 files changed, 33 deletions(-) diff --git a/.gitignore b/.gitignore index 3599ae810..b214b1379 100644 --- a/.gitignore +++ b/.gitignore @@ -20,4 +20,3 @@ fabio.sublime-* demo/cert/ /pkg/ dist/ -fabio.properties.test \ No newline at end of file diff --git a/Makefile b/Makefile index 3b5ea0f9f..ef34c1106 100644 --- a/Makefile +++ b/Makefile @@ -20,10 +20,6 @@ GOVERSION = $(shell go version | awk '{print $$3;}') # GORELEASER is the path to the goreleaser binary. GORELEASER = $(shell which goreleaser) -UNAME := $(shell uname) -VAULTCHECK = $(shell which vault) -CONSULCHECK = $(shell which consul) - # pin versions for CI builds CI_CONSUL_VERSION=1.3.0 CI_VAULT_VERSION=0.11.4 @@ -48,34 +44,6 @@ build: gofmt # test runs the tests test: build - -ifeq ($(UNAME),Darwin) -ifeq ($(VAULTCHECK),/usr/local/bin/vault) - @echo "Vault found, skipping install..." -else - brew install vault -endif - -ifeq ($(CONSULCHECK),/usr/local/bin/consul) - @echo "Consul found, skipping install..." - go test -p 1 ${GOFILES} --cover - -else - brew install consul - go test -p 1 ${GOFILES} --cover -endif -endif - -ifeq ($(UNAME),Linux) - yum install unzip -y - wget https://releases.hashicorp.com/consul/1.4.0/consul_1.4.0_linux_amd64.zip - unzip consul_1.4.0_linux_amd64.zip - cp consul /usr/local/bin/ - wget https://releases.hashicorp.com/vault/1.0.2/vault_1.0.2_linux_amd64.zip - unzip vault_1.0.2_linux_amd64.zip - cp vault /usr/local/bin - go test ${GOFILES} --cover -endif go test -v -test.timeout 15s `go list ./... | grep -v '/vendor/'` # gofmt runs gofmt on the code From e3fd9c82b2411162f602df741c6ed6c449b94ee8 Mon Sep 17 00:00:00 2001 From: "Michael.Murphy" Date: Thu, 21 Mar 2019 11:44:12 -0500 Subject: [PATCH 3/8] added features md for tcp-dynamic-proxy --- docs/content/feature/tcp-dynamic-proxy.md | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) create mode 100644 docs/content/feature/tcp-dynamic-proxy.md diff --git a/docs/content/feature/tcp-dynamic-proxy.md b/docs/content/feature/tcp-dynamic-proxy.md new file mode 100644 index 000000000..67fee52f3 --- /dev/null +++ b/docs/content/feature/tcp-dynamic-proxy.md @@ -0,0 +1,17 @@ +--- +title: "TCP Dynamic Proxy" +--- + +The TCP dynamic proxy is similar to the TCP Proxy, but the listener is started from the Consul urlprefix tag. +Also, the service is defined with IP and port, so that multiple services can be defined on the load balancer using +the same TCP port. Connections are forwarded to services based on the combination of ip:port + +To use TCP Dynamic proxy support the service needs to advertise `urlprefix-127.0.0.1:1234 proto=tcp` in +Consul. In addition, fabio needs to be configured with a placeholder for the proxy.addr.: + +``` +fabio -proxy.addr '0.0.0.0:0;proto=tcp-dynamic' +``` + +The TCP listener is started for the given TCP ports. To use IP addressing to separate the services, matching IP +addressed would need to be added to the loopback interface on the host. \ No newline at end of file From 707006ee789f68e0a1a9b6dbdfc9cec065583cdd Mon Sep 17 00:00:00 2001 From: "Michael.Murphy" Date: Thu, 21 Mar 2019 13:38:20 -0500 Subject: [PATCH 4/8] udpated features doco --- docs/content/feature/tcp-dynamic-proxy.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/feature/tcp-dynamic-proxy.md b/docs/content/feature/tcp-dynamic-proxy.md index 67fee52f3..c362509c8 100644 --- a/docs/content/feature/tcp-dynamic-proxy.md +++ b/docs/content/feature/tcp-dynamic-proxy.md @@ -10,7 +10,7 @@ To use TCP Dynamic proxy support the service needs to advertise `urlprefix-127.0 Consul. In addition, fabio needs to be configured with a placeholder for the proxy.addr.: ``` -fabio -proxy.addr '0.0.0.0:0;proto=tcp-dynamic' +fabio -proxy.addr '0.0.0.0:0;proto=tcp-dynamic;refresh=5s' ``` The TCP listener is started for the given TCP ports. To use IP addressing to separate the services, matching IP From 38667eebdb2e36132952d019aa23cb7dfee7e886 Mon Sep 17 00:00:00 2001 From: "Michael.Murphy" Date: Mon, 8 Apr 2019 10:11:20 -0500 Subject: [PATCH 5/8] updated log message for cert source --- config/load.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config/load.go b/config/load.go index 2f91e8838..b83358d0f 100644 --- a/config/load.go +++ b/config/load.go @@ -416,7 +416,7 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w return Listen{}, fmt.Errorf("need listening host:port") } if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "tcp-dynamic" && l.Proto != "grpcs" { - return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp' or 'grpcs'") + return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp', 'tcp-dynamic' or 'grpcs'") } if csName == "" && l.Proto == "https" { return Listen{}, fmt.Errorf("proto 'https' requires cert source") From 22548a540650a8fd7665c21249e43a00e9497047 Mon Sep 17 00:00:00 2001 From: "Michael.Murphy" Date: Thu, 16 May 2019 09:39:51 -0500 Subject: [PATCH 6/8] added split check for route --- main.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/main.go b/main.go index cb9e99fc4..30f19e455 100644 --- a/main.go +++ b/main.go @@ -346,16 +346,18 @@ func startServers(cfg *config.Config) { table := route.GetTable() ports := []string{} for target, rts := range table { - buffer.WriteString(":") - buffer.WriteString(strings.Split(target, ":")[1]) + if strings.Split(target, ":")[1] != "" { + buffer.WriteString(":") + buffer.WriteString(strings.Split(target, ":")[1]) - schemes := tableSchemes(rts) - if len(schemes) == 1 && schemes[0] == "tcp" { - ports = append(ports, buffer.String()) + schemes := tableSchemes(rts) + if len(schemes) == 1 && schemes[0] == "tcp" { + ports = append(ports, buffer.String()) + } + buffer.Reset() } - buffer.Reset() + ports = unique(ports) } - ports = unique(ports) for _, port := range ports { l := l port := port From 8b237994f146b56da00f711f16398f6a5d747429 Mon Sep 17 00:00:00 2001 From: "Michael.Murphy" Date: Thu, 16 May 2019 09:52:49 -0500 Subject: [PATCH 7/8] added : check for tcp-dynamic --- main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/main.go b/main.go index 30f19e455..a5118f038 100644 --- a/main.go +++ b/main.go @@ -346,7 +346,7 @@ func startServers(cfg *config.Config) { table := route.GetTable() ports := []string{} for target, rts := range table { - if strings.Split(target, ":")[1] != "" { + if strings.Contains(target, ":") { buffer.WriteString(":") buffer.WriteString(strings.Split(target, ":")[1]) From 5143fc8617099a0a90733f12a33ce4fc1a10888d Mon Sep 17 00:00:00 2001 From: "Michael.Murphy" Date: Mon, 22 Jul 2019 13:50:15 -0500 Subject: [PATCH 8/8] fixed dial timeout error --- proxy/tcp/tcp_dynamic_proxy.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proxy/tcp/tcp_dynamic_proxy.go b/proxy/tcp/tcp_dynamic_proxy.go index e4de3eaaf..2cd4ad8b3 100644 --- a/proxy/tcp/tcp_dynamic_proxy.go +++ b/proxy/tcp/tcp_dynamic_proxy.go @@ -52,7 +52,6 @@ func (p *DynamicProxy) ServeTCP(in net.Conn) error { } out, err := net.DialTimeout("tcp", addr, p.DialTimeout) - defer out.Close() if err != nil { log.Print("[WARN] tcp: cannot connect to upstream ", addr) if p.ConnFail != nil { @@ -60,6 +59,7 @@ func (p *DynamicProxy) ServeTCP(in net.Conn) error { } return err } + defer out.Close() errc := make(chan error, 2) cp := func(dst io.Writer, src io.Reader, c metrics.Counter) {