Skip to content
This repository has been archived by the owner on Jan 28, 2022. It is now read-only.

Commit

Permalink
Simplify prometheus metrics with labels (#139)
Browse files Browse the repository at this point in the history
* updated cluster to use new metrics tracking

* Rework tracking function

* Update run metrics

* Add metrics to secret scopes api calls

* dbfsblocks updated

* add metrics to workspaceitems api calls

* added unit tests for metrics

* updated job to use new metric work

* update docs

* Change to single metric with success/failure
  • Loading branch information
stuartleeks authored and Azadehkhojandi committed Dec 19, 2019
1 parent a120b23 commit bd9fa2a
Show file tree
Hide file tree
Showing 13 changed files with 123 additions and 251 deletions.
2 changes: 1 addition & 1 deletion api/v1alpha1/zz_generated.deepcopy.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

// Code generated by controller-gen. DO NOT EDIT.
// autogenerated by controller-gen object, do not modify manually

package v1alpha1

Expand Down
19 changes: 18 additions & 1 deletion controllers/dbfsblock_controller_databricks.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,34 +34,48 @@ func (r *DbfsBlockReconciler) submit(instance *databricksv1alpha1.DbfsBlock) err
}

// Open a handle for the file at instance.Spec.Path
execution := NewExecution("dbfsblocks", "create")
createResponse, err := r.APIClient.Dbfs().Create(instance.Spec.Path, true)
execution.Finish(err)

if err != nil {
return err
}

// Databricks limits the size of a single AddBlock request to 1024KB, so the data is uploaded in chunks of g bytes
var g = 1000
for i := 0; i < len(data); i += g {
execution = NewExecution("dbfsblocks", "add_block")

if i+g <= len(data) {
err = r.APIClient.Dbfs().AddBlock(createResponse.Handle, data[i:i+g])
} else {
err = r.APIClient.Dbfs().AddBlock(createResponse.Handle, data[i:])
}

execution.Finish(err)

if err != nil {
return err
}
}

// Close the handle returned by Create
execution = NewExecution("dbfsblocks", "close")
err = r.APIClient.Dbfs().Close(createResponse.Handle)
execution.Finish(err)

if err != nil {
return err
}

time.Sleep(1 * time.Second)

// Refresh info
execution = NewExecution("dbfsblocks", "get_status")
fileInfo, err := r.APIClient.Dbfs().GetStatus(instance.Spec.Path)
execution.Finish(err)

if err != nil {
return err
}
Expand All @@ -83,5 +97,8 @@ func (r *DbfsBlockReconciler) delete(instance *databricksv1alpha1.DbfsBlock) err

path := instance.Status.FileInfo.Path

return r.APIClient.Dbfs().Delete(path, true)
execution := NewExecution("dbfsblocks", "delete")
err := r.APIClient.Dbfs().Delete(path, true)
execution.Finish(err)
return err
}
26 changes: 8 additions & 18 deletions controllers/dcluster_controller_databricks.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"reflect"

databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
)

Expand Down Expand Up @@ -80,31 +79,22 @@ func (r *DclusterReconciler) delete(instance *databricksv1alpha1.Dcluster) error
return nil
}

return trackExecutionTime(dclusterDeleteDuration, func() error {
err := r.APIClient.Clusters().PermanentDelete(instance.Status.ClusterInfo.ClusterID)
trackSuccessFailure(err, dclusterCounterVec, "delete")
return err
})
execution := NewExecution("dclusters", "delete")
err := r.APIClient.Clusters().PermanentDelete(instance.Status.ClusterInfo.ClusterID)
execution.Finish(err)
return err
}

// getCluster looks up the cluster with the given clusterID through the
// Clusters API of the reconciler's APIClient.
//
// It returns the cluster info together with the error from that call,
// unmodified. The call is instrumented: an Execution labelled
// "dclusters"/"get" is started before the request and finished with the
// resulting error afterwards.
func (r *DclusterReconciler) getCluster(clusterID string) (cluster dbmodels.ClusterInfo, err error) {
timer := prometheus.NewTimer(dclusterGetDuration)
defer timer.ObserveDuration()

execution := NewExecution("dclusters", "get")
cluster, err = r.APIClient.Clusters().Get(clusterID)

trackSuccessFailure(err, dclusterCounterVec, "get")

// Finish receives err so the outcome (nil vs non-nil) is recorded with the duration.
execution.Finish(err)
return cluster, err
}

// createCluster creates a cluster through the Clusters API of the
// reconciler's APIClient, passing the dereferenced instance.Spec as the
// request.
//
// It returns the cluster info together with the error from that call,
// unmodified. The call is instrumented: an Execution labelled
// "dclusters"/"create" is started before the request and finished with the
// resulting error afterwards.
func (r *DclusterReconciler) createCluster(instance *databricksv1alpha1.Dcluster) (cluster dbmodels.ClusterInfo, err error) {
timer := prometheus.NewTimer(dclusterCreateDuration)
defer timer.ObserveDuration()

execution := NewExecution("dclusters", "create")
// instance.Spec is dereferenced here, so it must be non-nil.
cluster, err = r.APIClient.Clusters().Create(*instance.Spec)

trackSuccessFailure(err, dclusterCounterVec, "create")

// Finish receives err so the outcome (nil vs non-nil) is recorded with the duration.
execution.Finish(err)
return cluster, err
}
53 changes: 0 additions & 53 deletions controllers/dcluster_metrics.go

This file was deleted.

32 changes: 11 additions & 21 deletions controllers/djob_controller_databricks.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"strings"

