Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sdk): change HTTPSender factory methods and receivers to use… #1259

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 17 additions & 19 deletions pkg/transforms/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,11 @@ type HTTPSender struct {
secretName string
secretPath string
urlFormatter StringValuesFormatter
httpSizeMetrics gometrics.Histogram
}

// NewHTTPSender creates, initializes and returns a new instance of HTTPSender
func NewHTTPSender(url string, mimeType string, persistOnError bool) HTTPSender {
func NewHTTPSender(url string, mimeType string, persistOnError bool) *HTTPSender {
return NewHTTPSenderWithOptions(HTTPSenderOptions{
URL: url,
MimeType: mimeType,
Expand All @@ -55,7 +56,7 @@ func NewHTTPSender(url string, mimeType string, persistOnError bool) HTTPSender
}

// NewHTTPSenderWithSecretHeader creates, initializes and returns a new instance of HTTPSender configured to use a secret header
func NewHTTPSenderWithSecretHeader(url string, mimeType string, persistOnError bool, headerName string, secretPath string, secretName string) HTTPSender {
func NewHTTPSenderWithSecretHeader(url string, mimeType string, persistOnError bool, headerName string, secretPath string, secretName string) *HTTPSender {
return NewHTTPSenderWithOptions(HTTPSenderOptions{
URL: url,
MimeType: mimeType,
Expand All @@ -67,8 +68,8 @@ func NewHTTPSenderWithSecretHeader(url string, mimeType string, persistOnError b
}

// NewHTTPSenderWithOptions creates, initializes and returns a new instance of HTTPSender configured with provided options
func NewHTTPSenderWithOptions(options HTTPSenderOptions) HTTPSender {
return HTTPSender{
func NewHTTPSenderWithOptions(options HTTPSenderOptions) *HTTPSender {
return &HTTPSender{
url: options.URL,
mimeType: options.MimeType,
persistOnError: options.PersistOnError,
Expand Down Expand Up @@ -108,18 +109,18 @@ type HTTPSenderOptions struct {
// HTTPPost will send data from the previous function to the specified Endpoint via http POST.
// If no previous function exists, then the event that triggered the pipeline will be used.
// An empty string for the mimetype will default to application/json.
func (sender HTTPSender) HTTPPost(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
func (sender *HTTPSender) HTTPPost(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
return sender.httpSend(ctx, data, http.MethodPost)
}

// HTTPPut will send data from the previous function to the specified Endpoint via http PUT.
// If no previous function exists, then the event that triggered the pipeline will be used.
// An empty string for the mimetype will default to application/json.
func (sender HTTPSender) HTTPPut(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
func (sender *HTTPSender) HTTPPut(ctx interfaces.AppFunctionContext, data interface{}) (bool, interface{}) {
return sender.httpSend(ctx, data, http.MethodPut)
}

func (sender HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data interface{}, method string) (bool, interface{}) {
func (sender *HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data interface{}, method string) (bool, interface{}) {
lc := ctx.LoggingClient()

lc.Debugf("HTTP Exporting in pipeline '%s'", ctx.PipelineId())
Expand Down Expand Up @@ -214,17 +215,15 @@ func (sender HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data interf

// capture the size into metrics
exportDataBytes := len(exportData)
// TODO: EdgeX 3.0 refactor size metrics once receivers are pointers (like mqtt size metrics)
metrics := gometrics.DefaultRegistry.Get(internal.HttpExportSizeName)
var httpExportSizeMetric gometrics.Histogram
if metrics == nil {
if sender.httpSizeMetrics == nil {
var err error
lc.Debugf("Initializing metric %s.", internal.HttpExportSizeName)
httpExportSizeMetric = gometrics.NewHistogram(gometrics.NewUniformSample(internal.MetricsReservoirSize))
sender.httpSizeMetrics = gometrics.NewHistogram(gometrics.NewUniformSample(internal.MetricsReservoirSize))
metricsManger := ctx.MetricsManager()
if metricsManger != nil {
// TODO: EdgeX 3.0 append url to export size name
err = metricsManger.Register(internal.HttpExportSizeName, httpExportSizeMetric, nil)
metricName := fmt.Sprintf("%s-%s", internal.MqttExportSizeName, sender.url)

err = metricsManger.Register(metricName, sender.httpSizeMetrics, map[string]string{"url": sender.url})
} else {
err = errors.New("metrics manager not available")
}
Expand All @@ -233,10 +232,9 @@ func (sender HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data interf
lc.Errorf("Unable to register metric %s. Collection will continue, but metric will not be reported: %s", internal.HttpExportSizeName, err.Error())
}

} else {
httpExportSizeMetric = metrics.(gometrics.Histogram)
}
httpExportSizeMetric.Update(int64(exportDataBytes))

sender.httpSizeMetrics.Update(int64(exportDataBytes))

ctx.LoggingClient().Debugf("Sent %d bytes of data in pipeline '%s'. Response status is %s", exportDataBytes, ctx.PipelineId(), response.Status)
ctx.LoggingClient().Tracef("Data exported for pipeline '%s' (%s=%s)", ctx.PipelineId(), common.CorrelationHeader, ctx.CorrelationID())
Expand All @@ -258,7 +256,7 @@ func (sender HTTPSender) httpSend(ctx interfaces.AppFunctionContext, data interf
return true, responseData
}

func (sender HTTPSender) determineIfUsingSecrets(ctx interfaces.AppFunctionContext) (bool, error) {
func (sender *HTTPSender) determineIfUsingSecrets(ctx interfaces.AppFunctionContext) (bool, error) {
// not using secrets if both are empty
if len(sender.secretPath) == 0 && len(sender.secretName) == 0 {
if len(sender.httpHeaderName) == 0 {
Expand All @@ -284,7 +282,7 @@ func (sender HTTPSender) determineIfUsingSecrets(ctx interfaces.AppFunctionConte
return true, nil
}

func (sender HTTPSender) setRetryData(ctx interfaces.AppFunctionContext, exportData []byte) {
func (sender *HTTPSender) setRetryData(ctx interfaces.AppFunctionContext, exportData []byte) {
if sender.persistOnError {
ctx.SetRetryData(exportData)
}
Expand Down