diff --git a/anubis/src/components/DelaysTableView.vue b/anubis/src/components/DelaysTableView.vue index eb3103c5..6266b4c0 100644 --- a/anubis/src/components/DelaysTableView.vue +++ b/anubis/src/components/DelaysTableView.vue @@ -7,7 +7,7 @@ type="text" placeholder="Search.." /> - + @@ -15,6 +15,7 @@ import BasicTable from "../components/BasicTable.vue"; import GridTable from "../components/GridTable.vue"; import { AppModule } from "../store/modules/app"; +import store from "@/store"; import { ReplayerModule } from "../store/modules/replayer"; import { Component, Vue, Prop } from "vue-property-decorator"; import { Notification } from "../generated/protobuf/models/internal/notifications/notification_pb"; @@ -31,39 +32,39 @@ export default class DelaysTableView extends Vue { protected search!: string; get receivedUpdates(): { [key: string]: any }[] { + return this.$store.state.notificationList.map(this.mapDelayToStringKey); + + /* return AppModule.notifications .slice( AppModule.notifications.length - 51, AppModule.notifications.length - 1 ) .map(mapDelayToStringKey); + + */ } -} -function mapDelayToStringKey( - notification: Notification -): { [key: string]: any } { - let delayStringKey: { [key: string]: any } = { - CeptaStationID: notification.getDelay()?.getStationId(), - CeptaID: notification.getDelay()?.getTransportId(), - Delay: notification - .getDelay() - ?.getDelay() - ?.getDelta(), - Details: notification - .getDelay() - ?.getDelay() - ?.getDetails() - }; + mapDelayToStringKey(notification: Notification): { [key: string]: any } { + let delayStringKey: { [key: string]: any } = { + CeptaStationID: notification.getDelay()?.getStationId(), + CeptaID: notification.getDelay()?.getTransportId(), + Delay: notification + .getDelay() + ?.getDelay() + ?.getDelta(), + Details: notification + .getDelay() + ?.getDelay() + ?.getDetails() + }; - return delayStringKey; + return delayStringKey; + } } diff --git a/core/src/main/java/org/bptlab/cepta/Main.java b/core/src/main/java/org/bptlab/cepta/Main.java index 78cf24c5..7aa2d6ef 100644 --- a/core/src/main/java/org/bptlab/cepta/Main.java +++ b/core/src/main/java/org/bptlab/cepta/Main.java @@ -93,16 +93,16 @@ public class Main implements Callable { private FlinkKafkaProducer011 trainDelayNotificationProducer; private void setupConsumers() { - this.liveTrainDataConsumer = - new FlinkKafkaConsumer011<>( - Topic.LIVE_TRAIN_DATA.getValueDescriptor().getName(), - new GenericBinaryProtoDeserializer(EventOuterClass.Event.class), - new KafkaConfig().withClientId("LiveTrainDataMainConsumer").getProperties()); - this.plannedTrainDataConsumer = - new FlinkKafkaConsumer011<>( - Topic.PLANNED_TRAIN_DATA.getValueDescriptor().getName(), - new GenericBinaryProtoDeserializer(EventOuterClass.Event.class), - new KafkaConfig().withClientId("PlannedTrainDataMainConsumer").getProperties()); + this.liveTrainDataConsumer = + new FlinkKafkaConsumer011<>( + Topic.LIVE_TRAIN_DATA.getValueDescriptor().getName(), + new GenericBinaryProtoDeserializer(EventOuterClass.Event.class), + new KafkaConfig().withClientId("LiveTrainDataMainConsumer").getProperties()); + this.plannedTrainDataConsumer = + new FlinkKafkaConsumer011<>( + Topic.PLANNED_TRAIN_DATA.getValueDescriptor().getName(), + new GenericBinaryProtoDeserializer(EventOuterClass.Event.class), + new KafkaConfig().withClientId("PlannedTrainDataMainConsumer").getProperties()); this.weatherDataConsumer = new FlinkKafkaConsumer011<>( diff --git a/deployment/dev/compose/core.compose.yml b/deployment/dev/compose/core.compose.yml index 410526d5..9de2365c 100644 --- a/deployment/dev/compose/core.compose.yml +++ b/deployment/dev/compose/core.compose.yml @@ -152,12 +152,13 @@ services: REDIS_HOST: redis REDIS_PORT: 6379 LOG: DEBUG + TESTING: "true" ports: - ${CEPTA_NOTIFICATION_WS_PORT}:5000 depends_on: #- flink-jobmanager #- flink-taskmanager - - kafka + #- kafkas #- replayer - redis - usermgmt diff --git a/osiris/notification/server.go b/osiris/notification/server.go index 0e9bc364..59af4b88 100644 --- a/osiris/notification/server.go +++ b/osiris/notification/server.go @@ -3,8 +3,6 @@ package main import ( "context" "fmt" - "github.com/bptlab/cepta/osiris/lib/utils" - "github.com/pkg/errors" "io" "net" "net/http" @@ -15,7 +13,9 @@ import ( "syscall" "time" - "github.com/go-redis/redis" + "github.com/bptlab/cepta/osiris/lib/utils" + "github.com/pkg/errors" + "github.com/bptlab/cepta/ci/versioning" topics "github.com/bptlab/cepta/models/constants/topic" pb "github.com/bptlab/cepta/models/grpc/notification" @@ -25,9 +25,10 @@ import ( "github.com/bptlab/cepta/models/internal/types/result" "github.com/bptlab/cepta/models/internal/types/users" libcli "github.com/bptlab/cepta/osiris/lib/cli" - libredis "github.com/bptlab/cepta/osiris/lib/redis" kafkaconsumer "github.com/bptlab/cepta/osiris/lib/kafka/consumer" + libredis "github.com/bptlab/cepta/osiris/lib/redis" "github.com/bptlab/cepta/osiris/notification/websocket" + "github.com/go-redis/redis" clivalues "github.com/romnnn/flags4urfavecli/values" "github.com/golang/protobuf/proto" @@ -43,11 +44,13 @@ var Version string = "Unknown" // BuildTime will be injected at build time var BuildTime string = "" +var testing bool + var ( - defaultLruSize = 1000 // Cache up to 1000 transports - defaultLruMaxEntryLength = 1000 // Cache up to 1000 subscribers per transport - defaultUserNotificationsBufferSize = int64(200) // Store the most recent 200 notifications for each user (not persistent) - defaultNotificationTopic = topics.Topic_DELAY_NOTIFICATIONS + defaultLruSize = 1000 // Cache up to 1000 transports + defaultLruMaxEntryLength = 1000 // Cache up to 1000 subscribers per transport + defaultUserNotificationsBufferSize = int64(200) // Store the most recent 200 notifications for each user (not persistent) + defaultNotificationTopic = topics.Topic_DELAY_NOTIFICATIONS ) // Endpoint ... @@ -69,7 +72,7 @@ type NotificationServer struct { transportCache *lru.Cache Pool *websocket.Pool - usermgmtConn *grpc.ClientConn + usermgmtConn *grpc.ClientConn usermgmtClient usermgmtpb.UserManagementClient grpcServer *grpc.Server wsServer *http.Server @@ -78,26 +81,26 @@ type NotificationServer struct { kc *kafkaconsumer.Consumer redisConfig libredis.Config - rclient *redis.Client + rclient *redis.Client usermgmtEndpoint Endpoint - LruSize int - LruMaxEntryLength int + LruSize int + LruMaxEntryLength int UserNotificationsBufferSize int64 - NotificationTopic topics.Topic + NotificationTopic topics.Topic } // NewNotificationServer ... func NewNotificationServer(kafkaConfig kafkaconsumer.Config, redisConfig libredis.Config) NotificationServer { return NotificationServer{ kafkacConfig: kafkaConfig, - redisConfig: redisConfig, + redisConfig: redisConfig, // Use defaults - LruSize: defaultLruSize, - LruMaxEntryLength: defaultLruMaxEntryLength, + LruSize: defaultLruSize, + LruMaxEntryLength: defaultLruMaxEntryLength, UserNotificationsBufferSize: defaultUserNotificationsBufferSize, - NotificationTopic: defaultNotificationTopic, + NotificationTopic: defaultNotificationTopic, } } @@ -118,7 +121,7 @@ func (s *NotificationServer) Shutdown() { } if s.wsServer != nil { log.Info("Stopping websocket server") - if err := s.wsServer. Shutdown(context.TODO()); err != nil { + if err := s.wsServer.Shutdown(context.TODO()); err != nil { log.Warnf("Failed to close websocket server cleanly: %v", err) } } @@ -274,18 +277,26 @@ func (s *NotificationServer) serveWebsocket(pool *websocket.Pool, w http.Respons } pool.Register <- client - client.Read() } func (s *NotificationServer) handleKafkaMessages(ctx context.Context) { - noopTicker := time.NewTicker(time.Second * 10) + // Send pings to client with this period. Must be less than pongWait on the client. + const pingPeriod = time.Second * 2 + noopTicker := time.NewTicker(pingPeriod) subscriberDone := make(chan bool, 1) stopSubscriber := make(chan bool, 1) go func() { - defer func() { subscriberDone <- true }() + defer func() { + stopSubscriber <- true + noopTicker.Stop() + }() for { select { - + case <-noopTicker.C: + // Noop, may be used for periodic pings + log.Debug("ping") + s.Pool.Ping <- "ping" + break case msg := <-s.kc.Messages: var notification notificationpb.Notification err := proto.Unmarshal(msg.Value, ¬ification) @@ -295,7 +306,7 @@ func (s *NotificationServer) handleKafkaMessages(ctx context.Context) { log.Debugf("Received notification: %v (%v)", notification, msg.Timestamp) // Set occurrence time as a best effort - if occurred := notification.GetOccurred(); occurred.GetSeconds() + int64(occurred.GetNanos()) < 1 && !msg.Timestamp.IsZero() { + if occurred := notification.GetOccurred(); occurred.GetSeconds()+int64(occurred.GetNanos()) < 1 && !msg.Timestamp.IsZero() { if occurredProto, err := utils.ToProtoTime(msg.Timestamp); err != nil { notification.Occurred = occurredProto } @@ -304,11 +315,13 @@ func (s *NotificationServer) handleKafkaMessages(ctx context.Context) { switch notification.GetNotification().(type) { case *notificationpb.Notification_Delay: // Just for testing porpuses - // s.broadcast(¬ification) - // Users need to be assigned to transports to do this - - if err := s.notifySubscribersForTransport(ctx, notification.GetDelay().GetTransportId(), ¬ification); err != nil { - log.Errorf("Failed to notify subscribers of transport %v: %v", notification.GetDelay().GetTransportId(), err) + if testing { + s.broadcast(¬ification) + } else { + // Users need to be assigned to transports to do this + if err := s.notifySubscribersForTransport(ctx, notification.GetDelay().GetTransportId(), ¬ification); err != nil { + log.Errorf("Failed to notify subscribers of transport %v: %v", notification.GetDelay().GetTransportId(), err) + } } break case *notificationpb.Notification_System: @@ -316,8 +329,6 @@ func (s *NotificationServer) handleKafkaMessages(ctx context.Context) { break } break - case <-noopTicker.C: - // Noop, may be used for periodic pings case <-stopSubscriber: return } @@ -458,6 +469,12 @@ func main() { EnvVars: []string{"NOTIFICATIONS_TOPIC"}, Usage: "topic to subscribe for notifications", }, + &cli.BoolFlag{ + Name: "testing", + Value: false, + EnvVars: []string{"TESTING"}, + Usage: "notification service broadcasts all messages for testing purposes", + }, }...) app := &cli.App{ @@ -472,6 +489,7 @@ func main() { level = log.InfoLevel } log.SetLevel(level) + testing = ctx.Bool("testing") server := NewNotificationServer(kafkaconsumer.Config{}.ParseCli(ctx), libredis.Config{}.ParseCli(ctx)) server.usermgmtEndpoint = Endpoint{Host: ctx.String("usermgmt-host"), Port: ctx.Int("usermgmt-port")} diff --git a/osiris/notification/websocket/client.go b/osiris/notification/websocket/client.go index 2a6efc2e..7dc62bc8 100644 --- a/osiris/notification/websocket/client.go +++ b/osiris/notification/websocket/client.go @@ -2,6 +2,7 @@ package websocket import ( "sync" + "time" pb "github.com/bptlab/cepta/models/grpc/notification" "github.com/bptlab/cepta/models/internal/types/users" @@ -10,57 +11,72 @@ import ( log "github.com/sirupsen/logrus" ) +const pongWait = time.Second * 4 + // Client ... type Client struct { - Conn *websocket.Conn - Pool *Pool - ID *users.UserID - Token string - done chan bool - mu sync.Mutex + Conn *websocket.Conn + Pool *Pool + ID *users.UserID + Token string + done chan bool + mu sync.Mutex } func (c *Client) Read() { - defer func() { - c.Pool.Unregister <- c - _ = c.Conn.Close() - }() + go func() { + defer func() { + c.Pool.Unregister <- c + }() - for { - messageType, message, err := c.Conn.ReadMessage() - if err != nil { - log.Errorf("Error reading message from websocket connection: %v", err) - return - } + // Define pong logic to disconnect to client when unreachable + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + c.Conn.SetPongHandler(func(string) error { + c.Conn.SetReadDeadline(time.Now().Add(pongWait)) + log.Debug("PongHandler") + return nil + }) - switch messageType { - case websocket.BinaryMessage: - // Attempt to decode client message - var clientMessage pb.ClientMessage - err = proto.Unmarshal(message, &clientMessage) + for { + messageType, message, err := c.Conn.ReadMessage() if err != nil { - log.Errorf("unmarshal error: %v", err) + log.Debugf("Error reading message from websocket connection: %v", err) + // return } - switch clientMessage.GetMessage().(type) { - case *pb.ClientMessage_Announcement: - clientID := clientMessage.GetAnnouncement().GetUserId() - clientToken := clientMessage.GetAnnouncement().GetToken() - if clientID == nil || clientID.GetId() == "" || clientToken == "" { - log.Warnf("Received invalid user announcement: %v", clientMessage.GetAnnouncement()) + + switch messageType { + case websocket.BinaryMessage: + // Attempt to decode client message + var clientMessage pb.ClientMessage + err = proto.Unmarshal(message, &clientMessage) + if err != nil { + log.Errorf("unmarshal error: %v", err) + } + switch clientMessage.GetMessage().(type) { + case *pb.ClientMessage_Announcement: + clientID := clientMessage.GetAnnouncement().GetUserId() + clientToken := clientMessage.GetAnnouncement().GetToken() + if clientID == nil || clientID.GetId() == "" || clientToken == "" { + log.Warnf("Received invalid user announcement: %v", clientMessage.GetAnnouncement()) + break + } + // TODO: Check auth! + log.Infof("User registered with ID %s", clientID) + c.ID = clientID + c.Token = clientToken + c.Pool.Login <- c break + default: + log.Warnf("Received client message of unknown type: %v", clientMessage) } - // TODO: Check auth! - log.Infof("User registered with ID %s", clientID) - c.ID = clientID - c.Token = clientToken - c.Pool.Login <- c + break + case -1: + log.Warnf("User is disconnected %v", messageType) + return break default: - log.Warnf("Received client message of unknown type: %v", clientMessage) + log.Warnf("Received non binary websocket message of type %v", messageType) } - break - default: - log.Warnf("Received non binary websocket message of type %v", messageType) } - } + }() } diff --git a/osiris/notification/websocket/pool.go b/osiris/notification/websocket/pool.go index afa35a43..952a57d9 100644 --- a/osiris/notification/websocket/pool.go +++ b/osiris/notification/websocket/pool.go @@ -2,9 +2,10 @@ package websocket import ( "fmt" - "github.com/go-redis/redis" - "github.com/bptlab/cepta/models/internal/types/users" + notificationpb "github.com/bptlab/cepta/models/internal/notifications/notification" + "github.com/bptlab/cepta/models/internal/types/users" + "github.com/go-redis/redis" "github.com/golang/protobuf/proto" "github.com/gorilla/websocket" log "github.com/sirupsen/logrus" @@ -19,9 +20,9 @@ type Notification interface { // UserNotification ... type UserNotification struct { - ID *users.UserID - Score float64 - Msg []byte + ID *users.UserID + Score float64 + Msg []byte Resend bool } @@ -39,8 +40,8 @@ func (un UserNotification) IsResend() bool { // BroadcastNotification ... type BroadcastNotification struct { - Score float64 - Msg []byte + Score float64 + Msg []byte Resend bool } @@ -63,10 +64,11 @@ type Pool struct { Login chan *Client NotifyUser chan UserNotification Broadcast chan BroadcastNotification + Ping chan string Clients map[*Client]bool ClientMapping map[string]*Client - Rclient *redis.Client - BufferSize int64 + Rclient *redis.Client + BufferSize int64 } // NewPool ... @@ -77,9 +79,10 @@ func NewPool(size int64) *Pool { Unregister: make(chan *Client), NotifyUser: make(chan UserNotification, 100), Broadcast: make(chan BroadcastNotification, 100), + Ping: make(chan string), Clients: make(map[*Client]bool), ClientMapping: make(map[string]*Client), - BufferSize: size, + BufferSize: size, } } @@ -177,6 +180,7 @@ func (pool *Pool) Start() { client.done = make(chan bool) pool.Clients[client] = true log.Debugf("connection pool size: %v", len(pool.Clients)) + client.Read() break case client := <-pool.Login: pool.ClientMapping[client.ID.GetId()] = client @@ -191,9 +195,7 @@ func (pool *Pool) Start() { } break case client := <-pool.Unregister: - client.done <- true delete(pool.Clients, client) - log.Infof("connection pool size: %v", len(pool.Clients)) break case notifyUserRequest := <-pool.NotifyUser: if client, ok := pool.ClientMapping[notifyUserRequest.ID.GetId()]; ok { @@ -210,6 +212,7 @@ func (pool *Pool) Start() { } break case broadcastRequest := <-pool.Broadcast: + log.Debugf("pool size is now at %v", len(pool.Clients)) log.Debug("Broadcasting message to all clients in pool") for client := range pool.Clients { if err := client.Conn.WriteMessage(websocket.BinaryMessage, broadcastRequest.Msg); err != nil { @@ -217,6 +220,15 @@ func (pool *Pool) Start() { } } break + case <-pool.Ping: + log.Debugf("connection pool size: %v", len(pool.Clients)) + log.Debug("Broadcasting ping to all clients in pool") + for client := range pool.Clients { + if err := client.Conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil { + log.Debugf("Broadcasting ping to client %v failed: %v", client, err) + } + } + break } } }