Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

277 pattern in redbull trains #278

Open
wants to merge 9 commits into
base: dev
Choose a base branch
from
43 changes: 22 additions & 21 deletions anubis/src/components/DelaysTableView.vue
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
type="text"
placeholder="Search.."
/>
<grid-table :grid-data="receivedUpdates"></grid-table>
<grid-table id="table" :grid-data="receivedUpdates"></grid-table>
</div>
</template>

<script lang="ts">
import BasicTable from "../components/BasicTable.vue";
import GridTable from "../components/GridTable.vue";
import { AppModule } from "../store/modules/app";
import store from "@/store";
import { ReplayerModule } from "../store/modules/replayer";
import { Component, Vue, Prop } from "vue-property-decorator";
import { Notification } from "../generated/protobuf/models/internal/notifications/notification_pb";
Expand All @@ -31,39 +32,39 @@ export default class DelaysTableView extends Vue {
protected search!: string;

get receivedUpdates(): { [key: string]: any }[] {
return this.$store.state.notificationList.map(this.mapDelayToStringKey);

/*
return AppModule.notifications
.slice(
AppModule.notifications.length - 51,
AppModule.notifications.length - 1
)
.map(mapDelayToStringKey);

*/
}
}

function mapDelayToStringKey(
notification: Notification
): { [key: string]: any } {
let delayStringKey: { [key: string]: any } = {
CeptaStationID: notification.getDelay()?.getStationId(),
CeptaID: notification.getDelay()?.getTransportId(),
Delay: notification
.getDelay()
?.getDelay()
?.getDelta(),
Details: notification
.getDelay()
?.getDelay()
?.getDetails()
};
mapDelayToStringKey(notification: Notification): { [key: string]: any } {
let delayStringKey: { [key: string]: any } = {
CeptaStationID: notification.getDelay()?.getStationId(),
CeptaID: notification.getDelay()?.getTransportId(),
Delay: notification
.getDelay()
?.getDelay()
?.getDelta(),
Details: notification
.getDelay()
?.getDelay()
?.getDetails()
};

return delayStringKey;
return delayStringKey;
}
}
</script>

<style scoped lang="sass">
#mainContent
padding: 0

