Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added metric report on saga timeout + bug fix in mysql timeout manager #114

Merged
merged 1 commit into from
Aug 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions gbus/metrics/saga_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package metrics

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
io_prometheus_client "github.com/prometheus/client_model/go"
)

var SagaTimeoutCounter = newSagaTimeoutCounter()

func GetSagaTimeoutCounterValue() (float64, error) {
m := &io_prometheus_client.Metric{}
err := SagaTimeoutCounter.Write(m)

if err != nil {
return 0, err
}

return m.GetCounter().GetValue(), nil
}

func newSagaTimeoutCounter() prometheus.Counter {
return promauto.NewCounter(prometheus.CounterOpts{
Namespace: grabbitPrefix,
Subsystem: "saga",
Name: "timeedout_sagas",
Help: "counting the number of timedout saga instances",
})
}
4 changes: 4 additions & 0 deletions gbus/saga/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/sirupsen/logrus"
"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/metrics"
)

func fqnsFromMessages(objs []gbus.Message) []string {
Expand Down Expand Up @@ -247,6 +248,7 @@ func (imsm *Glue) registerEvent(exchange, topic string, event gbus.Message) erro
func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error {

saga, err := imsm.sagaStore.GetSagaByID(tx, sagaID)

//we are assuming that if the TimeoutSaga has been called but no instance returned from the store the saga
//has been completed already and
if err == ErrInstanceNotFound {
Expand All @@ -260,6 +262,8 @@ func (imsm *Glue) TimeoutSaga(tx *sql.Tx, sagaID string) error {
imsm.Log().WithError(timeoutErr).WithField("sagaID", sagaID).Error("failed to timeout saga")
return timeoutErr
}

metrics.SagaTimeoutCounter.Inc()
return imsm.completeOrUpdateSaga(tx, saga)
}

Expand Down
2 changes: 1 addition & 1 deletion gbus/tx/mysql/timeout.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (tm *TimeoutManager) RegisterTimeout(tx *sql.Tx, sagaID string, duration ti
//ClearTimeout clears a timeout for a specific saga
func (tm *TimeoutManager) ClearTimeout(tx *sql.Tx, sagaID string) error {

deleteSQL := `delete from ` + tm.timeoutsTableName + ` where saga_id_id = ?`
deleteSQL := `delete from ` + tm.timeoutsTableName + ` where saga_id = ?`
_, err := tx.Exec(deleteSQL, sagaID)
return err
}
Expand Down
File renamed without changes.
5 changes: 5 additions & 0 deletions tests/saga_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/wework/grabbit/gbus"
"github.com/wework/grabbit/gbus/metrics"
)

/*
Expand Down Expand Up @@ -225,6 +226,10 @@ func TestSagaTimeout(t *testing.T) {
}

<-proceed
timeoutCounter, e := metrics.GetSagaTimeoutCounterValue()
if timeoutCounter != 1 || e != nil {
t.Errorf("saga timeout counter expected to be 1 actual %v", timeoutCounter)
}
}

func TestSagaSelfMessaging(t *testing.T) {
Expand Down