From ea7f9ddf7e16c0ebe6e861a101fd5f5f100c862a Mon Sep 17 00:00:00 2001 From: Frank Schroeder Date: Thu, 21 Jul 2016 00:11:24 +0200 Subject: [PATCH] Issue #129: Support for server-sent events (SSE) Original PR #130 by @madeddie Add a proxy.flushinterval option which enables periodic flushing of the repsonse buffer for SSE connections which have the 'Accept' header set to 'text/event-stream'. This is really a route specific option and should be configured as such once this becomes possible. --- config/config.go | 1 + config/default.go | 1 + config/load.go | 1 + config/load_test.go | 2 ++ fabio.properties | 12 +++++++++++- proxy/http.go | 4 +++- proxy/proxy.go | 8 +++++++- 7 files changed, 26 insertions(+), 3 deletions(-) diff --git a/config/config.go b/config/config.go index 0732c50ac..5929a1ddf 100644 --- a/config/config.go +++ b/config/config.go @@ -54,6 +54,7 @@ type Proxy struct { KeepAliveTimeout time.Duration ReadTimeout time.Duration WriteTimeout time.Duration + FlushInterval time.Duration LocalIP string ClientIPHeader string TLSHeader string diff --git a/config/default.go b/config/default.go index 9fd9fb85d..f73a27d4a 100644 --- a/config/default.go +++ b/config/default.go @@ -13,6 +13,7 @@ var Default = &Config{ Matcher: "prefix", NoRouteStatus: 404, DialTimeout: 30 * time.Second, + FlushInterval: time.Second, LocalIP: LocalIPString(), }, Registry: Registry{ diff --git a/config/load.go b/config/load.go index 2de45888b..857ab5731 100644 --- a/config/load.go +++ b/config/load.go @@ -104,6 +104,7 @@ func load(p *properties.Properties) (cfg *Config, err error) { f.KVSliceVar(&cfg.CertSourcesValue, "proxy.cs", Default.CertSourcesValue, "certificate sources") f.DurationVar(&cfg.Proxy.ReadTimeout, "proxy.readtimeout", Default.Proxy.ReadTimeout, "read timeout for incoming requests") f.DurationVar(&cfg.Proxy.WriteTimeout, "proxy.writetimeout", Default.Proxy.WriteTimeout, "write timeout for outgoing responses") + f.DurationVar(&cfg.Proxy.FlushInterval, "proxy.flushinterval", Default.Proxy.FlushInterval, "flush interval for streaming responses") f.StringVar(&cfg.Metrics.Target, "metrics.target", Default.Metrics.Target, "metrics backend") f.StringVar(&cfg.Metrics.Prefix, "metrics.prefix", Default.Metrics.Prefix, "prefix for reported metrics") f.DurationVar(&cfg.Metrics.Interval, "metrics.interval", Default.Metrics.Interval, "metrics reporting interval") diff --git a/config/load_test.go b/config/load_test.go index 7b9c278fd..525b05e9b 100644 --- a/config/load_test.go +++ b/config/load_test.go @@ -24,6 +24,7 @@ proxy.keepalivetimeout = 4s proxy.dialtimeout = 60s proxy.readtimeout = 5s proxy.writetimeout = 10s +proxy.flushinterval = 15s proxy.maxconn = 666 proxy.header.clientip = clientip proxy.header.tls = tls @@ -79,6 +80,7 @@ aws.apigw.cert.cn = furb KeepAliveTimeout: 4 * time.Second, ReadTimeout: 5 * time.Second, WriteTimeout: 10 * time.Second, + FlushInterval: 15 * time.Second, ClientIPHeader: "clientip", TLSHeader: "tls", TLSHeaderValue: "tls-true", diff --git a/fabio.properties b/fabio.properties index 964e7bdfa..f6c2ddf12 100644 --- a/fabio.properties +++ b/fabio.properties @@ -274,6 +274,16 @@ # proxy.dialtimeout = 30s +# proxy.flushinterval configures periodic flushing of the +# response buffer for SSE (server-sent events) connections. +# They are detected when the 'Accept' header is +# 'text/event-stream'. +# +# The default is +# +# proxy.flushinterval = 1s + + # proxy.maxconn configures the maximum number of cached # incoming and outgoing connections. # @@ -536,4 +546,4 @@ # # The default is # -# ui.title = \ No newline at end of file +# ui.title = diff --git a/proxy/http.go b/proxy/http.go index 2739a2d67..0020c57e4 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -4,10 +4,12 @@ import ( "net/http" "net/http/httputil" "net/url" + "time" ) -func newHTTPProxy(t *url.URL, tr http.RoundTripper) http.Handler { +func newHTTPProxy(t *url.URL, tr http.RoundTripper, flush time.Duration) http.Handler { rp := httputil.NewSingleHostReverseProxy(t) rp.Transport = tr + rp.FlushInterval = flush return rp } diff --git a/proxy/proxy.go b/proxy/proxy.go index f69d05408..201a173dd 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -48,8 +48,14 @@ func (p *Proxy) ServeHTTP(w http.ResponseWriter, r *http.Request) { // To use the filtered proxy use // h = newWSProxy(t.URL) + + case r.Header.Get("Accept") == "text/event-stream": + // use the flush interval for SSE (server-sent events) + // must be > 0s to be effective + h = newHTTPProxy(t.URL, p.tr, p.cfg.FlushInterval) + default: - h = newHTTPProxy(t.URL, p.tr) + h = newHTTPProxy(t.URL, p.tr, time.Duration(0)) } start := time.Now()