Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 613 tcp dynamic #626

Merged
merged 8 commits into from
Dec 2, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Listen struct {
TLSCiphers []uint16
ProxyProto bool
ProxyHeaderTimeout time.Duration
Refresh time.Duration
}

type UI struct {
Expand Down
10 changes: 8 additions & 2 deletions config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w
case "proto":
l.Proto = v
switch l.Proto {
case "tcp", "tcp+sni", "http", "https", "grpc", "grpcs":
case "tcp", "tcp+sni", "tcp-dynamic", "http", "https", "grpc", "grpcs":
// ok
default:
return Listen{}, fmt.Errorf("unknown protocol %q", v)
Expand Down Expand Up @@ -400,6 +400,12 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w
return Listen{}, err
}
l.ProxyHeaderTimeout = d
case "refresh":
d, err := time.ParseDuration(v)
if err != nil {
return Listen{}, err
}
l.Refresh = d
}
}

Expand All @@ -409,7 +415,7 @@ func parseListen(cfg map[string]string, cs map[string]CertSource, readTimeout, w
if l.Addr == "" {
return Listen{}, fmt.Errorf("need listening host:port")
}
if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "grpcs" {
if csName != "" && l.Proto != "https" && l.Proto != "tcp" && l.Proto != "tcp-dynamic" && l.Proto != "grpcs" {
return Listen{}, fmt.Errorf("cert source requires proto 'https', 'tcp' or 'grpcs'")
aaronhurt marked this conversation as resolved.
Show resolved Hide resolved
}
if csName == "" && l.Proto == "https" {
Expand Down
7 changes: 7 additions & 0 deletions config/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ func TestLoad(t *testing.T) {
return cfg
},
},
{
args: []string{"-proxy.addr", ":5555;proto=tcp-dynamic"},
cfg: func(cfg *Config) *Config {
cfg.Listen = []Listen{{Addr: ":5555", Proto: "tcp-dynamic"}}
return cfg
},
},
{
desc: "-proxy.addr with tls configs",
args: []string{"-proxy.addr", `:5555;rt=1s;wt=2s;tlsmin=0x0300;tlsmax=0x305;tlsciphers="0x123,0x456"`},
Expand Down
17 changes: 17 additions & 0 deletions docs/content/feature/tcp-dynamic-proxy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
title: "TCP Dynamic Proxy"
---

The TCP dynamic proxy is similar to the TCP Proxy, but the listener is started from the Consul urlprefix tag.
Also, the service is defined with IP and port, so that multiple services can be defined on the load balancer using
the same TCP port. Connections are forwarded to services based on the combination of ip:port

To use TCP Dynamic proxy support the service needs to advertise `urlprefix-127.0.0.1:1234 proto=tcp` in
Consul. In addition, fabio needs to be configured with a placeholder for the proxy.addr.:

```
fabio -proxy.addr '0.0.0.0:0;proto=tcp-dynamic;refresh=5s'
```

The TCP listener is started for the given TCP ports. To use IP addressing to separate the services, matching IP
addressed would need to be added to the loopback interface on the host.
7 changes: 7 additions & 0 deletions fabio.properties
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@
# * https for HTTPS based protocols
# * tcp for a raw TCP proxy with or witout TLS support
# * tcp+sni for an SNI aware TCP proxy
# * tcp-dynamic for a consul driven TCP proxy
#
# If no 'proto' option is specified then the protocol
# is either 'http' or 'https' depending on whether a
Expand Down Expand Up @@ -233,6 +234,9 @@
# pxytimeout: Sets PROXY protocol header read timeout as a duration (e.g. '250ms').
# This defaults to 250ms if not set when 'pxyproto' is enabled.
#
# refresh: Sets the refresh interval to check the route table for updates.
# Used when 'tcp-dynamic' is enabled.
#
# TLS options:
#
# tlsmin: Sets the minimum TLS version for the handshake. This value
Expand Down Expand Up @@ -273,6 +277,9 @@
# # TCP listener on port 443 with SNI routing
# proxy.addr = :443;proto=tcp+sni
#
# # TCP listeners using consul for config with 5 second refresh interval
# proxy.addr = 0.0.0.0:0;proto=tcp-dynamic;refresh=5s
#
# The default is
#
# proxy.addr = :9999
Expand Down
66 changes: 66 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,50 @@ func startServers(cfg *config.Config) {
exit.Fatal("[FATAL] ", err)
}
}()
case "tcp-dynamic":
go func() {
var buffer strings.Builder
for {
time.Sleep(l.Refresh)
table := route.GetTable()
ports := []string{}
for target, rts := range table {
buffer.WriteString(":")
buffer.WriteString(strings.Split(target, ":")[1])

schemes := tableSchemes(rts)
if len(schemes) == 1 && schemes[0] == "tcp" {
ports = append(ports, buffer.String())
}
buffer.Reset()
}
ports = unique(ports)
for _, port := range ports {
l := l
port := port
conn, err := net.Listen("tcp", port)
if err != nil {
log.Printf("[DEBUG] Dynamic TCP port %s in use", port)
continue
}
conn.Close()
log.Printf("[INFO] Starting dynamic TCP listener on port %s ", port)
go func() {
h := &tcp.DynamicProxy{
DialTimeout: cfg.Proxy.DialTimeout,
Lookup: lookupHostFn(cfg),
Conn: metrics.DefaultRegistry.GetCounter("tcp.conn"),
ConnFail: metrics.DefaultRegistry.GetCounter("tcp.connfail"),
Noroute: metrics.DefaultRegistry.GetCounter("tcp.noroute"),
}
l.Addr = port
if err := proxy.ListenAndServeTCP(l, h, tlscfg); err != nil {
exit.Fatal("[FATAL] ", err)
}
}()
}
}
}()
default:
exit.Fatal("[FATAL] Invalid protocol ", l.Proto)
}
Expand Down Expand Up @@ -527,3 +571,25 @@ func toJSON(v interface{}) string {
}
return string(data)
}