.form-control
width: 100%
margin-bottom: 20px
Expand Down
3 changes: 1 addition & 2 deletions anubis/src/components/Sidebar.vue
Original file line number Diff line number Diff line change
Expand Up @@ -239,11 +239,10 @@ export default class Sidebar extends Vue {
display: inline-block
top: 20%
height: 60%
padding-right: 18px

.logo-text
+theme(color, c-logo-text)
padding-left: 10px
padding-left: 23px

.mobile-toggle
display: none
Expand Down
15 changes: 12 additions & 3 deletions anubis/src/store/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Vue from "vue";
import Vuex, { StoreOptions } from "vuex";
import { Notification } from "../generated/protobuf/models/internal/notifications/notification_pb";
import { IAppState } from "./modules/app";
import { IAuthState } from "./modules/auth";
import { IReplayerState } from "./modules/replayer";
Expand All @@ -16,11 +17,19 @@ export interface IRootState {
// export default new Vuex.Store<IRootState>({
export default new Vuex.Store({
state: {
websocket: ""
notificationList: Array<Notification>()
},
mutations: {
setWebsocket(state: { websocket: String }, websocket: String) {
state.websocket = websocket;
addNotification(state, notification: Notification) {
if (state.notificationList.length > 100) {
state.notificationList = state.notificationList.slice(-99, -1);
}
state.notificationList = [notification, ...state.notificationList];
}
},
actions: {
addNotification(context, notification: Notification) {
context.commit("addNotification", notification);
}
}
});
5 changes: 0 additions & 5 deletions anubis/src/store/modules/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ class App extends VuexModule implements IAppState {
this.isLoading = loading;
}

@Mutation
public addNotification(notification: Notification) {
this.notifications.push(notification);
}

@Mutation
public toggleTheme() {
this.theme = (this.theme + 1) % this.availableThemes.length;
Expand Down
2 changes: 1 addition & 1 deletion anubis/src/store/modules/notifications.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class Notifications extends VuexModule implements NotificationsState {
let deserializedEvent = Notification.deserializeBinary(
new Uint8Array(event.data)
);
AppModule.addNotification(deserializedEvent);
store.dispatch("addNotification", deserializedEvent);
}

@Mutation
Expand Down
8 changes: 7 additions & 1 deletion anubis/src/views/Dashboard.vue
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
layoutStyle="col-md-12"
>
<p>You can use the navigation bar to filter</p>
<delays-table-view></delays-table-view>
<delays-table-view class="delay-table"></delays-table-view>
<div class="view-all">
<router-link :to="{ name: 'map' }">
<div class="btn">
Expand Down Expand Up @@ -134,4 +134,10 @@ export default class Dashboard extends Vue {}

&.metric-value
font-size: 2.5rem


.delay-table
padding-right: 10px
height: 500px
overflow-y: scroll !important
</style>
20 changes: 10 additions & 10 deletions core/src/main/java/org/bptlab/cepta/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ public class Main implements Callable<Integer> {
private FlinkKafkaProducer011<NotificationOuterClass.Notification> trainDelayNotificationProducer;

private void setupConsumers() {
this.liveTrainDataConsumer =
new FlinkKafkaConsumer011<>(
Topic.LIVE_TRAIN_DATA.getValueDescriptor().getName(),
new GenericBinaryProtoDeserializer<EventOuterClass.Event>(EventOuterClass.Event.class),
new KafkaConfig().withClientId("LiveTrainDataMainConsumer").getProperties());
this.plannedTrainDataConsumer =
new FlinkKafkaConsumer011<>(
Topic.PLANNED_TRAIN_DATA.getValueDescriptor().getName(),
new GenericBinaryProtoDeserializer<EventOuterClass.Event>(EventOuterClass.Event.class),
new KafkaConfig().withClientId("PlannedTrainDataMainConsumer").getProperties());
this.liveTrainDataConsumer =
new FlinkKafkaConsumer011<>(
Topic.LIVE_TRAIN_DATA.getValueDescriptor().getName(),
new GenericBinaryProtoDeserializer<EventOuterClass.Event>(EventOuterClass.Event.class),
new KafkaConfig().withClientId("LiveTrainDataMainConsumer").getProperties());
this.plannedTrainDataConsumer =
new FlinkKafkaConsumer011<>(
Topic.PLANNED_TRAIN_DATA.getValueDescriptor().getName(),
new GenericBinaryProtoDeserializer<EventOuterClass.Event>(EventOuterClass.Event.class),
new KafkaConfig().withClientId("PlannedTrainDataMainConsumer").getProperties());

this.weatherDataConsumer =
new FlinkKafkaConsumer011<>(
Expand Down
3 changes: 2 additions & 1 deletion deployment/dev/compose/core.compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,13 @@ services:
REDIS_HOST: redis
REDIS_PORT: 6379
LOG: DEBUG
TESTING: "true"
ports:
- ${CEPTA_NOTIFICATION_WS_PORT}:5000
depends_on:
#- flink-jobmanager
#- flink-taskmanager
- kafka
#- kafkas
#- replayer
- redis
- usermgmt
Expand Down
78 changes: 48 additions & 30 deletions osiris/notification/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package main
import (
"context"
"fmt"
"github.com/bptlab/cepta/osiris/lib/utils"
"github.com/pkg/errors"
"io"
"net"
"net/http"
Expand All @@ -15,7 +13,9 @@ import (
"syscall"
"time"

"github.com/go-redis/redis"
"github.com/bptlab/cepta/osiris/lib/utils"
"github.com/pkg/errors"

"github.com/bptlab/cepta/ci/versioning"
topics "github.com/bptlab/cepta/models/constants/topic"
pb "github.com/bptlab/cepta/models/grpc/notification"
Expand All @@ -25,9 +25,10 @@ import (
"github.com/bptlab/cepta/models/internal/types/result"
"github.com/bptlab/cepta/models/internal/types/users"
libcli "github.com/bptlab/cepta/osiris/lib/cli"
libredis "github.com/bptlab/cepta/osiris/lib/redis"
kafkaconsumer "github.com/bptlab/cepta/osiris/lib/kafka/consumer"
libredis "github.com/bptlab/cepta/osiris/lib/redis"
"github.com/bptlab/cepta/osiris/notification/websocket"
"github.com/go-redis/redis"
clivalues "github.com/romnnn/flags4urfavecli/values"

"github.com/golang/protobuf/proto"
Expand All @@ -43,11 +44,13 @@ var Version string = "Unknown"
// BuildTime will be injected at build time
var BuildTime string = ""

var testing bool

var (
defaultLruSize = 1000 // Cache up to 1000 transports
defaultLruMaxEntryLength = 1000 // Cache up to 1000 subscribers per transport
defaultUserNotificationsBufferSize = int64(200) // Store the most recent 200 notifications for each user (not persistent)
defaultNotificationTopic = topics.Topic_DELAY_NOTIFICATIONS
defaultLruSize = 1000 // Cache up to 1000 transports
defaultLruMaxEntryLength = 1000 // Cache up to 1000 subscribers per transport
defaultUserNotificationsBufferSize = int64(200) // Store the most recent 200 notifications for each user (not persistent)
defaultNotificationTopic = topics.Topic_DELAY_NOTIFICATIONS
)

// Endpoint ...
Expand All @@ -69,7 +72,7 @@ type NotificationServer struct {
transportCache *lru.Cache
Pool *websocket.Pool

usermgmtConn *grpc.ClientConn
usermgmtConn *grpc.ClientConn
usermgmtClient usermgmtpb.UserManagementClient
grpcServer *grpc.Server
wsServer *http.Server
Expand All @@ -78,26 +81,26 @@ type NotificationServer struct {
kc *kafkaconsumer.Consumer

redisConfig libredis.Config
rclient *redis.Client
rclient *redis.Client

usermgmtEndpoint Endpoint

LruSize int
LruMaxEntryLength int
LruSize int
LruMaxEntryLength int
UserNotificationsBufferSize int64
NotificationTopic topics.Topic
NotificationTopic topics.Topic
}

// NewNotificationServer ...
func NewNotificationServer(kafkaConfig kafkaconsumer.Config, redisConfig libredis.Config) NotificationServer {
return NotificationServer{
kafkacConfig: kafkaConfig,
redisConfig: redisConfig,
redisConfig: redisConfig,
// Use defaults
LruSize: defaultLruSize,
LruMaxEntryLength: defaultLruMaxEntryLength,
LruSize: defaultLruSize,
LruMaxEntryLength: defaultLruMaxEntryLength,
UserNotificationsBufferSize: defaultUserNotificationsBufferSize,
NotificationTopic: defaultNotificationTopic,
NotificationTopic: defaultNotificationTopic,
}
}

Expand All @@ -118,7 +121,7 @@ func (s *NotificationServer) Shutdown() {
}
if s.wsServer != nil {
log.Info("Stopping websocket server")
if err := s.wsServer. Shutdown(context.TODO()); err != nil {
if err := s.wsServer.Shutdown(context.TODO()); err != nil {
log.Warnf("Failed to close websocket server cleanly: %v", err)
}
}
Expand Down Expand Up @@ -274,18 +277,26 @@ func (s *NotificationServer) serveWebsocket(pool *websocket.Pool, w http.Respons
}

pool.Register <- client
client.Read()
}

func (s *NotificationServer) handleKafkaMessages(ctx context.Context) {
noopTicker := time.NewTicker(time.Second * 10)
// Send pings to client with this period. Must be less than pongWait on the client.
const pingPeriod = time.Second * 2
noopTicker := time.NewTicker(pingPeriod)
subscriberDone := make(chan bool, 1)
stopSubscriber := make(chan bool, 1)
go func() {
defer func() { subscriberDone <- true }()
defer func() {
stopSubscriber <- true
noopTicker.Stop()
}()
for {
select {

case <-noopTicker.C:
// Noop, may be used for periodic pings
log.Debug("ping")
s.Pool.Ping <- "ping"
break
case msg := <-s.kc.Messages:
var notification notificationpb.Notification
err := proto.Unmarshal(msg.Value, &notification)
Expand All @@ -295,7 +306,7 @@ func (s *NotificationServer) handleKafkaMessages(ctx context.Context) {
log.Debugf("Received notification: %v (%v)", notification, msg.Timestamp)

// Set occurrence time as a best effort
if occurred := notification.GetOccurred(); occurred.GetSeconds() + int64(occurred.GetNanos()) < 1 && !msg.Timestamp.IsZero() {
if occurred := notification.GetOccurred(); occurred.GetSeconds()+int64(occurred.GetNanos()) < 1 && !msg.Timestamp.IsZero() {
if occurredProto, err := utils.ToProtoTime(msg.Timestamp); err != nil {
notification.Occurred = occurredProto
}
Expand All @@ -304,20 +315,20 @@ func (s *NotificationServer) handleKafkaMessages(ctx context.Context) {
switch notification.GetNotification().(type) {
case *notificationpb.Notification_Delay:
// Just for testing porpuses
// s.broadcast(&notification)
// Users need to be assigned to transports to do this

if err := s.notifySubscribersForTransport(ctx, notification.GetDelay().GetTransportId(), &notification); err != nil {
log.Errorf("Failed to notify subscribers of transport %v: %v", notification.GetDelay().GetTransportId(), err)
if testing {
s.broadcast(&notification)
} else {
// Users need to be assigned to transports to do this
if err := s.notifySubscribersForTransport(ctx, notification.GetDelay().GetTransportId(), &notification); err != nil {
log.Errorf("Failed to notify subscribers of transport %v: %v", notification.GetDelay().GetTransportId(), err)
}
}
break
case *notificationpb.Notification_System:
s.broadcast(&notification)
break
}
break
case <-noopTicker.C:
// Noop, may be used for periodic pings
case <-stopSubscriber:
return
}
Expand Down Expand Up @@ -458,6 +469,12 @@ func main() {
EnvVars: []string{"NOTIFICATIONS_TOPIC"},
Usage: "topic to subscribe for notifications",
},
&cli.BoolFlag{
Name: "testing",
Value: false,
EnvVars: []string{"TESTING"},
Usage: "notification service broadcasts all messages for testing purposes",
},
}...)

app := &cli.App{
Expand All @@ -472,6 +489,7 @@ func main() {
level = log.InfoLevel
}
log.SetLevel(level)
testing = ctx.Bool("testing")

server := NewNotificationServer(kafkaconsumer.Config{}.ParseCli(ctx), libredis.Config{}.ParseCli(ctx))
server.usermgmtEndpoint = Endpoint{Host: ctx.String("usermgmt-host"), Port: ctx.Int("usermgmt-port")}
Expand Down
Loading