Skip to content

Commit

Permalink
Issue #9: Add websocket support
Browse files Browse the repository at this point in the history
  • Loading branch information
magiconair committed Nov 11, 2015
1 parent f356464 commit f534849
Show file tree
Hide file tree
Showing 13 changed files with 191 additions and 83 deletions.
4 changes: 2 additions & 2 deletions listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"github.com/eBay/fabio/config"
"github.com/eBay/fabio/exit"
"github.com/eBay/fabio/route"
"github.com/eBay/fabio/proxy"
)

var quit = make(chan bool)
Expand All @@ -35,7 +35,7 @@ func startListeners(listen []config.Listen, wait time.Duration, h http.Handler)
<-quit

// disable routing for all requests
route.Shutdown()
proxy.Shutdown()

// trigger graceful shutdown
log.Printf("[INFO] Graceful shutdown over %s", wait)
Expand Down
3 changes: 2 additions & 1 deletion listen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/eBay/fabio/config"
"github.com/eBay/fabio/proxy"
"github.com/eBay/fabio/route"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ func TestGracefulShutdown(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
startListeners([]config.Listen{l}, 250*time.Millisecond, route.NewProxy(http.DefaultTransport, config.Proxy{}))
startListeners([]config.Listen{l}, 250*time.Millisecond, proxy.New(http.DefaultTransport, config.Proxy{}))
}()

// trigger shutdown after some time
Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net/http"

"github.com/eBay/fabio/config"
"github.com/eBay/fabio/proxy"
"github.com/eBay/fabio/route"
"github.com/eBay/fabio/ui"
)
Expand Down Expand Up @@ -39,7 +40,7 @@ func main() {
startListeners(cfg.Listen, cfg.Proxy.ShutdownWait, newProxy(cfg))
}

func newProxy(cfg *config.Config) *route.Proxy {
func newProxy(cfg *config.Config) *proxy.Proxy {
if err := route.SetPickerStrategy(cfg.Proxy.Strategy); err != nil {
log.Fatal("[FATAL] ", err)
}
Expand All @@ -54,7 +55,7 @@ func newProxy(cfg *config.Config) *route.Proxy {
}).Dial,
}

return route.NewProxy(tr, cfg.Proxy)
return proxy.New(tr, cfg.Proxy)
}

func startUI(cfg *config.Config) {
Expand Down
46 changes: 46 additions & 0 deletions proxy/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package proxy

import (
"net"
"net/http"
"net/http/httputil"
"time"

"github.com/eBay/fabio/config"
)

type httpProxy struct {
tr http.RoundTripper
cfg config.Proxy
}

func newHTTPProxy(tr http.RoundTripper, cfg config.Proxy) http.Handler {
return &httpProxy{tr, cfg}
}

func (p *httpProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
t := target(r)
if t == nil {
w.WriteHeader(http.StatusNotFound)
return
}

if p.cfg.ClientIPHeader != "" {
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
http.Error(w, "cannot parse "+r.RemoteAddr, http.StatusInternalServerError)
return
}
r.Header.Set(p.cfg.ClientIPHeader, ip)
}

if p.cfg.TLSHeader != "" && r.TLS != nil {
r.Header.Set(p.cfg.TLSHeader, p.cfg.TLSHeaderValue)
}

start := time.Now()
rp := httputil.NewSingleHostReverseProxy(t.URL)
rp.Transport = p.tr
rp.ServeHTTP(w, r)
t.Timer.UpdateSince(start)
}
50 changes: 50 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package proxy

import (
"log"
"net/http"
"time"

gometrics "github.com/eBay/fabio/_third_party/github.com/rcrowley/go-metrics"
"github.com/eBay/fabio/config"
"github.com/eBay/fabio/route"
)

// Proxy is a dynamic reverse proxy.
type Proxy struct {
httpProxy http.Handler
wsProxy http.Handler
requests gometrics.Timer
}

func New(tr http.RoundTripper, cfg config.Proxy) *Proxy {
return &Proxy{
httpProxy: newHTTPProxy(tr, cfg),
wsProxy: newWSProxy(),
requests: gometrics.GetOrRegisterTimer("requests", gometrics.DefaultRegistry),
}
}

func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if ShuttingDown() {
http.Error(w, "shutting down", http.StatusServiceUnavailable)
return
}

h := p.httpProxy
if r.Header.Get("Upgrade") == "websocket" {
h = p.wsProxy
}

start := time.Now()
h.ServeHTTP(w, r)
p.requests.UpdateSince(start)
}

func target(r *http.Request) *route.Target {
t := route.GetTable().Lookup(r, r.Header.Get("trace"))
if t == nil {
log.Print("[WARN] No route for ", r.URL)
}
return t
}
2 changes: 1 addition & 1 deletion route/shutdown.go → proxy/shutdown.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package route
package proxy

