diff --git a/cmd/manager/main.go b/cmd/manager/main.go index 1cd2b3bf..cb9a6b90 100644 --- a/cmd/manager/main.go +++ b/cmd/manager/main.go @@ -4,9 +4,6 @@ import ( "context" "flag" "fmt" - "go.medium.engineering/picchu/pkg/client/scheme" - "go.medium.engineering/picchu/pkg/webhook" - apps "k8s.io/api/apps/v1" "os" "runtime" "runtime/debug" @@ -19,12 +16,16 @@ import ( "github.com/operator-framework/operator-sdk/pkg/log/zap" "github.com/operator-framework/operator-sdk/pkg/metrics" sdkVersion "github.com/operator-framework/operator-sdk/version" + wpav1 "github.com/practo/k8s-worker-pod-autoscaler/pkg/apis/workerpodautoscaler/v1" "github.com/spf13/pflag" "go.medium.engineering/picchu/pkg/apis" picchu "go.medium.engineering/picchu/pkg/apis/picchu/v1alpha1" + "go.medium.engineering/picchu/pkg/client/scheme" "go.medium.engineering/picchu/pkg/controller" "go.medium.engineering/picchu/pkg/controller/utils" + "go.medium.engineering/picchu/pkg/webhook" istio "istio.io/client-go/pkg/apis/networking/v1alpha3" + apps "k8s.io/api/apps/v1" autoscaling "k8s.io/api/autoscaling/v2beta2" core "k8s.io/api/core/v1" k8sruntime "k8s.io/apimachinery/pkg/runtime" @@ -151,6 +152,7 @@ func main() { istio.AddToScheme, monitoring.AddToScheme, slo.AddToScheme, + wpav1.AddToScheme, } for _, sch := range []*k8sruntime.Scheme{mgr.GetScheme(), scheme.Scheme} { diff --git a/deploy/crds/picchu.medium.engineering_revisions_crd.yaml b/deploy/crds/picchu.medium.engineering_revisions_crd.yaml index 30e25934..af63ade7 100644 --- a/deploy/crds/picchu.medium.engineering_revisions_crd.yaml +++ b/deploy/crds/picchu.medium.engineering_revisions_crd.yaml @@ -2236,12 +2236,35 @@ spec: format: int32 type: integer requestsRateMetric: + description: 'RequestsRateMetric refers to a Prometheus Adapter + metric. See: https://github.com/DirectXMan12/k8s-prometheus-adapter' type: string targetCPUUtilizationPercentage: + description: TargetCPUUtilizationPercentage scales based on + CPU percentage format: int32 type: integer targetRequestsRate: + description: TargetRequestsRate scales based on the specified + RequestsRateMetric type: string + worker: + description: Worker specifies parameters for Worker Pod Autoscaler. + See https://github.com/practo/k8s-worker-pod-autoscaler + properties: + maxDisruption: + type: string + queueUri: + type: string + secondsToProcessOneJob: {} + targetMessagesPerWorker: + format: int32 + type: integer + required: + - maxDisruption + - queueUri + - targetMessagesPerWorker + type: object type: object serviceAccountName: type: string diff --git a/go.mod b/go.mod index 6e9dbec9..ee5e64fa 100644 --- a/go.mod +++ b/go.mod @@ -6,15 +6,16 @@ require ( github.com/Medium/service-level-operator v0.3.1-0.20200128160720-77476ad50a61 github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 github.com/coreos/prometheus-operator v0.38.1 - github.com/go-logr/logr v0.1.0 + github.com/go-logr/logr v0.2.0 github.com/go-logr/zapr v0.1.1 github.com/go-openapi/spec v0.19.7 github.com/gogo/protobuf v1.3.1 github.com/golang/mock v1.4.3 github.com/google/uuid v1.1.1 github.com/operator-framework/operator-sdk v0.13.0 - github.com/prometheus/client_golang v1.6.0 - github.com/prometheus/common v0.9.1 + github.com/practo/k8s-worker-pod-autoscaler v1.1.1-0.20200722110630-c31dc858b6f9 + github.com/prometheus/client_golang v1.7.0 + github.com/prometheus/common v0.10.0 github.com/prometheus/prometheus v2.3.2+incompatible github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.6.0 @@ -33,6 +34,9 @@ require ( sigs.k8s.io/controller-runtime v0.6.0 ) +// Override dependancy from k8s-worker-pod-autoscaler +replace github.com/go-logr/logr => github.com/go-logr/logr v0.1.0 + // Pinned to kubernetes-1.17 replace ( github.com/golang/mock => github.com/golang/mock v1.3.1 diff --git a/go.sum b/go.sum index 724368c7..a74a24e7 100644 --- a/go.sum +++ b/go.sum @@ -110,9 +110,11 @@ github.com/auth0/go-jwt-middleware v0.0.0-20170425171159-5493cabe49f7/go.mod h1: github.com/aws/aws-sdk-go v1.16.26/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.17.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go v1.23.12/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= +github.com/aws/aws-sdk-go v1.29.15/go.mod h1:1KvfttTE3SPKMpo8g2c6jL3ZKfXtFvKscTgahTma5Xg= github.com/baiyubin/aliyun-sts-go-sdk v0.0.0-20180326062324-cfa1a18b161f/go.mod h1:AuiFmCCPBSrqvVMvuqFuk0qogytodnVFVSN5CeJB8Gc= github.com/bazelbuild/bazel-gazelle v0.0.0-20181012220611-c728ce9f663e/go.mod h1:uHBSeeATKpVazAACZBDPL/Nk/UhQDDsJWDlqYJo8/Us= github.com/bazelbuild/buildtools v0.0.0-20180226164855-80c7f0d45d7e/go.mod h1:5JP0TXzWDHXv8qvxRC4InIazwdyDseBDbzESUMKk1yU= +github.com/beanstalkd/go-beanstalk v0.0.0-20200526060843-1cc502ecaf3c/go.mod h1:Q3f6RCbUHp8RHSfBiPUZBojK76rir8Rl+KINuz2/sYs= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0 h1:HWo1m869IqiPhD389kmkxeTalrjNbbJTC8LXupb+sl0= @@ -350,6 +352,7 @@ github.com/go-openapi/validate v0.19.5/go.mod h1:8DJv2CVJQ6kGNpFW6eV9N3JviE1C85n github.com/go-ozzo/ozzo-validation v3.5.0+incompatible/go.mod h1:gsEKFIVnabGBt6mXmxK0MoFy+cZoTJY6mu5Ll3LVLBU= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gobuffalo/envy v1.6.5/go.mod h1:N+GkhhZ/93bGZc6ZKhJLP6+m+tCNPKwgSpH9kaifseQ= @@ -544,7 +547,6 @@ github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62Fo= github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.8/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= -github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10 h1:Kz6Cvnvv2wGdaG/V8yMvfkmNiXq9Ya2KUv4rouJJr68= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/jsonnet-bundler/jsonnet-bundler v0.1.0/go.mod h1:YKsSFc9VFhhLITkJS3X2PrRqWG9u2Jq99udTdDjQLfM= @@ -738,12 +740,18 @@ github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= github.com/pquerna/cachecontrol v0.0.0-20171018203845-0dec1b30a021/go.mod h1:prYjPmNq4d1NPVmpShWobRqXY3q7Vp+80DqgxxUrUIA= github.com/pquerna/ffjson v0.0.0-20180717144149-af8b230fcd20/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= +github.com/practo/k8s-worker-pod-autoscaler v1.1.1-0.20200722110630-c31dc858b6f9 h1:FZfqpzIdRRMjdodpMB8nXH15sTT9BBgUwHkAhjDtdyY= +github.com/practo/k8s-worker-pod-autoscaler v1.1.1-0.20200722110630-c31dc858b6f9/go.mod h1:jh1Tx7J3/BZwfbR1gv3G5dRRYfognxOyUnFN264eGqs= +github.com/practo/klog/v2 v2.2.1/go.mod h1:WMkpfPwTxLgSLookpI4UUv2zL7tMltKACK/FHB1zLV0= +github.com/practo/promlog v1.0.0/go.mod h1:gNtTwdRfC4UHAqBDWZ8xrSSFtACV689gWvpgGobYHXM= github.com/prometheus/alertmanager v0.18.0/go.mod h1:WcxHBl40VSPuOaqWae6l6HpnEOVRIycEJ7i9iYkadEE= github.com/prometheus/alertmanager v0.20.0/go.mod h1:9g2i48FAyZW6BtbsnvHtMHQXl2aVtrORKwKVCQ+nbrg= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -758,8 +766,8 @@ github.com/prometheus/client_golang v1.1.0/go.mod h1:I1FGZT9+L76gKKOs5djB6ezCbFQ github.com/prometheus/client_golang v1.2.0/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U= github.com/prometheus/client_golang v1.2.1 h1:JnMpQc6ppsNgw9QPAGF6Dod479itz7lvlsMzzNayLOI= github.com/prometheus/client_golang v1.2.1/go.mod h1:XMU6Z2MjaRKVu/dC1qupJI9SiNkDYzz3xecMgSW/F+U= -github.com/prometheus/client_golang v1.6.0 h1:YVPodQOcK15POxhgARIvnDRVpLcuK8mglnMrWfyrw6A= -github.com/prometheus/client_golang v1.6.0/go.mod h1:ZLOG9ck3JLRdB5MgO8f+lLTe83AXG6ro35rLTxvnIl4= +github.com/prometheus/client_golang v1.7.0 h1:wCi7urQOGBsYcQROHqpUUX4ct84xp40t9R9JX0FuA/U= +github.com/prometheus/client_golang v1.7.0/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90 h1:S/YWwWx/RA8rT8tKFRuGUZhuA90OyIBpPCXkcbwU8DE= @@ -778,8 +786,8 @@ github.com/prometheus/common v0.6.0 h1:kRhiuYSXR3+uv2IbVbZhUxK5zVD/2pp3Gd2PpvPkp github.com/prometheus/common v0.6.0/go.mod h1:eBmuwkDJBwy6iBfxCBob6t6dR6ENT/y+J+Zk0j9GMYc= github.com/prometheus/common v0.7.0 h1:L+1lyG48J1zAQXA3RBX/nG/B3gjlHq0zTt2tlbJLyCY= github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA= -github.com/prometheus/common v0.9.1 h1:KOMtN28tlbam3/7ZKEYKHhKoJZYYj3gMH4uc62x7X7U= -github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4= +github.com/prometheus/common v0.10.0 h1:RyRA7RzGXQZiW+tGMr7sxa85G1z0yOpM1qq5c8lNawc= +github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= @@ -794,6 +802,8 @@ github.com/prometheus/procfs v0.0.5/go.mod h1:4A/X28fw3Fc593LaREMrKMqOKvUAntwMDa github.com/prometheus/procfs v0.0.6/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.0.11 h1:DhHlBtkHWPYi8O2y31JkK0TF+DGM+51OopZjH/Ia5qI= github.com/prometheus/procfs v0.0.11/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= +github.com/prometheus/procfs v0.1.3 h1:F0+tqvhOksq22sc6iCHF5WGlWjdwj92p0udFh1VFBS8= +github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/prometheus v1.8.2-0.20191111142012-edeb7a44cbf7 h1:OhR8SVtl7w0Z+ssULwwm5TJnJaAPkd/uV+zQdk6ADmA= github.com/prometheus/prometheus v1.8.2-0.20191111142012-edeb7a44cbf7/go.mod h1:PVTKYlgELGIDbIKIyWRzD4WKjnavPynGOFLSuDpvOwU= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= @@ -1023,6 +1033,7 @@ golang.org/x/net v0.0.0-20191004110552-13f9640d40b9 h1:rjwSpXsdiK0dV8/Naq3kAw9ym golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0= golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -1091,8 +1102,8 @@ golang.org/x/sys v0.0.0-20191113165036-4c7a9d0fe056/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f h1:gWF768j/LaZugp8dyS4UwsslYCYz9XgFxvlgsn0n9H8= -golang.org/x/sys v0.0.0-20200420163511-1957bb5e6d1f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80= +golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180805044716-cb6730876b98/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/pkg/apis/picchu/v1alpha1/common.go b/pkg/apis/picchu/v1alpha1/common.go index c3099a0c..fec05fec 100644 --- a/pkg/apis/picchu/v1alpha1/common.go +++ b/pkg/apis/picchu/v1alpha1/common.go @@ -90,14 +90,29 @@ type IstioHTTPPortConfig struct { Timeout *metav1.Duration `json:"timeout,omitempty"` } +type WorkerScaleInfo struct { + QueueURI string `json:"queueUri"` + TargetMessagesPerWorker *int32 `json:"targetMessagesPerWorker"` + SecondsToProcessOneJob *float64 `json:"secondsToProcessOneJob,omitempty"` // optional + MaxDisruption *string `json:"maxDisruption"` // optional +} + type ScaleInfo struct { - Min *int32 `json:"min,omitempty"` - Default int32 `json:"default,omitempty"` - Max int32 `json:"max,omitempty"` - TargetCPUUtilizationPercentage *int32 `json:"targetCPUUtilizationPercentage,omitempty"` - RequestsRateMetric string `json:"requestsRateMetric,omitempty"` - TargetRequestsRate *string `json:"targetRequestsRate,omitempty"` - MinReadySeconds int32 `json:"minReadySeconds,omitempty"` + Min *int32 `json:"min,omitempty"` + Default int32 `json:"default,omitempty"` + Max int32 `json:"max,omitempty"` + MinReadySeconds int32 `json:"minReadySeconds,omitempty"` + + // TargetCPUUtilizationPercentage scales based on CPU percentage + TargetCPUUtilizationPercentage *int32 `json:"targetCPUUtilizationPercentage,omitempty"` + + // TargetRequestsRate scales based on the specified RequestsRateMetric + TargetRequestsRate *string `json:"targetRequestsRate,omitempty"` + // RequestsRateMetric refers to a Prometheus Adapter metric. See: https://github.com/DirectXMan12/k8s-prometheus-adapter + RequestsRateMetric string `json:"requestsRateMetric,omitempty"` + + // Worker specifies parameters for Worker Pod Autoscaler. See https://github.com/practo/k8s-worker-pod-autoscaler + Worker *WorkerScaleInfo `json:"worker,omitempty"` } func (s *ScaleInfo) TargetReqeustsRateQuantity() (*resource.Quantity, error) { diff --git a/pkg/apis/picchu/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/picchu/v1alpha1/zz_generated.deepcopy.go index 1aa4bc37..18f1c8de 100644 --- a/pkg/apis/picchu/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/picchu/v1alpha1/zz_generated.deepcopy.go @@ -1371,6 +1371,11 @@ func (in *ScaleInfo) DeepCopyInto(out *ScaleInfo) { *out = new(string) **out = **in } + if in.Worker != nil { + in, out := &in.Worker, &out.Worker + *out = new(WorkerScaleInfo) + (*in).DeepCopyInto(*out) + } return } @@ -1525,3 +1530,34 @@ func (in *Variant) DeepCopy() *Variant { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WorkerScaleInfo) DeepCopyInto(out *WorkerScaleInfo) { + *out = *in + if in.TargetMessagesPerWorker != nil { + in, out := &in.TargetMessagesPerWorker, &out.TargetMessagesPerWorker + *out = new(int32) + **out = **in + } + if in.SecondsToProcessOneJob != nil { + in, out := &in.SecondsToProcessOneJob, &out.SecondsToProcessOneJob + *out = new(float64) + **out = **in + } + if in.MaxDisruption != nil { + in, out := &in.MaxDisruption, &out.MaxDisruption + *out = new(string) + **out = **in + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerScaleInfo. +func (in *WorkerScaleInfo) DeepCopy() *WorkerScaleInfo { + if in == nil { + return nil + } + out := new(WorkerScaleInfo) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/controller/releasemanager/incarnation.go b/pkg/controller/releasemanager/incarnation.go index 73668531..c38e02cb 100644 --- a/pkg/controller/releasemanager/incarnation.go +++ b/pkg/controller/releasemanager/incarnation.go @@ -465,6 +465,7 @@ func (i *Incarnation) genScalePlan(ctx context.Context) *rmplan.ScaleRevision { CPUTarget: cpuTarget, RequestsRateMetric: i.target().Scale.RequestsRateMetric, RequestsRateTarget: requestsRateTarget, + Worker: i.target().Scale.Worker, } } diff --git a/pkg/controller/releasemanager/plan/init_test.go b/pkg/controller/releasemanager/plan/init_test.go index 591cd588..01279535 100644 --- a/pkg/controller/releasemanager/plan/init_test.go +++ b/pkg/controller/releasemanager/plan/init_test.go @@ -1,8 +1,11 @@ package plan import ( + "time" + slov1alpha1 "github.com/Medium/service-level-operator/pkg/apis/monitoring/v1alpha1" monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1" + wpav1 "github.com/practo/k8s-worker-pod-autoscaler/pkg/apis/workerpodautoscaler/v1" ktest "go.medium.engineering/kubernetes/pkg/test" coreAsserts "go.medium.engineering/kubernetes/pkg/test/core/v1" istioAsserts "go.medium.engineering/kubernetes/pkg/test/istio/networking/v1alpha3" @@ -17,7 +20,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" - "time" ) var ( @@ -65,6 +67,9 @@ func init() { if err := slov1alpha1.AddToScheme(s); err != nil { panic(err) } + if err := wpav1.AddToScheme(s); err != nil { + panic(err) + } } monitoringAsserts.RegisterAsserts(comparator) coreAsserts.RegisterAsserts(comparator) diff --git a/pkg/controller/releasemanager/plan/scaleRevision.go b/pkg/controller/releasemanager/plan/scaleRevision.go index c51abdaa..ed40bf22 100644 --- a/pkg/controller/releasemanager/plan/scaleRevision.go +++ b/pkg/controller/releasemanager/plan/scaleRevision.go @@ -11,6 +11,7 @@ import ( "go.medium.engineering/picchu/pkg/plan" "github.com/go-logr/logr" + wpav1 "github.com/practo/k8s-worker-pod-autoscaler/pkg/apis/workerpodautoscaler/v1" autoscaling "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,9 +27,32 @@ type ScaleRevision struct { CPUTarget *int32 RequestsRateMetric string RequestsRateTarget *resource.Quantity + Worker *picchuv1alpha1.WorkerScaleInfo } func (p *ScaleRevision) Apply(ctx context.Context, cli client.Client, cluster *picchuv1alpha1.Cluster, log logr.Logger) error { + if p.Min > p.Max { + p.Max = p.Min + } + + scalingFactor := cluster.Spec.ScalingFactor + if scalingFactor == nil { + e := errors.New("cluster scalingFactor can't be nil") + log.Error(e, "Cluster scalingFactor nil") + return e + } + + scaledMin := int32(math.Ceil(float64(p.Min) * *scalingFactor)) + scaledMax := int32(math.Ceil(float64(p.Max) * *scalingFactor)) + + if p.Worker != nil { + return p.applyWPA(ctx, cli, log, scaledMin, scaledMax) + } + + return p.applyHPA(ctx, cli, log, scaledMin, scaledMax) +} + +func (p *ScaleRevision) applyHPA(ctx context.Context, cli client.Client, log logr.Logger, scaledMin int32, scaledMax int32) error { var metrics = []autoscaling.MetricSpec{} if p.CPUTarget != nil { @@ -74,20 +98,6 @@ func (p *ScaleRevision) Apply(ctx context.Context, cli client.Client, cluster *p return nil } - if p.Min > p.Max { - p.Max = p.Min - } - - scalingFactor := cluster.Spec.ScalingFactor - if scalingFactor == nil { - e := errors.New("cluster scalingFactor can't be nil") - log.Error(e, "Cluster scalingFactor nil") - return e - } - - scaledMin := int32(math.Ceil(float64(p.Min) * *scalingFactor)) - scaledMax := int32(math.Ceil(float64(p.Max) * *scalingFactor)) - hpa := &autoscaling.HorizontalPodAutoscaler{ ObjectMeta: metav1.ObjectMeta{ Name: p.Tag, @@ -108,3 +118,24 @@ func (p *ScaleRevision) Apply(ctx context.Context, cli client.Client, cluster *p return plan.CreateOrUpdate(ctx, log, cli, hpa) } + +func (p *ScaleRevision) applyWPA(ctx context.Context, cli client.Client, log logr.Logger, scaledMin int32, scaledMax int32) error { + wpa := &wpav1.WorkerPodAutoScaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: p.Tag, + Namespace: p.Namespace, + Labels: p.Labels, + }, + Spec: wpav1.WorkerPodAutoScalerSpec{ + ReplicaSetName: p.Tag, + MinReplicas: &scaledMin, + MaxReplicas: &scaledMax, + QueueURI: p.Worker.QueueURI, + TargetMessagesPerWorker: p.Worker.TargetMessagesPerWorker, + SecondsToProcessOneJob: p.Worker.SecondsToProcessOneJob, + MaxDisruption: p.Worker.MaxDisruption, + }, + } + + return plan.CreateOrUpdate(ctx, log, cli, wpa) +} diff --git a/pkg/controller/releasemanager/plan/scaleRevision_test.go b/pkg/controller/releasemanager/plan/scaleRevision_test.go index 22769dbe..9ffddf35 100644 --- a/pkg/controller/releasemanager/plan/scaleRevision_test.go +++ b/pkg/controller/releasemanager/plan/scaleRevision_test.go @@ -4,10 +4,12 @@ import ( "context" "testing" + picchuv1alpha1 "go.medium.engineering/picchu/pkg/apis/picchu/v1alpha1" "go.medium.engineering/picchu/pkg/mocks" "go.medium.engineering/picchu/pkg/test" "github.com/golang/mock/gomock" + wpav1 "github.com/practo/k8s-worker-pod-autoscaler/pkg/apis/workerpodautoscaler/v1" "github.com/stretchr/testify/assert" autoscaling "k8s.io/api/autoscaling/v2beta2" "k8s.io/apimachinery/pkg/api/resource" @@ -116,6 +118,55 @@ func TestScaleRevisionByRequestsRate(t *testing.T) { assert.NoError(t, plan.Apply(ctx, m, halfCluster, log), "Shouldn't return error.") } +func TestScaleRevisionWithWPA(t *testing.T) { + log := test.MustNewLogger() + ctrl := gomock.NewController(t) + m := mocks.NewMockClient(ctrl) + defer ctrl.Finish() + + plan := &ScaleRevision{ + Tag: "testtag", + Namespace: "testnamespace", + Min: 4, + Max: 10, + Worker: &picchuv1alpha1.WorkerScaleInfo{}, + Labels: map[string]string{}, + } + ok := client.ObjectKey{Name: "testtag", Namespace: "testnamespace"} + ctx := context.TODO() + + var wpaMaxReplicas int32 = 0 + wpa := &wpav1.WorkerPodAutoScaler{ + Spec: wpav1.WorkerPodAutoScalerSpec{ + MaxReplicas: &wpaMaxReplicas, + }, + } + + expected := mocks.Callback(func(x interface{}) bool { + switch o := x.(type) { + case *wpav1.WorkerPodAutoScaler: + return *o.Spec.MaxReplicas == 5 && + o.Spec.ReplicaSetName == "testtag" + default: + return false + } + }, "match Spec.MaxReplicas == 5 and Spec.ReplicaSetName == testtag") + + m. + EXPECT(). + Get(ctx, mocks.ObjectKey(ok), mocks.UpdateWPASpec(wpa)). + Return(nil). + Times(1) + + m. + EXPECT(). + Update(ctx, expected). + Return(nil). + Times(1) + + assert.NoError(t, plan.Apply(ctx, m, halfCluster, log), "Shouldn't return error.") +} + func TestDontScaleRevision(t *testing.T) { log := test.MustNewLogger() ctrl := gomock.NewController(t) diff --git a/pkg/mocks/mutateMatchers.go b/pkg/mocks/mutateMatchers.go index edb57f48..cf66c63f 100644 --- a/pkg/mocks/mutateMatchers.go +++ b/pkg/mocks/mutateMatchers.go @@ -4,6 +4,7 @@ import ( slov1alpha1 "github.com/Medium/service-level-operator/pkg/apis/monitoring/v1alpha1" monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1" "github.com/golang/mock/gomock" + wpav1 "github.com/practo/k8s-worker-pod-autoscaler/pkg/apis/workerpodautoscaler/v1" appsv1 "k8s.io/api/apps/v1" autoscaling "k8s.io/api/autoscaling/v2beta2" corev1 "k8s.io/api/core/v1" @@ -154,3 +155,17 @@ func UpdateHPASpec(hpa *autoscaling.HorizontalPodAutoscaler) gomock.Matcher { } return Callback(fn, "update hpa spec") } + +// UpdateHPASpec sets the spec on a *HorizontalPodAutoscaler +func UpdateWPASpec(wpa *wpav1.WorkerPodAutoScaler) gomock.Matcher { + fn := func(x interface{}) bool { + switch o := x.(type) { + case *wpav1.WorkerPodAutoScaler: + o.Spec = wpa.Spec + return true + default: + return false + } + } + return Callback(fn, "update wpa spec") +} diff --git a/pkg/plan/common.go b/pkg/plan/common.go index f55c19c8..e15f8903 100644 --- a/pkg/plan/common.go +++ b/pkg/plan/common.go @@ -9,6 +9,7 @@ import ( slov1alpha1 "github.com/Medium/service-level-operator/pkg/apis/monitoring/v1alpha1" monitoringv1 "github.com/coreos/prometheus-operator/pkg/apis/monitoring/v1" "github.com/go-logr/logr" + wpav1 "github.com/practo/k8s-worker-pod-autoscaler/pkg/apis/workerpodautoscaler/v1" istiov1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3" appsv1 "k8s.io/api/apps/v1" autoscaling "k8s.io/api/autoscaling/v2beta2" @@ -281,6 +282,23 @@ func CreateOrUpdate( if err != nil { return err } + case *wpav1.WorkerPodAutoScaler: + typed := orig.DeepCopy() + wpa := &wpav1.WorkerPodAutoScaler{ + ObjectMeta: metav1.ObjectMeta{ + Name: typed.Name, + Namespace: typed.Namespace, + Labels: typed.Labels, + }, + } + op, err := controllerutil.CreateOrUpdate(ctx, cli, wpa, func() error { + wpa.Spec = typed.Spec + return nil + }) + LogSync(log, op, err, wpa) + if err != nil { + return err + } default: return fmt.Errorf("Unsupported type") }