-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathservice.go
341 lines (306 loc) · 9.01 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
// Package http implements a Functions HTTP middleware for use by
// scaffolding which exposes a function as a network service which handles
// http requests.
package http
import (
"bufio"
"context"
"fmt"
"net"
"net/http"
"os"
"os/signal"
"runtime"
"strings"
"syscall"
"time"
"github.com/rs/zerolog/log"
)
const (
DefaultLogLevel = LogDebug
DefaultListenAddress = "127.0.0.1:8080"
)
const (
ServerShutdownTimeout = 30 * time.Second
InstanceStopTimeout = 30 * time.Second
)
// Start an intance using a new Service
// Note that for CloudEvent Handlers this effectively accepts ANY because
// the actual type of the handler function is determined later.
func Start(f Handler) error {
log.Debug().Msg("func runtime creating function instance")
return New(f).Start(context.Background())
}
// Service exposes a Function Instance as a an HTTP service.
type Service struct {
http.Server
listener net.Listener
stop chan error
f Handler
}
// New Service which serves the given instance.
func New(f Handler) *Service {
svc := &Service{
f: f,
stop: make(chan error),
Server: http.Server{
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 30 * time.Second,
MaxHeaderBytes: 1 << 20,
ReadHeaderTimeout: 2 * time.Second,
},
}
mux := http.NewServeMux()
mux.HandleFunc("/health/readiness", svc.Ready)
mux.HandleFunc("/health/liveness", svc.Alive)
mux.HandleFunc("/", svc.Handle)
svc.Handler = mux
// Print some helpful information about which interfaces the function
// is correctly implementing
logImplements(f)
return svc
}
// log which interfaces the function implements.
// This could be more verbose for new users:
func logImplements(f any) {
if _, ok := f.(Starter); ok {
log.Info().Msg("Function implements Start")
}
if _, ok := f.(Stopper); ok {
log.Info().Msg("Function implements Stop")
}
if _, ok := f.(ReadinessReporter); ok {
log.Info().Msg("Function implements Ready")
}
if _, ok := f.(LivenessReporter); ok {
log.Info().Msg("Function implements Alive")
}
}
// Start
// Will stop when the context is canceled, a runtime error is encountered,
// or an os interrupt or kill signal is received.
// By default it listens on the default address DefaultListenAddress.
// This can be modified using the environment variable LISTEN_ADDRESS
func (s *Service) Start(ctx context.Context) (err error) {
// Get the listen address
// TODO: Currently this is an env var for legacy reasons. Logic should
// be moved into the generated mainfiles, and this setting be an optional
// functional option WithListenAddress(os.Getenv("LISTEN_ADDRESS"))
addr := listenAddress()
log.Debug().Str("address", addr).Msg("function starting")
// Listen
if s.listener, err = net.Listen("tcp", addr); err != nil {
return
}
// Start
// Starts the function instance in a separate routine, sending any
// runtime errors on s.stop.
if err = s.startInstance(ctx); err != nil {
return
}
// Wait for signals
// Interrupts and Kill signals
// sending a message on the s.stop channel if either are received.
s.handleSignals()
// Listen and serve
go func() {
if err := s.Serve(s.listener); err != http.ErrServerClosed {
log.Error().Err(err).Msg("http server exited with unexpected error")
s.stop <- err
}
}()
log.Debug().Msg("waiting for stop signals or errors")
// Wait for either a context cancellation or a signal on the stop channel.
select {
case err = <-s.stop:
if err != nil {
log.Error().Err(err).Msg("function error")
}
case <-ctx.Done():
log.Debug().Msg("function canceled")
}
return s.shutdown(err)
}
func listenAddress() string {
// If they are using the corret LISTEN_ADRESS, use this immediately
listenAddress := os.Getenv("LISTEN_ADDRESS")
if listenAddress != "" {
return listenAddress
}
// Legacy logic if ADDRESS or PORT provided
address := os.Getenv("ADDRESS")
port := os.Getenv("PORT")
if address != "" || port != "" {
if address != "" {
log.Warn().Msg("Environment variable ADDRESS is deprecated and support will be removed in future versions. Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.")
} else {
address = "127.0.0.1"
}
if port != "" {
log.Warn().Msg("Environment variable PORT is deprecated and support will be removed in future version.s Try rebuilding your Function with the latest version of func to use LISTEN_ADDRESS instead.")
} else {
port = "8080"
}
return address + ":" + port
}
return DefaultListenAddress
}
// Addr returns the address upon which the service is listening if started;
// nil otherwise.
func (s *Service) Addr() net.Addr {
if s.listener == nil {
return nil
}
return s.listener.Addr()
}
// Handle requests for the instance
func (s *Service) Handle(w http.ResponseWriter, r *http.Request) {
s.f.Handle(w, r)
}
// Ready handles readiness checks.
func (s *Service) Ready(w http.ResponseWriter, r *http.Request) {
if i, ok := s.f.(ReadinessReporter); ok {
ready, err := i.Ready(r.Context())
if err != nil {
message := "error checking readiness"
log.Debug().Err(err).Msg(message)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, "error checking readiness: ", err.Error())
return
}
if !ready {
message := "function not yet ready"
log.Debug().Msg(message)
w.WriteHeader(http.StatusServiceUnavailable)
fmt.Fprintln(w, message)
return
}
}
fmt.Fprintf(w, "READY")
}
// Alive handles liveness checks.
func (s *Service) Alive(w http.ResponseWriter, r *http.Request) {
if i, ok := s.f.(LivenessReporter); ok {
alive, err := i.Alive(r.Context())
if err != nil {
message := "error checking liveness"
log.Err(err).Msg(message)
w.WriteHeader(http.StatusInternalServerError)
fmt.Fprint(w, "error checking liveness: ", err.Error())
return
}
if !alive {
message := "function not alive"
log.Debug().Msg(message)
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte(message))
return
}
}
fmt.Fprintf(w, "ALIVE")
}
func (s *Service) startInstance(ctx context.Context) error {
if i, ok := s.f.(Starter); ok {
cfg, err := newCfg()
if err != nil {
return err
}
go func() {
if err := i.Start(ctx, cfg); err != nil {
s.stop <- err
}
}()
} else {
log.Debug().Msg("function does not implement Start. Skipping")
}
return nil
}
func (s *Service) handleSignals() {
sigs := make(chan os.Signal, 2)
signal.Notify(sigs)
go func() {
for {
sig := <-sigs
if sig == syscall.SIGINT || sig == syscall.SIGTERM {
log.Debug().Any("signal", sig).Msg("signal received")
s.stop <- nil
} else if runtime.GOOS == "linux" && sig == syscall.Signal(0x17) {
// Ignore SIGURG; signal 23 (0x17)
// See https://go.googlesource.com/proposal/+/master/design/24543-non-cooperative-preemption.md
}
}
}()
}
// readCfg returns a map representation of ./cfg
// Empty map is returned if ./cfg does not exist.
// Error is returned for invalid entries.
// keys and values are space-trimmed.
// Quotes are removed from values.
func readCfg() (map[string]string, error) {
cfg := map[string]string{}
f, err := os.Open("cfg")
if err != nil {
log.Debug().Msg("no static config")
return cfg, nil
}
defer f.Close()
scanner := bufio.NewScanner(f)
i := 0
for scanner.Scan() {
i++
line := scanner.Text()
parts := strings.SplitN(line, "=", 2)
if len(parts) != 2 {
return cfg, fmt.Errorf("config line %v invalid: %v", i, line)
}
cfg[strings.TrimSpace(parts[0])] = strings.Trim(strings.TrimSpace(parts[1]), "\"")
}
return cfg, scanner.Err()
}
// newCfg creates a final map of config values built from the static
// values in `cfg` and all environment variables.
func newCfg() (cfg map[string]string, err error) {
if cfg, err = readCfg(); err != nil {
return
}
for _, e := range os.Environ() {
pair := strings.SplitN(e, "=", 2)
cfg[pair[0]] = pair[1]
}
return
}
// shutdown is invoked when the stop channel receives a message and attempts to
// gracefully cease execution.
// Passed in is the message received on the stop channel, wich is either an
// error in the case of a runtime error, or nil in the case of a context
// cancellation or sigint/sigkill.
func (s *Service) shutdown(sourceErr error) (err error) {
log.Debug().Msg("function stopping")
var runtimeErr, instanceErr error
// Start a graceful shutdown of the HTTP server
ctx, cancel := context.WithTimeout(context.Background(), ServerShutdownTimeout)
defer cancel()
runtimeErr = s.Shutdown(ctx)
// Start a graceful shutdown of the Function instance
if i, ok := s.f.(Stopper); ok {
ctx, cancel = context.WithTimeout(context.Background(), InstanceStopTimeout)
defer cancel()
instanceErr = i.Stop(ctx)
}
return collapseErrors("shutdown error", sourceErr, instanceErr, runtimeErr)
}
// collapseErrors returns the first non-nil error which it is passed,
// printing the rest to log with the given prefix.
func collapseErrors(msg string, ee ...error) (err error) {
for _, e := range ee {
if e != nil {
if err == nil {
err = e
} else {
log.Error().Err(e).Msg(msg)
}
}
}
return
}