databricksv1alpha1 "github.com/microsoft/azure-databricks-operator/api/v1alpha1"
"github.com/prometheus/client_golang/prometheus"
dbmodels "github.com/xinsnake/databricks-sdk-golang/azure/models"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -71,7 +70,8 @@ func (r *DjobReconciler) submit(instance *databricksv1alpha1.Djob) error {
}
instance.ObjectMeta.SetOwnerReferences(references)
}
job, err := r.createJob(instance)
jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
job, err := r.createJob(jobSettings)

if err != nil {
return err
Expand Down Expand Up @@ -138,32 +138,22 @@ func (r *DjobReconciler) delete(instance *databricksv1alpha1.Djob) error {
return err
}

return trackExecutionTime(djobDeleteDuration, func() error {
err := r.APIClient.Jobs().Delete(jobID)
trackSuccessFailure(err, djobCounterVec, "delete")
return err
})
execution := NewExecution("djobs", "delete")
err := r.APIClient.Jobs().Delete(jobID)
execution.Finish(err)
return err
}

// getJob looks up the job with the given jobID through the Jobs API of the
// reconciler's APIClient.
//
// It returns the job together with the error from that call, unmodified.
// The call is instrumented: an Execution labelled "djobs"/"get" is started
// before the request and finished with the resulting error afterwards.
func (r *DjobReconciler) getJob(jobID int64) (job dbmodels.Job, err error) {
timer := prometheus.NewTimer(djobGetDuration)
defer timer.ObserveDuration()

execution := NewExecution("djobs", "get")
job, err = r.APIClient.Jobs().Get(jobID)

trackSuccessFailure(err, djobCounterVec, "get")

// Finish receives err so the outcome (nil vs non-nil) is recorded with the duration.
execution.Finish(err)
return job, err
}

// createJob creates a job through the Jobs API of the reconciler's
// APIClient from the given job settings.
//
// It returns the created job together with the error from that call,
// unmodified. The call is instrumented: an Execution labelled
// "djobs"/"create" is started before the request and finished with the
// resulting error afterwards.
func (r *DjobReconciler) createJob(instance *databricksv1alpha1.Djob) (job dbmodels.Job, err error) {
timer := prometheus.NewTimer(djobCreateDuration)
defer timer.ObserveDuration()

jobSettings := databricksv1alpha1.ToDatabricksJobSettings(instance.Spec)
func (r *DjobReconciler) createJob(jobSettings dbmodels.JobSettings) (job dbmodels.Job, err error) {
execution := NewExecution("djobs", "create")
job, err = r.APIClient.Jobs().Create(jobSettings)

trackSuccessFailure(err, djobCounterVec, "create")

// Finish receives err so the outcome (nil vs non-nil) is recorded with the duration.
execution.Finish(err)
return job, err
}
53 changes: 0 additions & 53 deletions controllers/djob_metrics.go

This file was deleted.

39 changes: 31 additions & 8 deletions controllers/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,48 @@ limitations under the License.
package controllers

import (
"time"

"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
)

const (
	// successMetric is the "outcome" label value used when a call returned no error.
	successMetric = "success"
	// failureMetric is the "outcome" label value used when a call returned an error.
	failureMetric = "failure"

	// metricPrefix is the common name prefix for this package's metrics.
	metricPrefix = "databricks_"
)

func trackExecutionTime(histogram prometheus.Histogram, f func() error) error {
timer := prometheus.NewTimer(histogram)
defer timer.ObserveDuration()
return f()
// databricksRequestHistogram is the single histogram used for all
// instrumented Databricks API calls. Observations are partitioned by three
// labels: "object_type", "action" and "outcome".
// No Buckets are set in the options below.
var databricksRequestHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Name: "databricks_request_duration_seconds",
Help: "Duration of upstream calls to Databricks REST service endpoints",
}, []string{"object_type", "action", "outcome"})

// init registers databricksRequestHistogram with metrics.Registry from the
// controller-runtime metrics package when this package is loaded.
func init() {
// Register custom metrics with the global prometheus registry
metrics.Registry.MustRegister(databricksRequestHistogram)
}

// NewExecution begins tracking a single API call.
//
// objectType and action become the "object_type" and "action" label values
// of the returned Execution; the start time is captured immediately.
func NewExecution(objectType string, action string) Execution {
	started := time.Now()

	labels := prometheus.Labels{}
	labels["object_type"] = objectType
	labels["action"] = action

	return Execution{begin: started, labels: labels}
}

// Execution tracks state for an API execution for emitting metrics.
// Values are created by NewExecution and completed by calling Finish.
type Execution struct {
// begin is the time at which the execution was started.
begin time.Time
// labels holds the label values attached to the observation; NewExecution
// sets "object_type" and "action", and Finish adds "outcome".
labels prometheus.Labels
}

func trackSuccessFailure(err error, counterVec *prometheus.CounterVec, method string) {
// Finish completes the execution and emits its metric.
//
// It sets the "outcome" label on e.labels to successMetric when err is nil
// and to failureMetric otherwise, then observes the time elapsed since
// e.begin, in seconds, on databricksRequestHistogram using those labels.
func (e *Execution) Finish(err error) {
if err == nil {
counterVec.With(prometheus.Labels{"status": successMetric, "method": method}).Inc()
e.labels["outcome"] = successMetric
} else {
counterVec.With(prometheus.Labels{"status": failureMetric, "method": method}).Inc()
e.labels["outcome"] = failureMetric
}
// Elapsed time is measured from the moment the Execution was created.
duration := time.Since(e.begin)
databricksRequestHistogram.With(e.labels).Observe(duration.Seconds())
}
Loading

0 comments on commit bd9fa2a

Please sign in to comment.