func unique(strSlice []string) []string {
keys := make(map[string]bool)
list := []string{}
for _, entry := range strSlice {
if _, value := keys[entry]; !value {
keys[entry] = true
list = append(list, entry)
}
}
return list
}

func tableSchemes(r route.Routes) []string {
schemes := []string{}
for _, rt := range r {
for _, target := range rt.Targets {
schemes = append(schemes, target.URL.Scheme)
}
}
return unique(schemes)
}
82 changes: 82 additions & 0 deletions proxy/tcp/tcp_dynamic_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package tcp

import (
"io"
"log"
"net"
"time"

"github.com/fabiolb/fabio/metrics"
"github.com/fabiolb/fabio/route"
)

// Proxy implements a generic TCP proxying handler.
type DynamicProxy struct {
// DialTimeout sets the timeout for establishing the outbound
// connection.
DialTimeout time.Duration

// Lookup returns a target host for the given request.
// The proxy will panic if this value is nil.
Lookup func(host string) *route.Target

// Conn counts the number of connections.
Conn metrics.Counter

// ConnFail counts the failed upstream connection attempts.
ConnFail metrics.Counter

// Noroute counts the failed Lookup() calls.
Noroute metrics.Counter
}

func (p *DynamicProxy) ServeTCP(in net.Conn) error {
defer in.Close()

if p.Conn != nil {
p.Conn.Inc(1)
}
target := in.LocalAddr().String()
t := p.Lookup(target)
if t == nil {
if p.Noroute != nil {
p.Noroute.Inc(1)
}
return nil
}
addr := t.URL.Host
log.Printf("[DEBUG] Connection: %s incoming %s to %s: ", in.RemoteAddr(), target, addr)

if t.AccessDeniedTCP(in) {
return nil
}

out, err := net.DialTimeout("tcp", addr, p.DialTimeout)
defer out.Close()
if err != nil {
log.Print("[WARN] tcp: cannot connect to upstream ", addr)
if p.ConnFail != nil {
p.ConnFail.Inc(1)
}
return err
}

errc := make(chan error, 2)
cp := func(dst io.Writer, src io.Reader, c metrics.Counter) {
errc <- copyBuffer(dst, src, c)
}

// rx measures the traffic to the upstream server (in <- out)
// tx measures the traffic from the upstream server (out <- in)
rx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".rx")
tx := metrics.DefaultRegistry.GetCounter(t.TimerName + ".tx")

go cp(in, out, rx)
go cp(out, in, tx)
err = <-errc
if err != nil && err != io.EOF {
log.Print("[WARN]: tcp: ", err)
return err
}
return nil
}
32 changes: 32 additions & 0 deletions proxy/tcp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,38 @@ var echoHandler tcp.HandlerFunc = func(c net.Conn) error {
return err
}

// TestTCPDynamicProxy tests proxying an unencrypted TCP connection
// to a TCP upstream server.
func TestTCPDyanmicProxy(t *testing.T) {
srv := tcptest.NewServer(echoHandler)
defer srv.Close()

// start proxy
proxyAddr := "127.0.0.1:57778"
go func() {
h := &tcp.DynamicProxy{
Lookup: func(h string) *route.Target {
tbl, _ := route.NewTable("route add srv 127.0.0.1:57778 tcp://" + srv.Addr)
return tbl.LookupHost(h, route.Picker["rr"])
},
}
l := config.Listen{Addr: proxyAddr}
if err := ListenAndServeTCP(l, h, nil); err != nil {
t.Log("ListenAndServeTCP: ", err)
}
}()
defer Close()

// connect to proxy
out, err := tcptest.NewRetryDialer().Dial("tcp", proxyAddr)
if err != nil {
t.Fatalf("net.Dial: %#v", err)
}
defer out.Close()

testRoundtrip(t, out)
}

// TestTCPProxy tests proxying an unencrypted TCP connection
// to a TCP upstream server.
func TestTCPProxy(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions registry/consul/routecmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ func parseURLPrefixTag(s, prefix string, env map[string]string) (route, opts str
return s, opts, true
}

if !strings.Contains(s, "/") {
return s, opts, true
}
// prefix is "host/path"
p = strings.SplitN(s, "/", 2)
if len(p) == 1 {
Expand Down
5 changes: 3 additions & 2 deletions registry/consul/routecmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,15 @@ func TestParseTag(t *testing.T) {
ok bool
}{
{tag: "p", route: "", ok: false},
{tag: "p-", route: "", ok: false},
{tag: "p- ", route: "", ok: false},
{tag: "p-", route: "", ok: true},
{tag: "p- ", route: "", ok: true},
{tag: "p-/", route: "/", ok: true},
{tag: " p-/", route: "/", ok: true},
{tag: "p-/ ", route: "/", ok: true},
{tag: "p- / ", route: "/", ok: true},
{tag: "p-/foo", route: "/foo", ok: true},
{tag: "p- /foo", route: "/foo", ok: true},
{tag: "p-1.1.1.1:999", route: "1.1.1.1:999", ok: true},
{tag: "p-bar/foo", route: "bar/foo", ok: true},
{tag: "p-bar/foo/foo", route: "bar/foo/foo", ok: true},
{tag: "p-www.bar.com/foo/foo", route: "www.bar.com/foo/foo", ok: true},
Expand Down