@@ -17,7 +17,9 @@ limitations under the License.
17
17
package controller
18
18
19
19
import (
20
+ "bytes"
20
21
"fmt"
22
+ "net/http"
21
23
"reflect"
22
24
"sort"
23
25
"strconv"
@@ -35,6 +37,7 @@ import (
35
37
unversionedcore "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/typed/core/internalversion"
36
38
"k8s.io/kubernetes/pkg/client/record"
37
39
"k8s.io/kubernetes/pkg/fields"
40
+ "k8s.io/kubernetes/pkg/labels"
38
41
"k8s.io/kubernetes/pkg/util/flowcontrol"
39
42
"k8s.io/kubernetes/pkg/util/intstr"
40
43
@@ -66,7 +69,8 @@ const (
66
69
67
70
var (
68
71
// list of ports that cannot be used by TCP or UDP services
69
- reservedPorts = []string {"80" , "443" , "8181" , "18080" }
72
+ reservedPorts = []string {"80" , "443" , "8181" , "18080" }
73
+ httpSendReplacer = strings .NewReplacer ("\r " , "\\ r" , "\n " , "\\ n" , "\" " , "\\ \" " )
70
74
)
71
75
72
76
// GenericController holds the boilerplate code required to build an Ingress controlller.
@@ -78,12 +82,14 @@ type GenericController struct {
78
82
svcController * cache.Controller
79
83
secrController * cache.Controller
80
84
mapController * cache.Controller
85
+ podController * cache.Controller
81
86
82
87
ingLister cache_store.StoreToIngressLister
83
88
svcLister cache.StoreToServiceLister
84
89
endpLister cache.StoreToEndpointsLister
85
90
secrLister cache_store.StoreToSecretsLister
86
91
mapLister cache_store.StoreToConfigmapLister
92
+ podLister cache.StoreToPodLister
87
93
88
94
annotations annotationExtractor
89
95
@@ -302,6 +308,19 @@ func newIngressController(config *Configuration) *GenericController {
302
308
glog .Warning ("Update of ingress status is disabled (flag --update-status=false was specified)" )
303
309
}
304
310
311
+ ic .podLister .Indexer , ic .podController = cache .NewIndexerInformer (
312
+ cache .NewListWatchFromClient (ic .cfg .Client .Core ().RESTClient (), "pods" , ic .cfg .Namespace , fields .Everything ()),
313
+ & api.Pod {},
314
+ ic .cfg .ResyncPeriod ,
315
+ cache.ResourceEventHandlerFuncs {},
316
+ cache.Indexers {cache .NamespaceIndex : cache .MetaNamespaceIndexFunc })
317
+
318
+ ic .syncStatus = status .NewStatusSyncer (status.Config {
319
+ Client : config .Client ,
320
+ PublishService : ic .cfg .PublishService ,
321
+ IngressLister : ic .ingLister ,
322
+ })
323
+
305
324
ic .annotations = newAnnotationExtractor (ic )
306
325
307
326
return & ic
@@ -312,6 +331,7 @@ func (ic *GenericController) controllersInSync() bool {
312
331
ic .svcController .HasSynced () &&
313
332
ic .endpController .HasSynced () &&
314
333
ic .secrController .HasSynced () &&
334
+ ic .podController .HasSynced () &&
315
335
ic .mapController .HasSynced ()
316
336
}
317
337
@@ -746,13 +766,84 @@ func (ic *GenericController) createUpstreams(data []interface{}) map[string]*ing
746
766
continue
747
767
}
748
768
upstreams [name ].Endpoints = endp
769
+
770
+ probe , err := ic .findProbeForService (svcKey , & path .Backend .ServicePort )
771
+ if err != nil {
772
+ glog .Errorf ("Failed to check for readinessProbe for %v: %v" , name , err )
773
+ }
774
+ if probe != nil {
775
+ check , err := ic .getUpstreamCheckForProbe (probe )
776
+ if err != nil {
777
+ glog .Errorf ("Failed to create health check for probe: %v" , err )
778
+ } else {
779
+ upstreams [name ].UpstreamCheck = check
780
+ }
781
+ }
782
+
749
783
}
750
784
}
751
785
}
752
786
753
787
return upstreams
754
788
}
755
789
790
+ func (ic * GenericController ) findProbeForService (svcKey string , servicePort * intstr.IntOrString ) (* api.Probe , error ) {
791
+ svcObj , svcExists , err := ic .svcLister .Indexer .GetByKey (svcKey )
792
+ if err != nil {
793
+ return nil , fmt .Errorf ("error getting service %v from the cache: %v" , svcKey , err )
794
+ }
795
+
796
+ if ! svcExists {
797
+ err = fmt .Errorf ("service %v does not exists" , svcKey )
798
+ return nil , err
799
+ }
800
+
801
+ svc := svcObj .(* api.Service )
802
+
803
+ selector := labels .SelectorFromSet (svc .Spec .Selector )
804
+ pods , err := ic .podLister .List (selector )
805
+ if err != nil {
806
+ return nil , fmt .Errorf ("Failed to get pod listing: %v" , err )
807
+ }
808
+ for _ , pod := range pods {
809
+ for _ , container := range pod .Spec .Containers {
810
+ for _ , port := range container .Ports {
811
+ if servicePort .Type == intstr .Int && int (port .ContainerPort ) == servicePort .IntValue () ||
812
+ servicePort .Type == intstr .String && port .Name == servicePort .String () {
813
+ if container .ReadinessProbe != nil {
814
+ if container .ReadinessProbe .HTTPGet == nil || container .ReadinessProbe .HTTPGet .Scheme != "HTTP" {
815
+ continue
816
+ }
817
+ return container .ReadinessProbe , nil
818
+ }
819
+ }
820
+ }
821
+ }
822
+ }
823
+ return nil , nil
824
+ }
825
+
826
+ func (ic * GenericController ) getUpstreamCheckForProbe (probe * api.Probe ) (* ingress.UpstreamCheck , error ) {
827
+ var headers http.Header = make (http.Header )
828
+ for _ , header := range probe .HTTPGet .HTTPHeaders {
829
+ headers .Add (header .Name , header .Value )
830
+ }
831
+ headersWriter := new (bytes.Buffer )
832
+ headers .Write (headersWriter )
833
+
834
+ httpSend := httpSendReplacer .Replace (
835
+ fmt .Sprintf ("GET %s HTTP/1.0\r \n %s\r \n " , probe .HTTPGet .Path , string (headersWriter .Bytes ())))
836
+
837
+ return & ingress.UpstreamCheck {
838
+ HttpSend : httpSend ,
839
+ Port : probe .HTTPGet .Port .IntValue (),
840
+ Rise : probe .SuccessThreshold ,
841
+ Fall : probe .FailureThreshold ,
842
+ TimeoutMillis : probe .TimeoutSeconds * 1000 ,
843
+ IntervalMillis : probe .PeriodSeconds * 1000 ,
844
+ }, nil
845
+ }
846
+
756
847
// serviceEndpoints returns the upstream servers (endpoints) associated
757
848
// to a service.
758
849
func (ic * GenericController ) serviceEndpoints (svcKey , backendPort string ,
@@ -1018,6 +1109,7 @@ func (ic GenericController) Start() {
1018
1109
go ic .svcController .Run (ic .stopCh )
1019
1110
go ic .secrController .Run (ic .stopCh )
1020
1111
go ic .mapController .Run (ic .stopCh )
1112
+ go ic .podController .Run (ic .stopCh )
1021
1113
1022
1114
go ic .secretQueue .Run (5 * time .Second , ic .stopCh )
1023
1115
go ic .syncQueue .Run (5 * time .Second , ic .stopCh )
0 commit comments