forked from AndrewBurian/eventsource
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient.go
174 lines (153 loc) · 3.61 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package eventsource
import (
"context"
"io"
"log"
"net/http"
"sync"
"time"
)
// Client wraps an http connection and converts it to an
// event stream.
//
// Events passed to Send/SendNonBlocking are queued on a buffered channel and
// written to the response by a single worker goroutine (run). Flushing of the
// response is deferred by flushLatency via a timer.
type Client struct {
// flusher is the response writer's http.Flusher, invoked by flush.
flusher http.Flusher
// write is the destination the worker copies each event into.
write io.Writer
// ctx is the context whose cancellation stops the worker goroutine.
ctx context.Context
// events is the buffered queue of events awaiting the worker.
events chan Event
// closed is set by the worker when it exits; guarded by lock.
closed bool
// waiter tracks the worker goroutine; Wait and Shutdown block on it.
waiter sync.WaitGroup
// lock guards closed and flushing, and is held while writing and flushing.
lock sync.Mutex
// flushing is the pending delayed-flush timer, or nil when none is scheduled.
flushing *time.Timer
// flushLatency is the delay between writing an event and flushing it.
flushLatency time.Duration
}
// Options carries the optional tuning parameters accepted by NewClient.
type Options struct {
// ChannelSize is the capacity of the buffered event queue.
// Values <= 0 make NewClient use its default of 100.
ChannelSize int
// FlushLatency is how long a written event may wait before the response
// is flushed. Values <= 0 make NewClient use its default of 100ms.
FlushLatency time.Duration
}
// NewClient creates a Client wrapping a response writer and starts the
// worker goroutine that writes queued events to it.
//
// The response writer must support the http.Flusher interface; if it does
// not, NewClient returns nil without touching the response.
//
// NewClient immediately sends the event-stream headers. Passing the original
// http.Request helps determine which headers to send and supplies the context
// that stops the client when the request ends, but the request is optional:
// when req is nil a background context is used and the HTTP/1 headers are
// sent.
//
// At most one Options value may be supplied; passing more panics. Zero or
// negative option fields fall back to the defaults (queue size 100, flush
// latency 100ms).
func NewClient(w http.ResponseWriter, req *http.Request, options ...Options) *Client {
	if len(options) > 1 {
		log.Panicln("only one Options value may be provided")
	}

	// Check to ensure we support flushing before doing any other work.
	flusher, ok := w.(http.Flusher)
	if !ok {
		return nil
	}

	flushLatency := 100 * time.Millisecond
	channelSize := 100
	if len(options) == 1 {
		opts := options[0]
		if opts.FlushLatency > 0 {
			flushLatency = opts.FlushLatency
		}
		if opts.ChannelSize > 0 {
			channelSize = opts.ChannelSize
		}
	}

	// The request is documented as optional, so it must not be dereferenced
	// unconditionally; without one the client is never cancelled by context.
	ctx := context.Background()
	if req != nil {
		ctx = req.Context()
	}

	c := &Client{
		flusher:      flusher,
		write:        w,
		ctx:          ctx,
		events:       make(chan Event, channelSize),
		flushLatency: flushLatency,
	}

	// Send the initial headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	if req == nil || req.ProtoMajor < 2 {
		w.Header().Set("Connection", "keep-alive")
	}
	flusher.Flush()

	// start the sending thread
	c.waiter.Add(1)
	go c.run()

	return c
}
// Closed reports whether the worker goroutine has stopped, either because the
// event queue was closed or because the client's context was cancelled.
func (c *Client) Closed() bool {
	c.lock.Lock()
	isClosed := c.closed
	c.lock.Unlock()
	return isClosed
}
// Send queues an event to be sent to the client.
// This does not block until the event has been sent,
// however it could block if the event queue is full.
//
// Returns io.ErrClosedPipe if the Client has disconnected, including when the
// disconnect happens while Send is waiting for room in a full queue.
//
// Send must not race with Shutdown: Shutdown closes the event queue, and a
// send on a closed channel panics.
func (c *Client) Send(ev Event) error {
	if c.Closed() {
		return io.ErrClosedPipe
	}

	// Once the context is cancelled the worker stops draining the queue, so a
	// bare channel send could block forever; give up when that happens.
	select {
	case c.events <- ev:
		return nil
	case <-c.ctx.Done():
		return io.ErrClosedPipe
	}
}
// SendNonBlocking tries to queue an event for the client without ever
// blocking the caller.
//
// The boolean result is true when the queue was full and the event was NOT
// queued, and false when the event was accepted.
// Returns io.ErrClosedPipe if the Client has disconnected.
func (c *Client) SendNonBlocking(ev Event) (bool, error) {
	if c.Closed() {
		return false, io.ErrClosedPipe
	}

	select {
	case c.events <- ev:
		return false, nil
	default:
		// Queue is full: report that the send would have blocked.
		return true, nil
	}
}
// Shutdown terminates a client connection and blocks until the worker
// goroutine has exited.
//
// Shutdown is safe to call more than once, and after the client has already
// stopped because its context was cancelled; in those cases it only waits for
// the worker.
func (c *Client) Shutdown() {
	c.lock.Lock()
	if !c.closed {
		// Mark the client closed before closing the queue so that a repeated
		// Shutdown never closes the channel twice (which would panic).
		c.closed = true
		close(c.events)
	}
	c.lock.Unlock()

	c.waiter.Wait()
}
// Wait blocks until the client's worker goroutine has finished, i.e. until
// the client has been shut down or its context has been cancelled.
// Call this in http handler threads to keep the handler from returning, which
// would make the server close the client connection.
func (c *Client) Wait() {
	worker := &c.waiter
	worker.Wait()
}
// run is the client's worker goroutine. It writes each queued event to the
// response and schedules a delayed flush, and exits when either the event
// queue is closed or the client's context is cancelled. On exit it marks the
// client closed and then releases anyone blocked in Wait or Shutdown.
func (c *Client) run() {
	// Deferred calls run in reverse order: closed is set first, then the
	// wait group is released.
	defer c.waiter.Done()
	defer func() {
		c.lock.Lock()
		c.closed = true
		c.lock.Unlock()
	}()

	cancelled := c.ctx.Done()
	for {
		select {
		case <-cancelled:
			return

		case event, open := <-c.events:
			// a closed queue means shutdown was requested
			if !open {
				return
			}

			// Write the event; any error from the copy is ignored.
			c.lock.Lock()
			io.Copy(c.write, &event)
			// Arm the flush timer only if no flush is already pending.
			if c.flushing == nil {
				c.flushing = time.AfterFunc(c.flushLatency, c.flush)
			}
			c.lock.Unlock()
		}
	}
}
// flush is the timer callback that pushes buffered output to the peer. Using
// a delayed flush amortizes flushing costs for high activity SSE channels.
// It does nothing once the client is closed or its context has ended.
func (c *Client) flush() {
	c.lock.Lock()
	defer c.lock.Unlock()

	if !c.closed && c.ctx.Err() == nil {
		// Clear the marker so the next written event schedules a new flush.
		c.flushing = nil
		c.flusher.Flush()
	}
}