-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.go
133 lines (120 loc) · 4.47 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"bytes"
"encoding/json"
"fmt"
"github.com/application-research/delta-db/db_models"
"github.com/application-research/delta-db/messaging"
db2 "github.com/application-research/delta-events-consumer/db"
"github.com/nsqio/go-nsq"
"github.com/spf13/viper"
"log"
"os"
"os/signal"
"syscall"
)
func main() {
// Instantiate a new consumer that subscribes to the specified topic
viper.SetConfigFile(".env")
viper.ReadInConfig()
dbDsn := viper.Get("DBDSN").(string)
db, err := db2.OpenDatabase(dbDsn)
if err != nil {
log.Fatal(err)
}
// Instantiate a new consumer that subscribes to the specified topic
consumer := messaging.NewDeltaMetricsMessageConsumer()
// add a handler to consume messages
consumer.Consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
var baseMessage messaging.DeltaMetricsBaseMessage
json.Unmarshal(message.Body, &baseMessage)
// parse the message body and check the db_models type using reflection
fmt.Println(baseMessage.ObjectType)
fmt.Println(baseMessage.Object)
switch baseMessage.ObjectType {
case "DeltaStartupLogs":
baseString := baseMessage.Object.(map[string]interface{})
var deltaStartupLogs db_models.DeltaStartupLogs
transcode(baseString, &deltaStartupLogs)
db.Create(&deltaStartupLogs)
case "ContentLog":
baseContentLog := baseMessage.Object.(map[string]interface{})
var contentLog db_models.ContentLog
transcode(baseContentLog, &contentLog)
db.Create(&contentLog)
case "ContentDealLog":
baseContentDealLog := baseMessage.Object.(map[string]interface{})
var contentDealLog db_models.ContentDealLog
transcode(baseContentDealLog, &contentDealLog)
db.Create(&contentDealLog)
case "ContentDealProposalLog":
baseContentDealLog := baseMessage.Object.(map[string]interface{})
var contentDealProposalLog db_models.ContentDealProposalLog
transcode(baseContentDealLog, &contentDealProposalLog)
db.Create(&contentDealProposalLog)
case "ContentDealProposalParametersLog":
baseContentDealLog := baseMessage.Object.(map[string]interface{})
var contentDealProposalParametersLog db_models.ContentDealProposalParametersLog
transcode(baseContentDealLog, &contentDealProposalParametersLog)
db.Create(&contentDealProposalParametersLog)
case "ContentMinerLog":
baseContentDealLog := baseMessage.Object.(map[string]interface{})
var contentMinerLog db_models.ContentMinerLog
transcode(baseContentDealLog, &contentMinerLog)
db.Create(&contentMinerLog)
case "ContentWalletLog":
baseContentDealLog := baseMessage.Object.(map[string]interface{})
var contentWalletLog db_models.ContentWalletLog
transcode(baseContentDealLog, &contentWalletLog)
db.Create(&contentWalletLog)
case "PieceCommitmentLog":
baseContentDealLog := baseMessage.Object.(map[string]interface{})
var pieceCommitmentLog db_models.PieceCommitmentLog
transcode(baseContentDealLog, &pieceCommitmentLog)
db.Create(&pieceCommitmentLog)
case "WalletLog":
baseContentDealLog := baseMessage.Object.(map[string]interface{})
var walletLog db_models.WalletLog
transcode(baseContentDealLog, &walletLog)
db.Create(&walletLog)
case "LogEvent":
baseLogEvent := baseMessage.Object.(map[string]interface{})
var logEvent messaging.LogEvent
transcode(baseLogEvent, &logEvent)
db.Create(&logEvent)
case "InstanceMetaLog":
baseInstanceMetaLog := baseMessage.Object.(map[string]interface{})
var instanceMetaLog db_models.InstanceMetaLog
transcode(baseInstanceMetaLog, &instanceMetaLog)
db.Create(&instanceMetaLog)
case "BatchImportLog":
baseInstanceMetaLog := baseMessage.Object.(map[string]interface{})
var batchImportLog db_models.BatchImportLog
transcode(baseInstanceMetaLog, &batchImportLog)
db.Create(&batchImportLog)
case "BatchImportContentLog":
baseInstanceMetaLog := baseMessage.Object.(map[string]interface{})
var batchContentLog db_models.BatchImportContentLog
transcode(baseInstanceMetaLog, &batchContentLog)
db.Create(&batchContentLog)
default:
log.Printf("Unknown message type: %v", baseMessage.ObjectType)
}
return nil
}))
err = consumer.Consumer.ConnectToNSQD(messaging.MetricsTopicUrl)
if err != nil {
log.Fatal(err)
}
// wait for signal to exit
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
<-sigChan
// disconnect
consumer.Consumer.Stop()
}
func transcode(in, out interface{}) {
buf := new(bytes.Buffer)
json.NewEncoder(buf).Encode(in)
json.NewDecoder(buf).Decode(out)
}