diff --git a/.mockery.yaml b/.mockery.yaml index 7a9f7d3..c7d9070 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -6,4 +6,7 @@ packages: Exporter: Scraper: MetricMapper: - HttpClient: \ No newline at end of file + HttpClient: + "github.com/castai/gpu-metrics-exporter/internal/castai": + interfaces: + Client: \ No newline at end of file diff --git a/cmd/main.go b/cmd/main.go index 4da7c2e..85ab0c2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -10,6 +10,7 @@ import ( "syscall" "time" + "github.com/go-resty/resty/v2" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" @@ -17,11 +18,18 @@ import ( "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/flowcontrol" + "github.com/castai/gpu-metrics-exporter/internal/castai" "github.com/castai/gpu-metrics-exporter/internal/config" "github.com/castai/gpu-metrics-exporter/internal/exporter" "github.com/castai/gpu-metrics-exporter/internal/server" ) +var ( + GitCommit = "undefined" + GitRef = "no-ref" + Version = "local" +) + func main() { log := logrus.New() @@ -77,6 +85,7 @@ func run(cfg *config.Config, log logrus.FieldLogger) error { log.Fatal(err) } + client := setupCastAIClient(log, cfg) scraper := exporter.NewScraper(&http.Client{}, log) mapper := exporter.NewMapper() ex := exporter.NewExporter(exporter.Config{ @@ -85,7 +94,7 @@ func run(cfg *config.Config, log logrus.FieldLogger) error { DCGMExporterPort: cfg.DCGMPort, DCGMExporterPath: cfg.DCGMMetricsEndpoint, Enabled: true, - }, clientset, log, scraper, mapper) + }, clientset, log, scraper, mapper, client) go func() { if err := ex.Start(ctx); err != nil && !errors.Is(err, context.Canceled) { @@ -126,3 +135,20 @@ func selectorFromMap(labelMap map[string]string) (labels.Selector, error) { return selector.Add(requirements...), nil } + +func setupCastAIClient(log logrus.FieldLogger, cfg *config.Config) castai.Client { + clientConfig := castai.Config{ + ClusterID: cfg.ClusterID, + APIKey: cfg.APIKey, + URL: cfg.CastAPI, + } + restyClient := resty.NewWithClient(&http.Client{ + Timeout: 2 * time.Minute, + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + ForceAttemptHTTP2: true, + }, + }) + + return castai.NewClient(clientConfig, log, restyClient, Version) +} diff --git a/go.mod b/go.mod index c6ae479..a525d85 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,8 @@ module github.com/castai/gpu-metrics-exporter go 1.22.0 require ( + github.com/go-resty/resty/v2 v2.11.0 + github.com/jarcoal/httpmock v1.3.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/prometheus/client_model v0.6.0 github.com/prometheus/common v0.49.0 diff --git a/go.sum b/go.sum index 0e668d9..c2c8d78 100644 --- a/go.sum +++ b/go.sum @@ -14,6 +14,8 @@ github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2Kv github.com/go-openapi/jsonreference v0.20.2/go.mod h1:Bl1zwGIM8/wsvqjsOQLJ/SH+En5Ap4rVB5KVcIDZG2k= github.com/go-openapi/swag v0.22.3 h1:yMBqmnQ0gyZvEb/+KzuWZOXgllrXT4SADYbvDaXHv/g= github.com/go-openapi/swag v0.22.3/go.mod h1:UzaqsxGiab7freDnrUUra0MwWfN/q7tE4j+VcZ0yl14= +github.com/go-resty/resty/v2 v2.11.0 h1:i7jMfNOJYMp69lq7qozJP+bjgzfAzeOhuGlyDrqxT/8= +github.com/go-resty/resty/v2 v2.11.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5BGXiVdTu+A= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -37,6 +39,8 @@ github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/jarcoal/httpmock v1.3.1 h1:iUx3whfZWVf3jT01hQTO/Eo5sAYtB2/rqaUuOtpInww= +github.com/jarcoal/httpmock v1.3.1/go.mod h1:3yb8rc4BI7TCBhFY8ng0gjuLKJNquuDNiPaZjnENuYg= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= @@ -54,6 +58,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= +github.com/maxatome/go-testdeep v1.12.0 h1:Ql7Go8Tg0C1D/uMMX59LAoYK7LffeJQ6X2T04nTH68g= +github.com/maxatome/go-testdeep v1.12.0/go.mod h1:lPZc/HAcJMP92l7yI6TRz1aZN5URwUBUAfUNvrclaNM= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -92,16 +98,26 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190603091049-60506f45cf65/go.mod h1:HSz+uSET+XFnRR8LxR5pz3Of3rY3CfYBVs4xY44aLks= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= +golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.17.0 h1:6m3ZPmLEFdVxKKWnKq4VqZ60gutO35zm+zrAHVmHyDQ= @@ -109,19 +125,37 @@ golang.org/x/oauth2 v0.17.0/go.mod h1:OzPDGQiuQMguemayvdylqddI7qcD9lnSDb+1FiwQ5H golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.6.0 h1:5BMeUDZ7vkXGfEr1x9B4bRcTH4lpkTkpdh0T/J+qjbQ= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= +golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/term v0.17.0 h1:mkTF7LCd6WGJNL3K1Ad7kwxNfYAW6a8a8QqtMblp/4U= golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= @@ -130,6 +164,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= golang.org/x/tools v0.16.1 h1:TLyB3WofjdOEepBHAU20JdNC1Zbg87elYofWYAY5oZA= golang.org/x/tools v0.16.1/go.mod h1:kYVVN6I1mBNoB1OX+noeBjbRk4IUEPa7JJ+TJMEooJ0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/castai/client.go b/internal/castai/client.go new file mode 100644 index 0000000..681924d --- /dev/null +++ b/internal/castai/client.go @@ -0,0 +1,121 @@ +package castai + +import ( + "bytes" + "compress/gzip" + "context" + "fmt" + "net/http" + "time" + + "github.com/go-resty/resty/v2" + "github.com/sirupsen/logrus" + "google.golang.org/protobuf/proto" + "k8s.io/apimachinery/pkg/util/wait" + + "github.com/castai/gpu-metrics-exporter/pb" +) + +const ( + tokenHeader = "X-API-Key" // #nosec G101 + retryCount = 5 +) + +var ( + backoff = wait.Backoff{ + Steps: retryCount, + Duration: 50 * time.Millisecond, + Factor: 8, + Jitter: 0.15, + } + + contentTypeHeader = http.CanonicalHeaderKey("Content-Type") + contentType = "application/protobuf" + + contentEncodingHeader = http.CanonicalHeaderKey("Content-Encoding") + contentEncoding = "gzip" + + userAgentHeader = http.CanonicalHeaderKey("User-Agent") + userAgent = "castai-gpu-metrics-exporter/" +) + +type Config struct { + URL string + APIKey string + ClusterID string +} + +type Client interface { + UploadBatch(ctx context.Context, batch *pb.MetricsBatch) error +} + +type client struct { + restyClient *resty.Client + cfg Config + log logrus.FieldLogger +} + +func NewClient(cfg Config, log logrus.FieldLogger, restyClient *resty.Client, version string) Client { + restyClient.BaseURL = cfg.URL + restyClient.SetHeaders(map[string]string{ + tokenHeader: cfg.APIKey, + contentTypeHeader: contentType, + contentEncodingHeader: contentEncoding, + userAgentHeader: fmt.Sprintf("%s%s", userAgent, version), + }) + + return &client{ + restyClient: restyClient, + cfg: cfg, + log: log, + } +} + +func (c client) UploadBatch(ctx context.Context, batch *pb.MetricsBatch) error { + buffer, err := c.toBuffer(batch) + if err != nil { + return err + } + + return wait.ExponentialBackoffWithContext(ctx, backoff, func(ctx context.Context) (done bool, err error) { + resp, err := c.restyClient.R(). + SetContext(ctx). + SetBody(buffer). + Post(fmt.Sprintf("/v1/kubernetes/clusters/%s/gpu-metrics", c.cfg.ClusterID)) + + if err != nil { + c.log.Errorf("error making http request %v", err) + return false, nil + } + + statusCode := resp.StatusCode() + switch { + case statusCode >= 200 && statusCode < 300: + return true, nil + case statusCode >= 400 && statusCode < 500: + return true, fmt.Errorf("client error: %d %s", statusCode, resp.Status()) + default: + c.log.Errorf("server error or unexpected response code: %d %s", statusCode, resp.Status()) + return false, nil + } + }) +} + +func (c client) toBuffer(batch *pb.MetricsBatch) (*bytes.Buffer, error) { + payload := new(bytes.Buffer) + + protoBytes, err := proto.Marshal(batch) + if err != nil { + return nil, fmt.Errorf("error marshaling batch %w", err) + } + + writer := gzip.NewWriter(payload) + if _, err := writer.Write(protoBytes); err != nil { + return nil, fmt.Errorf("error compressing payload %w", err) + } + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("error closing gzip writer %w", err) + } + + return payload, nil +} diff --git a/internal/castai/client_test.go b/internal/castai/client_test.go new file mode 100644 index 0000000..cd74c1b --- /dev/null +++ b/internal/castai/client_test.go @@ -0,0 +1,52 @@ +package castai_test + +import ( + "context" + "net/http" + "testing" + + "github.com/go-resty/resty/v2" + "github.com/jarcoal/httpmock" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + + "github.com/castai/gpu-metrics-exporter/internal/castai" + "github.com/castai/gpu-metrics-exporter/pb" +) + +func Test_UploadBatch(t *testing.T) { + log := logrus.New() + restyClient := resty.New() + client := castai.NewClient(castai.Config{ + URL: "http://localhost", + APIKey: "my-fake-token", + ClusterID: "cluster-id-1", + }, log, restyClient, "test") + + httpmock.ActivateNonDefault(restyClient.GetClient()) + t.Run("calls gpu-metrics endpoint with proper headers", func(t *testing.T) { + r := require.New(t) + + httpmock.RegisterResponder( + "POST", + "http://localhost/v1/kubernetes/clusters/cluster-id-1/gpu-metrics", + func(req *http.Request) (*http.Response, error) { + contentType := req.Header.Get("Content-type") + r.Equal("application/protobuf", contentType) + + contentEncoding := req.Header.Get("Content-encoding") + r.Equal("gzip", contentEncoding) + + apiKey := req.Header.Get("X-API-Key") + r.Equal("my-fake-token", apiKey) + + userAgent := req.Header.Get("User-Agent") + r.Equal("castai-gpu-metrics-exporter/test", userAgent) + + return &http.Response{StatusCode: 200}, nil + }, + ) + + _ = client.UploadBatch(context.Background(), &pb.MetricsBatch{}) + }) +} diff --git a/internal/config/config.go b/internal/config/config.go index 2f07ff7..1218064 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -16,6 +16,8 @@ type Config struct { ExportInterval time.Duration `envconfig:"EXPORT_INTERVAL" default:"15s"` CastAPI string `envconfig:"CAST_API" default:"https://api.cast.ai"` APIToken string `envconfig:"API_TOKEN"` + ClusterID string `envconfig:"CLUSTER_ID"` + APIKey string `envconfig:"API_KEY"` } func GetFromEnvironment() (*Config, error) { diff --git a/internal/exporter/exporter.go b/internal/exporter/exporter.go index dc79df4..1dab7c4 100644 --- a/internal/exporter/exporter.go +++ b/internal/exporter/exporter.go @@ -9,10 +9,8 @@ import ( "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" -) -const ( - cleanupInterval = 3 * time.Minute + "github.com/castai/gpu-metrics-exporter/internal/castai" ) type Exporter interface { @@ -37,9 +35,17 @@ type exporter struct { scraper Scraper mapper MetricMapper enabled *atomic.Bool + client castai.Client } -func NewExporter(cfg Config, kube kubernetes.Interface, log logrus.FieldLogger, scraper Scraper, mapper MetricMapper) Exporter { +func NewExporter( + cfg Config, + kube kubernetes.Interface, + log logrus.FieldLogger, + scraper Scraper, + mapper MetricMapper, + castaiClient castai.Client, +) Exporter { enabled := atomic.Bool{} enabled.Store(cfg.Enabled) @@ -50,6 +56,7 @@ func NewExporter(cfg Config, kube kubernetes.Interface, log logrus.FieldLogger, scraper: scraper, mapper: mapper, enabled: &enabled, + client: castaiClient, } } @@ -57,9 +64,6 @@ func (e *exporter) Start(ctx context.Context) error { exportTicker := time.NewTicker(e.cfg.ExportInterval) defer exportTicker.Stop() - cleanupTicker := time.NewTicker(cleanupInterval) - defer cleanupTicker.Stop() - for { select { case <-ctx.Done(): @@ -68,11 +72,9 @@ func (e *exporter) Start(ctx context.Context) error { if !e.enabled.Load() { continue } - if err := e.collect(ctx); err != nil { - e.log.Errorf("collect error: %v", err) + if err := e.export(ctx); err != nil { + e.log.Errorf("export error: %v", err) } - case <-cleanupTicker.C: - // TODO: call cleanup procedure } } } @@ -89,7 +91,7 @@ func (e *exporter) Enabled() bool { return e.enabled.Load() } -func (e *exporter) collect(ctx context.Context) error { +func (e *exporter) export(ctx context.Context) error { // TODO: consider using an informer and keeping a list of pods which match the selector // at the moment seems like an overkill dcgmExporterList, err := e.kube.CoreV1().Pods("").List(ctx, metav1.ListOptions{ @@ -110,8 +112,13 @@ func (e *exporter) collect(ctx context.Context) error { if err != nil { return fmt.Errorf("couldn't scrape DCGM exporters %w", err) } - metrics := e.mapper.Map(metricFamilies, time.Now()) - _ = metrics + + batch := e.mapper.Map(metricFamilies, time.Now()) + if err := e.client.UploadBatch(ctx, batch); err != nil { + return fmt.Errorf("error whlie sending metrics %d to castai %w", len(batch.Metrics), err) + } + + e.log.Infof("successfully exported %d metrics", len(batch.Metrics)) return nil } diff --git a/internal/exporter/exporter_test.go b/internal/exporter/exporter_test.go index d4ac263..4e548e4 100644 --- a/internal/exporter/exporter_test.go +++ b/internal/exporter/exporter_test.go @@ -15,7 +15,9 @@ import ( "k8s.io/client-go/kubernetes/fake" "github.com/castai/gpu-metrics-exporter/internal/exporter" + castai_mock "github.com/castai/gpu-metrics-exporter/mock/castai" mocks "github.com/castai/gpu-metrics-exporter/mock/exporter" + "github.com/castai/gpu-metrics-exporter/pb" ) func TestExporter_Running(t *testing.T) { @@ -44,8 +46,9 @@ func TestExporter_Running(t *testing.T) { scraper := mocks.NewMockScraper(t) mapper := mocks.NewMockMetricMapper(t) + client := castai_mock.NewMockClient(t) - ex := exporter.NewExporter(config, kubeClient, log, scraper, mapper) + ex := exporter.NewExporter(config, kubeClient, log, scraper, mapper, client) ex.Enable() metricFamilies := []exporter.MetricFamilyMap{ @@ -75,8 +78,11 @@ func TestExporter_Running(t *testing.T) { }, } + batch := &pb.MetricsBatch{} + scraper.EXPECT().Scrape(ctx, []string{"http://192.168.1.1:9400/metrics"}).Times(1).Return(metricFamilies, nil) - mapper.EXPECT().Map(metricFamilies, mock.Anything).Times(1).Return(nil, nil) + mapper.EXPECT().Map(metricFamilies, mock.Anything).Times(1).Return(batch, nil) + client.EXPECT().UploadBatch(mock.Anything, batch).Times(1).Return(nil, nil) go func() { err := ex.Start(ctx) diff --git a/mock/castai/mock_Client.go b/mock/castai/mock_Client.go new file mode 100644 index 0000000..3762849 --- /dev/null +++ b/mock/castai/mock_Client.go @@ -0,0 +1,84 @@ +// Code generated by mockery v2.42.0. DO NOT EDIT. + +package castai + +import ( + context "context" + + pb "github.com/castai/gpu-metrics-exporter/pb" + mock "github.com/stretchr/testify/mock" +) + +// MockClient is an autogenerated mock type for the Client type +type MockClient struct { + mock.Mock +} + +type MockClient_Expecter struct { + mock *mock.Mock +} + +func (_m *MockClient) EXPECT() *MockClient_Expecter { + return &MockClient_Expecter{mock: &_m.Mock} +} + +// UploadBatch provides a mock function with given fields: ctx, batch +func (_m *MockClient) UploadBatch(ctx context.Context, batch *pb.MetricsBatch) error { + ret := _m.Called(ctx, batch) + + if len(ret) == 0 { + panic("no return value specified for UploadBatch") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, *pb.MetricsBatch) error); ok { + r0 = rf(ctx, batch) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockClient_UploadBatch_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UploadBatch' +type MockClient_UploadBatch_Call struct { + *mock.Call +} + +// UploadBatch is a helper method to define mock.On call +// - ctx context.Context +// - batch *pb.MetricsBatch +func (_e *MockClient_Expecter) UploadBatch(ctx interface{}, batch interface{}) *MockClient_UploadBatch_Call { + return &MockClient_UploadBatch_Call{Call: _e.mock.On("UploadBatch", ctx, batch)} +} + +func (_c *MockClient_UploadBatch_Call) Run(run func(ctx context.Context, batch *pb.MetricsBatch)) *MockClient_UploadBatch_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(*pb.MetricsBatch)) + }) + return _c +} + +func (_c *MockClient_UploadBatch_Call) Return(_a0 error) *MockClient_UploadBatch_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockClient_UploadBatch_Call) RunAndReturn(run func(context.Context, *pb.MetricsBatch) error) *MockClient_UploadBatch_Call { + _c.Call.Return(run) + return _c +} + +// NewMockClient creates a new instance of MockClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockClient(t interface { + mock.TestingT + Cleanup(func()) +}) *MockClient { + mock := &MockClient{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +}