-
Notifications
You must be signed in to change notification settings - Fork 8.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add health checking from NGINX to individual endpoints. #146
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,9 @@ limitations under the License. | |
package controller | ||
|
||
import ( | ||
"bytes" | ||
"fmt" | ||
"net/http" | ||
"reflect" | ||
"sort" | ||
"strconv" | ||
|
@@ -35,6 +37,7 @@ import ( | |
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion" | ||
"k8s.io/kubernetes/pkg/client/record" | ||
"k8s.io/kubernetes/pkg/fields" | ||
"k8s.io/kubernetes/pkg/labels" | ||
"k8s.io/kubernetes/pkg/util/flowcontrol" | ||
"k8s.io/kubernetes/pkg/util/intstr" | ||
|
||
|
@@ -66,7 +69,8 @@ const ( | |
|
||
var ( | ||
// list of ports that cannot be used by TCP or UDP services | ||
reservedPorts = []string{"80", "443", "8181", "18080"} | ||
reservedPorts = []string{"80", "443", "8181", "18080"} | ||
httpSendReplacer = strings.NewReplacer("\r", "\\r", "\n", "\\n", "\"", "\\\"") | ||
) | ||
|
||
// GenericController holds the boilerplate code required to build an Ingress controller. | ||
|
@@ -78,12 +82,14 @@ type GenericController struct { | |
svcController *cache.Controller | ||
secrController *cache.Controller | ||
mapController *cache.Controller | ||
podController *cache.Controller | ||
|
||
ingLister cache_store.StoreToIngressLister | ||
svcLister cache.StoreToServiceLister | ||
endpLister cache.StoreToEndpointsLister | ||
secrLister cache_store.StoreToSecretsLister | ||
mapLister cache_store.StoreToConfigmapLister | ||
podLister cache.StoreToPodLister | ||
|
||
annotations annotationExtractor | ||
|
||
|
@@ -302,6 +308,19 @@ func newIngressController(config *Configuration) *GenericController { | |
glog.Warning("Update of ingress status is disabled (flag --update-status=false was specified)") | ||
} | ||
|
||
ic.podLister.Indexer, ic.podController = cache.NewIndexerInformer( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fyi this will increase the memory footprint of the controller in large clusters. Somethig to keep in mind, not much we can do about it at this point I guess. We can think about optimizing that when the time comes, however we might have to update examples that have request/limits set (unsure if there are any). |
||
cache.NewListWatchFromClient(ic.cfg.Client.Core().RESTClient(), "pods", ic.cfg.Namespace, fields.Everything()), | ||
&api.Pod{}, | ||
ic.cfg.ResyncPeriod, | ||
cache.ResourceEventHandlerFuncs{}, | ||
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) | ||
|
||
ic.syncStatus = status.NewStatusSyncer(status.Config{ | ||
Client: config.Client, | ||
PublishService: ic.cfg.PublishService, | ||
IngressLister: ic.ingLister, | ||
}) | ||
|
||
ic.annotations = newAnnotationExtractor(ic) | ||
|
||
return &ic | ||
|
@@ -312,6 +331,7 @@ func (ic *GenericController) controllersInSync() bool { | |
ic.svcController.HasSynced() && | ||
ic.endpController.HasSynced() && | ||
ic.secrController.HasSynced() && | ||
ic.podController.HasSynced() && | ||
ic.mapController.HasSynced() | ||
} | ||
|
||
|
@@ -746,13 +766,84 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing | |
continue | ||
} | ||
upstreams[name].Endpoints = endp | ||
|
||
probe, err := ic.findProbeForService(svcKey, &path.Backend.ServicePort) | ||
if err != nil { | ||
glog.Errorf("Failed to check for readinessProbe for %v: %v", name, err) | ||
} | ||
if probe != nil { | ||
check, err := ic.getUpstreamCheckForProbe(probe) | ||
if err != nil { | ||
glog.Errorf("Failed to create health check for probe: %v", err) | ||
} else { | ||
upstreams[name].UpstreamCheck = check | ||
} | ||
} | ||
|
||
} | ||
} | ||
} | ||
|
||
return upstreams | ||
} | ||
|
||
func (ic *GenericController) findProbeForService(svcKey string, servicePort *intstr.IntOrString) (*api.Probe, error) { | ||
svcObj, svcExists, err := ic.svcLister.Indexer.GetByKey(svcKey) | ||
if err != nil { | ||
return nil, fmt.Errorf("error getting service %v from the cache: %v", svcKey, err) | ||
} | ||
|
||
if !svcExists { | ||
err = fmt.Errorf("service %v does not exists", svcKey) | ||
return nil, err | ||
} | ||
|
||
svc := svcObj.(*api.Service) | ||
|
||
selector := labels.SelectorFromSet(svc.Spec.Selector) | ||
pods, err := ic.podLister.List(selector) | ||
if err != nil { | ||
return nil, fmt.Errorf("Failed to get pod listing: %v", err) | ||
} | ||
for _, pod := range pods { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess you want this to deterministically give you the same health check, even if there are 3 endpoint pods behind the service with all different readiness probes, we should sort based on creation timestamp and always take the first one. |
||
for _, container := range pod.Spec.Containers { | ||
for _, port := range container.Ports { | ||
if servicePort.Type == intstr.Int && int(port.ContainerPort) == servicePort.IntValue() || | ||
servicePort.Type == intstr.String && port.Name == servicePort.String() { | ||
if container.ReadinessProbe != nil { | ||
if container.ReadinessProbe.HTTPGet == nil || container.ReadinessProbe.HTTPGet.Scheme != "HTTP" { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason to not support https health checks? (I'm fine with the HTTP caveat, just curious) |
||
continue | ||
} | ||
return container.ReadinessProbe, nil | ||
} | ||
} | ||
} | ||
} | ||
} | ||
return nil, nil | ||
} | ||
|
||
func (ic *GenericController) getUpstreamCheckForProbe(probe *api.Probe) (*ingress.UpstreamCheck, error) { | ||
var headers http.Header = make(http.Header) | ||
for _, header := range probe.HTTPGet.HTTPHeaders { | ||
headers.Add(header.Name, header.Value) | ||
} | ||
headersWriter := new(bytes.Buffer) | ||
headers.Write(headersWriter) | ||
|
||
httpSend := httpSendReplacer.Replace( | ||
fmt.Sprintf("GET %s HTTP/1.0\r\n%s\r\n", probe.HTTPGet.Path, string(headersWriter.Bytes()))) | ||
|
||
return &ingress.UpstreamCheck{ | ||
HttpSend: httpSend, | ||
Port: probe.HTTPGet.Port.IntValue(), | ||
Rise: probe.SuccessThreshold, | ||
Fall: probe.FailureThreshold, | ||
TimeoutMillis: probe.TimeoutSeconds * 1000, | ||
IntervalMillis: probe.PeriodSeconds * 1000, | ||
}, nil | ||
} | ||
|
||
// serviceEndpoints returns the upstream servers (endpoints) associated | ||
// to a service. | ||
func (ic *GenericController) serviceEndpoints(svcKey, backendPort string, | ||
|
@@ -1018,6 +1109,7 @@ func (ic GenericController) Start() { | |
go ic.svcController.Run(ic.stopCh) | ||
go ic.secrController.Run(ic.stopCh) | ||
go ic.mapController.Run(ic.stopCh) | ||
go ic.podController.Run(ic.stopCh) | ||
|
||
go ic.secretQueue.Run(5*time.Second, ic.stopCh) | ||
go ic.syncQueue.Run(5*time.Second, ic.stopCh) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -102,6 +102,16 @@ type BackendInfo struct { | |
Repository string `json:"repository"` | ||
} | ||
|
||
// UpstreamCheck is used to configure ingress health checks
type UpstreamCheck struct {
	// HttpSend is the escaped raw HTTP request (e.g. "GET /healthz HTTP/1.0\r\n\r\n")
	// sent to each endpoint when probing it.
	HttpSend string
	// Port is the endpoint port to probe.
	Port int
	// IntervalMillis is the interval between two consecutive checks,
	// in milliseconds.
	IntervalMillis int32
	// Fall is the number of consecutive failed checks after which an
	// endpoint is considered unhealthy (mapped from the readiness
	// probe's FailureThreshold).
	Fall int32
	// Rise is the number of consecutive successful checks after which an
	// unhealthy endpoint is considered healthy again (mapped from the
	// readiness probe's SuccessThreshold).
	Rise int32
	// TimeoutMillis is the timeout for a single check, in milliseconds.
	TimeoutMillis int32
}
|
||
// Configuration holds the definition of all the parts required to describe all | ||
// ingresses reachable by the ingress controller (using a filter by namespace) | ||
type Configuration struct { | ||
|
@@ -134,6 +144,8 @@ type Backend struct { | |
Secure bool `json:"secure"` | ||
// Endpoints contains the list of endpoints currently running | ||
Endpoints []Endpoint `json:"endpoints"` | ||
|
||
UpstreamCheck *UpstreamCheck | ||
} | ||
|
||
// Endpoint describes a kubernetes endpoint in an backend | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add an example, as in this pr: #235, there's already an examples/health-checks directory, just create a gce/ and nginx/ under that and add it under nginx/.
Please also mention in that example what this endpoint exposes.