import "sync/atomic"

Expand Down
71 changes: 71 additions & 0 deletions proxy/ws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package proxy

import (
"io"
"log"
"net/http"
"time"

"github.com/eBay/fabio/_third_party/golang.org/x/net/websocket"
)

type wsProxy struct{}

func newWSProxy() http.Handler {
return &wsProxy{}
}

func (p *wsProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
websocket.Handler(wsHandler).ServeHTTP(w, r)
}

func wsHandler(client *websocket.Conn) {
r := client.Request()
t := target(r)
if t == nil {
client.WriteClose(http.StatusNotFound)
return
}

logerr := func(err error) {
if err != nil && err != io.EOF {
log.Printf("[INFO] WS error for %s. %s", r.URL, err)
}
}

// dial the server
origin := r.Header.Get("Origin")
targetURL := "ws://" + t.URL.Host + r.RequestURI
server, err := websocket.Dial(targetURL, "", origin)
if err != nil {
logerr(err)
return
}

// send data from client to server
cerr := make(chan error)
go func() {
_, err := io.Copy(server, client)
cerr <- err
}()

// send data from server to client
serr := make(chan error)
go func() {
_, err = io.Copy(client, server)
serr <- err
}()

// wait for either server or client to exit
// and then close the other side
start := time.Now()
select {
case err := <-cerr:
logerr(err)
server.Close()
case err := <-serr:
logerr(err)
client.Close()
}
t.Timer.UpdateSince(start)
}
6 changes: 3 additions & 3 deletions route/picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
var pick picker = rndPicker

// Picker selects a target from a list of targets
type picker func(r *route) *target
type picker func(r *route) *Target

// SetPickerStrategy sets the picker function for the proxy.
func SetPickerStrategy(s string) error {
Expand All @@ -26,12 +26,12 @@ func SetPickerStrategy(s string) error {
}

// rndPicker picks a random target from the list of targets.
func rndPicker(r *route) *target {
func rndPicker(r *route) *Target {
return r.wTargets[randIntn(len(r.wTargets))]
}

// rrPicker picks the next target from a list of targets using round-robin.
func rrPicker(r *route) *target {
func rrPicker(r *route) *Target {
u := r.wTargets[r.total%uint64(len(r.wTargets))]
atomic.AddUint64(&r.total, 1)
return u
Expand Down
61 changes: 0 additions & 61 deletions route/proxy.go

This file was deleted.

16 changes: 8 additions & 8 deletions route/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,19 @@ type route struct {
path string

// targets contains the list of URLs
targets []*target
targets []*Target

// wTargets contains 100 targets distributed
// according to their weight and ordered RR in the
// same order as targets
wTargets []*target
wTargets []*Target

// total contains the total number of requests for this route.
// Used by the RRPicker
total uint64
}

type target struct {
type Target struct {
// service is the name of the service the targetURL points to
service string

Expand All @@ -57,7 +57,7 @@ type target struct {
weight float64

// timer measures throughput and latency of this target
timer gometrics.Timer
Timer gometrics.Timer
}

func newRoute(host, path string) *route {
Expand All @@ -72,13 +72,13 @@ func (r *route) addTarget(service string, targetURL *url.URL, fixedWeight float6
name := metrics.TargetName(service, r.host, r.path, targetURL)
timer := gometrics.GetOrRegisterTimer(name, gometrics.DefaultRegistry)

t := &target{service: service, tags: tags, URL: targetURL, fixedWeight: fixedWeight, timer: timer}
t := &Target{service: service, tags: tags, URL: targetURL, fixedWeight: fixedWeight, Timer: timer}
r.targets = append(r.targets, t)
r.weighTargets()
}

func (r *route) delService(service string) {
var clone []*target
var clone []*Target
for _, t := range r.targets {
if t.service == service {
continue
Expand All @@ -90,7 +90,7 @@ func (r *route) delService(service string) {
}

func (r *route) delTarget(service string, targetURL *url.URL) {
var clone []*target
var clone []*Target
for _, t := range r.targets {
if t.service == service && t.URL.String() == targetURL.String() {
continue
Expand Down Expand Up @@ -227,7 +227,7 @@ func (r *route) weighTargets() {
}
sort.Sort(slotCount)

slots := make([]*target, gotSlots)
slots := make([]*Target, gotSlots)
for _, c := range slotCount {
if c.n <= 0 {
continue
Expand Down
2 changes: 1 addition & 1 deletion route/route_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func benchmarkGet(t Table, m matcher, p picker, pb *testing.PB) {
match, pick = m, p
k, n := len(reqs), 0
for pb.Next() {
t.lookup(reqs[n%k], "")
t.Lookup(reqs[n%k], "")
n++
}
}
Loading

0 comments on commit f534849

Please sign in to comment.