From 3d2974289b7f69d2ee6845883796bdb7ee3e232c Mon Sep 17 00:00:00 2001 From: Leonard Petter Date: Thu, 21 May 2020 03:34:03 +0200 Subject: [PATCH 1/8] Minor changes on working with some bugs --- anubis/src/components/DelaysTableView.vue | 1 + anubis/src/utils.ts | 2 +- anubis/src/views/Dashboard.vue | 4 ++++ 3 files changed, 6 insertions(+), 1 deletion(-) diff --git a/anubis/src/components/DelaysTableView.vue b/anubis/src/components/DelaysTableView.vue index eb3103c5..b537dfc4 100644 --- a/anubis/src/components/DelaysTableView.vue +++ b/anubis/src/components/DelaysTableView.vue @@ -63,6 +63,7 @@ function mapDelayToStringKey( From 6f98cd4d414535b5cb64c772fb96249d61cfcd0f Mon Sep 17 00:00:00 2001 From: Leonard Petter Date: Thu, 21 May 2020 03:39:44 +0200 Subject: [PATCH 2/8] Linted --- anubis/src/utils.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/anubis/src/utils.ts b/anubis/src/utils.ts index ba2375a1..28cb7aa7 100644 --- a/anubis/src/utils.ts +++ b/anubis/src/utils.ts @@ -10,7 +10,7 @@ export const timeSincePretty = ( sec: number; msec: number; } => { - let minutes = seconds / 60 ; + let minutes = seconds / 60; let msec = minutes * 60 * 1000; let hh = Math.floor(msec / 1000 / 60 / 60); msec -= hh * 1000 * 60 * 60; From c66e2050de5efdf559254e7a4def8ed0c90522b6 Mon Sep 17 00:00:00 2001 From: Leonard Petter Date: Thu, 21 May 2020 12:46:46 +0200 Subject: [PATCH 3/8] Attempt to fix our issues with websocket connections --- osiris/notification/websocket/client.go | 7 +++++-- osiris/notification/websocket/pool.go | 1 + 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/osiris/notification/websocket/client.go b/osiris/notification/websocket/client.go index 2a6efc2e..5a5d596a 100644 --- a/osiris/notification/websocket/client.go +++ b/osiris/notification/websocket/client.go @@ -29,8 +29,8 @@ func (c *Client) Read() { for { messageType, message, err := c.Conn.ReadMessage() if err != nil { - log.Errorf("Error reading message from websocket connection: %v", err) - return + log.Debugf("Error reading message from websocket connection: %v", err) + // return } switch messageType { @@ -59,6 +59,9 @@ func (c *Client) Read() { log.Warnf("Received client message of unknown type: %v", clientMessage) } break + case -1: + log.Warnf("User is disconnected %v", messageType) + break default: log.Warnf("Received non binary websocket message of type %v", messageType) } diff --git a/osiris/notification/websocket/pool.go b/osiris/notification/websocket/pool.go index afa35a43..2a69999a 100644 --- a/osiris/notification/websocket/pool.go +++ b/osiris/notification/websocket/pool.go @@ -210,6 +210,7 @@ func (pool *Pool) Start() { } break case broadcastRequest := <-pool.Broadcast: + log.Debugf("connection pool size: %v", len(pool.Clients)) log.Debug("Broadcasting message to all clients in pool") for client := range pool.Clients { if err := client.Conn.WriteMessage(websocket.BinaryMessage, broadcastRequest.Msg); err != nil { From 6f2850d91351cb0fd7339c86fc68be116d50fb75 Mon Sep 17 00:00:00 2001 From: Leonard Petter Date: Wed, 27 May 2020 13:15:17 +0200 Subject: [PATCH 4/8] [IMP] Persistent storage of DelayNotifications --- anubis/src/components/DelaysTableView.vue | 30 ++++++++++++------- anubis/src/store/index.ts | 6 ++-- anubis/src/store/modules/app.ts | 6 +++- core/src/main/java/org/bptlab/cepta/Main.java | 30 +++++++++---------- osiris/notification/server.go | 5 ++-- 5 files changed, 44 insertions(+), 33 deletions(-) diff --git a/anubis/src/components/DelaysTableView.vue b/anubis/src/components/DelaysTableView.vue index b537dfc4..7b947b26 100644 --- a/anubis/src/components/DelaysTableView.vue +++ b/anubis/src/components/DelaysTableView.vue @@ -15,6 +15,7 @@ import BasicTable from "../components/BasicTable.vue"; import GridTable from "../components/GridTable.vue"; import { AppModule } from "../store/modules/app"; +import store from "@/store"; import { ReplayerModule } from "../store/modules/replayer"; import { Component, Vue, Prop } from "vue-property-decorator"; import { Notification } from "../generated/protobuf/models/internal/notifications/notification_pb"; @@ -31,33 +32,40 @@ export default class DelaysTableView extends Vue { protected search!: string; get receivedUpdates(): { [key: string]: any }[] { + return this.$store.state.notifications.map(this.mapDelayToStringKey); + + /* return AppModule.notifications .slice( AppModule.notifications.length - 51, AppModule.notifications.length - 1 ) .map(mapDelayToStringKey); + + */ } -} -function mapDelayToStringKey( - notification: Notification -): { [key: string]: any } { - let delayStringKey: { [key: string]: any } = { - CeptaStationID: notification.getDelay()?.getStationId(), - CeptaID: notification.getDelay()?.getTransportId(), - Delay: notification + mapDelayToStringKey( + notification: Notification + ): { [key: string]: any } { + let delayStringKey: { [key: string]: any } = { + CeptaStationID: notification.getDelay()?.getStationId(), + CeptaID: notification.getDelay()?.getTransportId(), + Delay: notification .getDelay() ?.getDelay() ?.getDelta(), - Details: notification + Details: notification .getDelay() ?.getDelay() ?.getDetails() - }; + }; - return delayStringKey; + return delayStringKey; + } } + + diff --git a/anubis/src/components/Sidebar.vue b/anubis/src/components/Sidebar.vue index 57633149..b280d3bb 100644 --- a/anubis/src/components/Sidebar.vue +++ b/anubis/src/components/Sidebar.vue @@ -239,11 +239,10 @@ export default class Sidebar extends Vue { display: inline-block top: 20% height: 60% - padding-right: 18px .logo-text +theme(color, c-logo-text) - padding-left: 10px + padding-left: 23px .mobile-toggle display: none diff --git a/anubis/src/store/index.ts b/anubis/src/store/index.ts index cb6eb375..46b77320 100644 --- a/anubis/src/store/index.ts +++ b/anubis/src/store/index.ts @@ -17,8 +17,19 @@ export interface IRootState { // export default new Vuex.Store({ export default new Vuex.Store({ state: { - notifications: [], + notificationList: Array(), }, mutations: { + addNotification(state, notification: Notification) { + if (state.notificationList.length > 100){ + state.notificationList = state.notificationList.slice(-99, -1); + } + state.notificationList = [notification, ...state.notificationList] + } + }, + actions: { + addNotification(context, notification: Notification) { + context.commit('addNotification', notification); + } } }); diff --git a/anubis/src/store/modules/app.ts b/anubis/src/store/modules/app.ts index bbd0fbef..f75bd8e8 100644 --- a/anubis/src/store/modules/app.ts +++ b/anubis/src/store/modules/app.ts @@ -54,16 +54,6 @@ class App extends VuexModule implements IAppState { this.isLoading = loading; } - @Mutation - public addNotification(notification: Notification) { - console.log(typeof store.state.notifications); - if (store.state.notifications.length > 100){ - store.state.notifications = store.state.notifications.slice(-99, -1); - } - store.state.notifications.push(notification); - // this.notifications.push(notification); - } - @Mutation public toggleTheme() { this.theme = (this.theme + 1) % this.availableThemes.length; diff --git a/anubis/src/store/modules/notifications.ts b/anubis/src/store/modules/notifications.ts index 5d6240e9..2a722fee 100644 --- a/anubis/src/store/modules/notifications.ts +++ b/anubis/src/store/modules/notifications.ts @@ -1,3 +1,4 @@ + import { VuexModule, Module, @@ -48,7 +49,7 @@ class Notifications extends VuexModule implements NotificationsState { let deserializedEvent = Notification.deserializeBinary( new Uint8Array(event.data) ); - AppModule.addNotification(deserializedEvent); + store.dispatch("addNotification", deserializedEvent); } @Mutation diff --git a/anubis/src/views/Dashboard.vue b/anubis/src/views/Dashboard.vue index 56ef32cf..25c5bd9d 100644 --- a/anubis/src/views/Dashboard.vue +++ b/anubis/src/views/Dashboard.vue @@ -50,7 +50,7 @@ layoutStyle="col-md-12" >

You can use the navigation bar to filter

- +
@@ -136,6 +136,8 @@ export default class Dashboard extends Vue {} font-size: 2.5rem - .delays-overview - height: auto + .delay-table + padding-right: 10px + height: 500px + overflow-y: scroll !important diff --git a/deployment/dev/compose/core.compose.yml b/deployment/dev/compose/core.compose.yml index e4534ab8..9de2365c 100644 --- a/deployment/dev/compose/core.compose.yml +++ b/deployment/dev/compose/core.compose.yml @@ -152,6 +152,7 @@ services: REDIS_HOST: redis REDIS_PORT: 6379 LOG: DEBUG + TESTING: "true" ports: - ${CEPTA_NOTIFICATION_WS_PORT}:5000 depends_on: diff --git a/deployment/dev/devenv.sh b/deployment/dev/devenv.sh index f075929b..19e03d9a 100755 --- a/deployment/dev/devenv.sh +++ b/deployment/dev/devenv.sh @@ -11,10 +11,10 @@ if [ -z "$BUILD" ]; then else if [[ "$OSTYPE" == "darwin"* ]]; then # macOS - #bazel run //osiris/usermgmt:build-image --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 + bazel run //osiris/usermgmt:build-image --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 bazel run //osiris/notification:build-image --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 - #bazel run //osiris/auth:build-image --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 - #bazel run //auxiliary/producers/replayer:build-image --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 + bazel run //osiris/auth:build-image --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 + bazel run //auxiliary/producers/replayer:build-image --platforms=@io_bazel_rules_go//go/toolchain:linux_amd64 else # Build local images first bazel run //:build-images diff --git a/osiris/notification/server.go b/osiris/notification/server.go index 9c4d4802..59af4b88 100644 --- a/osiris/notification/server.go +++ b/osiris/notification/server.go @@ -44,6 +44,8 @@ var Version string = "Unknown" // BuildTime will be injected at build time var BuildTime string = "" +var testing bool + var ( defaultLruSize = 1000 // Cache up to 1000 transports defaultLruMaxEntryLength = 1000 // Cache up to 1000 subscribers per transport @@ -313,13 +315,14 @@ func (s *NotificationServer) handleKafkaMessages(ctx context.Context) { switch notification.GetNotification().(type) { case *notificationpb.Notification_Delay: // Just for testing porpuses - s.broadcast(¬ification) - // Users need to be assigned to transports to do this - /* + if testing { + s.broadcast(¬ification) + } else { + // Users need to be assigned to transports to do this if err := s.notifySubscribersForTransport(ctx, notification.GetDelay().GetTransportId(), ¬ification); err != nil { log.Errorf("Failed to notify subscribers of transport %v: %v", notification.GetDelay().GetTransportId(), err) } - */ + } break case *notificationpb.Notification_System: s.broadcast(¬ification) @@ -466,6 +469,12 @@ func main() { EnvVars: []string{"NOTIFICATIONS_TOPIC"}, Usage: "topic to subscribe for notifications", }, + &cli.BoolFlag{ + Name: "testing", + Value: false, + EnvVars: []string{"TESTING"}, + Usage: "notification service broadcasts all messages for testing purposes", + }, }...) app := &cli.App{ @@ -480,6 +489,7 @@ func main() { level = log.InfoLevel } log.SetLevel(level) + testing = ctx.Bool("testing") server := NewNotificationServer(kafkaconsumer.Config{}.ParseCli(ctx), libredis.Config{}.ParseCli(ctx)) server.usermgmtEndpoint = Endpoint{Host: ctx.String("usermgmt-host"), Port: ctx.Int("usermgmt-port")} diff --git a/osiris/notification/websocket/client.go b/osiris/notification/websocket/client.go index b7cc9e28..7dc62bc8 100644 --- a/osiris/notification/websocket/client.go +++ b/osiris/notification/websocket/client.go @@ -24,59 +24,59 @@ type Client struct { } func (c *Client) Read() { - defer func() { - c.Pool.Unregister <- c - // _ = c.Conn.Close() - }() + go func() { + defer func() { + c.Pool.Unregister <- c + }() - // Define pong logic to disconnect to client when unreachable - c.Conn.SetReadDeadline(time.Now().Add(pongWait)) - c.Conn.SetPongHandler(func(string) error { + // Define pong logic to disconnect to client when unreachable c.Conn.SetReadDeadline(time.Now().Add(pongWait)) - log.Debug("PongHandler") - return nil - }) - - for { - messageType, message, err := c.Conn.ReadMessage() - if err != nil { - log.Debugf("Error reading message from websocket connection: %v", err) - // return - } + c.Conn.SetPongHandler(func(string) error { + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + log.Debug("PongHandler") + return nil + }) - switch messageType { - case websocket.BinaryMessage: - // Attempt to decode client message - var clientMessage pb.ClientMessage - err = proto.Unmarshal(message, &clientMessage) + for { + messageType, message, err := c.Conn.ReadMessage() if err != nil { - log.Errorf("unmarshal error: %v", err) + log.Debugf("Error reading message from websocket connection: %v", err) + // return } - switch clientMessage.GetMessage().(type) { - case *pb.ClientMessage_Announcement: - clientID := clientMessage.GetAnnouncement().GetUserId() - clientToken := clientMessage.GetAnnouncement().GetToken() - if clientID == nil || clientID.GetId() == "" || clientToken == "" { - log.Warnf("Received invalid user announcement: %v", clientMessage.GetAnnouncement()) + + switch messageType { + case websocket.BinaryMessage: + // Attempt to decode client message + var clientMessage pb.ClientMessage + err = proto.Unmarshal(message, &clientMessage) + if err != nil { + log.Errorf("unmarshal error: %v", err) + } + switch clientMessage.GetMessage().(type) { + case *pb.ClientMessage_Announcement: + clientID := clientMessage.GetAnnouncement().GetUserId() + clientToken := clientMessage.GetAnnouncement().GetToken() + if clientID == nil || clientID.GetId() == "" || clientToken == "" { + log.Warnf("Received invalid user announcement: %v", clientMessage.GetAnnouncement()) + break + } + // TODO: Check auth! + log.Infof("User registered with ID %s", clientID) + c.ID = clientID + c.Token = clientToken + c.Pool.Login <- c break + default: + log.Warnf("Received client message of unknown type: %v", clientMessage) } - // TODO: Check auth! - log.Infof("User registered with ID %s", clientID) - c.ID = clientID - c.Token = clientToken - c.Pool.Login <- c + break + case -1: + log.Warnf("User is disconnected %v", messageType) + return break default: - log.Warnf("Received client message of unknown type: %v", clientMessage) + log.Warnf("Received non binary websocket message of type %v", messageType) } - break - case -1: - log.Warnf("User is disconnected %v", messageType) - delete(c.Pool.Clients, c) - return - break - default: - log.Warnf("Received non binary websocket message of type %v", messageType) } - } + }() } diff --git a/osiris/notification/websocket/pool.go b/osiris/notification/websocket/pool.go index b89ee08d..952a57d9 100644 --- a/osiris/notification/websocket/pool.go +++ b/osiris/notification/websocket/pool.go @@ -180,7 +180,7 @@ func (pool *Pool) Start() { client.done = make(chan bool) pool.Clients[client] = true log.Debugf("connection pool size: %v", len(pool.Clients)) - go client.Read() + client.Read() break case client := <-pool.Login: pool.ClientMapping[client.ID.GetId()] = client @@ -195,9 +195,7 @@ func (pool *Pool) Start() { } break case client := <-pool.Unregister: - client.done <- true delete(pool.Clients, client) - log.Fatalf("connection pool size: %v", len(pool.Clients)) break case notifyUserRequest := <-pool.NotifyUser: if client, ok := pool.ClientMapping[notifyUserRequest.ID.GetId()]; ok { From dda8c4f31b1764103376dc53fc26e889b0e491d3 Mon Sep 17 00:00:00 2001 From: Leonard Petter Date: Thu, 4 Jun 2020 03:45:10 +0200 Subject: [PATCH 8/8] Linted --- anubis/src/components/DelaysTableView.vue | 21 ++++++++------------- anubis/src/store/index.ts | 8 ++++---- anubis/src/store/modules/notifications.ts | 1 - anubis/src/views/Dashboard.vue | 2 +- 4 files changed, 13 insertions(+), 19 deletions(-) diff --git a/anubis/src/components/DelaysTableView.vue b/anubis/src/components/DelaysTableView.vue index af79afd3..6266b4c0 100644 --- a/anubis/src/components/DelaysTableView.vue +++ b/anubis/src/components/DelaysTableView.vue @@ -45,32 +45,27 @@ export default class DelaysTableView extends Vue { */ } - mapDelayToStringKey( - notification: Notification - ): { [key: string]: any } { + mapDelayToStringKey(notification: Notification): { [key: string]: any } { let delayStringKey: { [key: string]: any } = { CeptaStationID: notification.getDelay()?.getStationId(), CeptaID: notification.getDelay()?.getTransportId(), Delay: notification - .getDelay() - ?.getDelay() - ?.getDelta(), + .getDelay() + ?.getDelay() + ?.getDelta(), Details: notification - .getDelay() - ?.getDelay() - ?.getDetails() + .getDelay() + ?.getDelay() + ?.getDetails() }; return delayStringKey; } } - - diff --git a/anubis/src/store/index.ts b/anubis/src/store/index.ts index 46b77320..dde99543 100644 --- a/anubis/src/store/index.ts +++ b/anubis/src/store/index.ts @@ -17,19 +17,19 @@ export interface IRootState { // export default new Vuex.Store({ export default new Vuex.Store({ state: { - notificationList: Array(), + notificationList: Array() }, mutations: { addNotification(state, notification: Notification) { - if (state.notificationList.length > 100){ + if (state.notificationList.length > 100) { state.notificationList = state.notificationList.slice(-99, -1); } - state.notificationList = [notification, ...state.notificationList] + state.notificationList = [notification, ...state.notificationList]; } }, actions: { addNotification(context, notification: Notification) { - context.commit('addNotification', notification); + context.commit("addNotification", notification); } } }); diff --git a/anubis/src/store/modules/notifications.ts b/anubis/src/store/modules/notifications.ts index 2a722fee..2c6b9643 100644 --- a/anubis/src/store/modules/notifications.ts +++ b/anubis/src/store/modules/notifications.ts @@ -1,4 +1,3 @@ - import { VuexModule, Module, diff --git a/anubis/src/views/Dashboard.vue b/anubis/src/views/Dashboard.vue index 25c5bd9d..c9f8b970 100644 --- a/anubis/src/views/Dashboard.vue +++ b/anubis/src/views/Dashboard.vue @@ -50,7 +50,7 @@ layoutStyle="col-md-12" >

You can use the navigation bar to filter

- +