diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c45acca --- /dev/null +++ b/.gitignore @@ -0,0 +1,62 @@ +# Created by .ignore support plugin (hsz.mobi) +### Go template +# Binaries for programs and plugins +*.exe +*.dll +*.so +*.dylib + +# Test binary, build with `go test -c` +*.test + +# Output of the go coverage tool, specifically when used with LiteIDE +*.out + +# Project-local glide cache, RE: https://github.com/Masterminds/glide/issues/736 +.glide/ +/vendor/ + +### JetBrains template +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and Webstorm +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff: +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/dictionaries + +# Sensitive or high-churn files: +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.xml +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml + +# Gradle: +.idea/**/gradle.xml +.idea/**/libraries + +# Mongo Explorer plugin: +.idea/**/mongoSettings.xml + +## File-based project format: +*.iws + +## Plugin-specific files: + +# IntelliJ +/out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties diff --git a/Makefile b/Makefile index dec9711..76f4a12 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ appname := kura-sowa sources := $(wildcard *.go) -build = GOOS=$(1) GOARCH=$(2) go build -o build/$(appname)$(3) +build = GOOS=$(1) GOARCH=$(2) go build -o build/$(1)/$(2)/$(appname)$(3) tar = cd build && tar -cvzf $(1)_$(2).tar.gz $(appname)$(3) && rm $(appname)$(3) zip = cd build && zip $(1)_$(2).zip $(appname)$(3) && rm $(appname)$(3) @@ -10,42 +10,46 @@ zip = cd build && zip $(1)_$(2).zip $(appname)$(3) && rm $(appname)$(3) all: windows darwin linux +install: + go install -v clean: rm -rf build/ ##### LINUX BUILDS ##### -linux: build/linux_arm.tar.gz build/linux_arm64.tar.gz build/linux_386.tar.gz build/linux_amd64.tar.gz +linux: linux-arm linux-arm64 linux-386 linux-amd64 -build/linux_386.tar.gz: $(sources) +linux-386: $(sources) $(call build,linux,386,) - $(call tar,linux,386) + # $(call tar,linux,386) -build/linux_amd64.tar.gz: $(sources) +linux-amd64: $(sources) $(call build,linux,amd64,) - $(call tar,linux,amd64) + # $(call tar,linux,amd64) -build/linux_arm.tar.gz: $(sources) +linux-arm: $(sources) $(call build,linux,arm,) - $(call tar,linux,arm) + # $(call tar,linux,arm) -build/linux_arm64.tar.gz: $(sources) +linux-arm64: $(sources) $(call build,linux,arm64,) - $(call tar,linux,arm64) + # $(call tar,linux,arm64) ##### DARWIN (MAC) BUILDS ##### -darwin: build/darwin_amd64.tar.gz +darwin: $(sources) + $(call build,darwin,amd64,) + # $(call tar,darwin,amd64) build/darwin_amd64.tar.gz: $(sources) $(call build,darwin,amd64,) - $(call tar,darwin,amd64) + # $(call tar,darwin,amd64) ##### WINDOWS BUILDS ##### -windows: build/windows_386.zip build/windows_amd64.zip +windows: windows-386 windows-amd64 -build/windows_386.zip: $(sources) +windows-386: $(sources) $(call build,windows,386,.exe) - $(call zip,windows,386,.exe) + # $(call zip,windows,386,.exe) -build/windows_amd64.zip: $(sources) +windows-amd64: $(sources) $(call build,windows,amd64,.exe) - $(call zip,windows,amd64,.exe) \ No newline at end of file + # $(call zip,windows,amd64,.exe) \ No newline at end of file diff --git a/main.go b/main.go index 4786ae3..aeb8798 100644 --- a/main.go +++ b/main.go @@ -7,7 +7,7 @@ import ( "flag" //"github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" - "github.com/prometheus/log" + log "github.com/sirupsen/logrus" "io/ioutil" "os" "os/signal" @@ -16,66 +16,89 @@ import ( "time" MQTT "github.com/eclipse/paho.mqtt.golang" - pb "github.com/federicobaldo/go-kura/kuradatatypes" - "strings" + kpb "github.com/ccustine/kura-sowa/kuradatatypes" ) func getPayload(payloadBytes []byte) ([]byte, error) { log.Debugf("Maybe this is compressed...") gzipReader, err := gzip.NewReader(bytes.NewReader(payloadBytes)) if err != nil { - log.Debugf("Not gzipped: %v", err) //Not a gzip payload + log.Infof("Not gzipped: %v", err) //Not a gzip payload return payloadBytes, nil } bytesArray, err := ioutil.ReadAll(gzipReader) log.Debugf("Read %v bytes.", len(bytesArray)) if err != nil { - log.Infof("Maybe it is not compressed...") + log.Infof("Maybe it is not compressed after all...") bytesArray = payloadBytes } - + log.Debugf("gzipped Kura Payload...") return bytesArray, nil } func onMessageReceived(client MQTT.Client, message MQTT.Message) { bytesArray, err := getPayload(message.Payload()) + ctxLogger := log.WithFields(log.Fields{ + //"id": message.MessageID(), + "topic": message.Topic(), + }) + if err != nil { - log.Fatal("Unable to unmarshal payload") + ctxLogger.Fatal("Unable to unmarshal payload") } - kuraPayload := &pb.KuraPayload{} + kuraPayload := &kpb.KuraPayload{} err = proto.Unmarshal(bytesArray, kuraPayload) if err != nil { - log.Errorf("%v", err) - log.Errorf("Not a valid Kura message: %s\nMessage: %s\n", message.Topic(), message.Payload()) + ctxLogger.Errorf("%v", err) + ctxLogger.Errorf("Not a valid Kura message: %s\nMessage: %s\n", message.Topic(), message.Payload()) return } //marshaler := &jsonpb.Marshaler{} //jsonString, _ := marshaler.MarshalToString(kuraPayload) - log.Infof("Sensor ID: %s", message.Topic()[strings.LastIndex(message.Topic(), "/") + 1: ]) + //tm := time.Unix(0, kuraPayload.GetTimestamp() * int64(time.Millisecond)) + //ctxLogger.Infof("Full Topic: %s", message.Topic()) + //ctxLogger.Infof("Sensor ID: %s - Timestamp: %s", message.Topic()[strings.LastIndex(message.Topic(), "/") + 1: ], tm.Local()) + fields := log.Fields{} for _, metric := range kuraPayload.Metric { switch metric.GetType() { - case pb.KuraPayload_KuraMetric_INT32: - log.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetIntValue()) - case pb.KuraPayload_KuraMetric_INT64: - log.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetLongValue()) - case pb.KuraPayload_KuraMetric_BOOL: - log.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetBoolValue()) - case pb.KuraPayload_KuraMetric_DOUBLE: - log.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetDoubleValue()) - case pb.KuraPayload_KuraMetric_FLOAT: - log.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetFloatValue()) - case pb.KuraPayload_KuraMetric_BYTES: - log.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetBoolValue()) - case pb.KuraPayload_KuraMetric_STRING: - log.Infof("\t%s (%s): %s", metric.GetName(), metric.GetType(), metric.GetStringValue()) + case kpb.KuraPayload_KuraMetric_INT32: + fields[metric.GetName()] = metric.GetIntValue() + //ctxLogger.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetIntValue()) + case kpb.KuraPayload_KuraMetric_INT64: + fields[metric.GetName()] = metric.GetLongValue() + //ctxLogger.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetLongValue()) + case kpb.KuraPayload_KuraMetric_BOOL: + fields[metric.GetName()] = metric.GetBoolValue() + //ctxLogger.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetBoolValue()) + case kpb.KuraPayload_KuraMetric_DOUBLE: + fields[metric.GetName()] = metric.GetDoubleValue() + /* + if (metric.GetName() == "timestamp") { + fields["metric_timestamp"] = time.Unix(0, int64(time.Millisecond) * int64(metric.GetDoubleValue())) + } + */ + //ctxLogger.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetDoubleValue()) + case kpb.KuraPayload_KuraMetric_FLOAT: + fields[metric.GetName()] = metric.GetFloatValue() + //ctxLogger.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetFloatValue()) + case kpb.KuraPayload_KuraMetric_BYTES: + fields[metric.GetName()] = metric.GetBytesValue() + //ctxLogger.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetBoolValue()) + case kpb.KuraPayload_KuraMetric_STRING: + fields[metric.GetName()] = metric.GetStringValue() + //ctxLogger.Infof("\t%s (%s): %s", metric.GetName(), metric.GetType(), metric.GetStringValue()) default: - log.Infof("\t%s (%s): %s", metric.GetName(), metric.GetType(), metric.GetStringValue()) + //ctxLogger.Infof("\t%s (%s): %v", metric.GetName(), metric.GetType(), metric.GetStringValue()) } } + fields["payload_timestamp"] = time.Unix(0, kuraPayload.GetTimestamp() * int64(time.Millisecond)) + + ctxLogger.WithFields(fields).Infof("Kura Metric Payload:") + } var i int @@ -83,14 +106,7 @@ var i int func main() { //MQTT.DEBUG = log.New(os.Stdout, "", 0) //MQTT.ERROR = log.New(os.Stdout, "", 0) - c := make(chan os.Signal, 1) i = 0 - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - log.Infof("signal received, exiting") - os.Exit(0) - }() hostname, _ := os.Hostname() @@ -112,11 +128,17 @@ func main() { TLSConfig: tls.Config{InsecureSkipVerify: true, ClientAuth: tls.NoClientCert}, } connOpts.AddBroker(*server) - connOpts.OnConnect = func(c MQTT.Client) { + connOpts.SetAutoReconnect(true) + connOpts.SetConnectTimeout(5 * time.Second) + connOpts.SetConnectionLostHandler(func(c MQTT.Client, err error) { + log.Warnf("Connection has been lost: %s", err) + }) + connOpts.SetOnConnectHandler(func(c MQTT.Client) { + log.Info("Connected...") if token := c.Subscribe(*topic, byte(*qos), onMessageReceived); token.Wait() && token.Error() != nil { log.Fatalf("%v", token.Error()) } - } + }) client := MQTT.NewClient(connOpts) if token := client.Connect(); token.Wait() && token.Error() != nil { @@ -125,7 +147,12 @@ func main() { log.Infof("Connected to %s\n", *server) } - for { - time.Sleep(1 * time.Second) - } + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + <-c + log.Infof("signal received, exiting") + client.Disconnect(5000) + os.Exit(0) + + }