287 connect all operators #292

Merged 23 commits on May 26, 2020
23 commits
7931efd
[REF] Changed Delay from Double to Long
skydivin4ng3l May 16, 2020
3a57df4
Merge remote-tracking branch 'origin/277-pattern-redbull' into 287-co…
skydivin4ng3l May 16, 2020
b1c1044
[FIX] Bazel Synch: Commented Out WIP Code
skydivin4ng3l May 16, 2020
e53bd11
[FEAT] Optional Index creation for DataToMongo Operator
skydivin4ng3l May 16, 2020
3b9fb94
Merge branch '261-Postgres-Correlation' into 287-connect-all-operators
skydivin4ng3l May 19, 2020
36fbb1d
[REF] WeatherJoin now uses NotificationHelper
skydivin4ng3l May 19, 2020
3a6484a
[FEAT] Implementation of WeatherLocationCorrelationMongoFunction
skydivin4ng3l May 19, 2020
237d6ad
[FIX] Configured WeatherData for correct replay
skydivin4ng3l May 19, 2020
d3d94aa
[FEAT] Adds Apache.commons.text for CaseUtil
skydivin4ng3l May 19, 2020
84bd12c
[CHANGE] ProtoToBson attribute names: lower_underscore -> lowerCamelCase
skydivin4ng3l May 19, 2020
539955c
[FEAT] Adds IndexContainers
skydivin4ng3l May 19, 2020
69aaa50
[FIX] Changes stationId from Integer to Long in <weather,stationId>
skydivin4ng3l May 19, 2020
d345b30
[FIX] Indexing for DataToMongoDB
skydivin4ng3l May 19, 2020
26dbcca
[FIX] correct Query and Long FIX for StationId
skydivin4ng3l May 19, 2020
5002290
[FEAT] adds Indexing for MongoToDB
skydivin4ng3l May 19, 2020
8e57695
[FIX] lower_underscore to lowerCamelCase change
skydivin4ng3l May 19, 2020
12f9aa5
[FEAT] Adds Mongo Indexing, Change Weather processing to Mongo
skydivin4ng3l May 19, 2020
492618d
[FIX] Weather Test Integer->Long
skydivin4ng3l May 19, 2020
76fb506
Merge remote-tracking branch 'origin/review-demo' into 287-connect-al…
skydivin4ng3l May 20, 2020
3657fe5
[FIX] LivePlannedCorrelation test based on Postgre Int->bigint/Long
skydivin4ng3l May 20, 2020
4d0bec8
[FIX] Postgre Fix int->bigint/long
skydivin4ng3l May 26, 2020
064a53e
Merge in dev
May 26, 2020
b09d926
revert merge
May 26, 2020
1 change: 1 addition & 0 deletions WORKSPACE
@@ -160,6 +160,7 @@ load("@io_grpc_grpc_java//:repositories.bzl", "IO_GRPC_GRPC_JAVA_OVERRIDE_TARGET
maven_install(
artifacts = [
"org.apache.commons:commons-lang3:3.9",
"org.apache.commons:commons-text:1.8",
"org.javatuples:javatuples:1.2",
"junit:junit:4.13",
"org.testcontainers:testcontainers:1.14.1",
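The new commons-text artifact is presumably pulled in for CaseUtils, which the ProtoToBson change uses to turn lower_underscore protobuf field names into lowerCamelCase attribute names. A minimal sketch of that conversion, assuming only the public CaseUtils API (the helper name toCamelCaseField is illustrative, not part of this PR):

import org.apache.commons.text.CaseUtils;

public class FieldNameSketch {
    // Illustrative helper: convert a protobuf-style lower_underscore name
    // into the lowerCamelCase attribute name used in the Mongo documents.
    static String toCamelCaseField(String lowerUnderscore) {
        // capitalizeFirstLetter = false keeps the leading word lower case
        return CaseUtils.toCamelCase(lowerUnderscore, false, '_');
    }

    public static void main(String[] args) {
        // prints "plannedArrivalTimeEndStation"
        System.out.println(toCamelCaseField("planned_arrival_time_end_station"));
    }
}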
2 changes: 1 addition & 1 deletion auxiliary/producers/replayer/replayers.go
@@ -168,7 +168,7 @@ func (s *ReplayerServer) Setup(ctx context.Context) error {
SourceName: "weather",
Extractor: extractors.NewMongoExtractor(s.mongo, func(event proto.Message) *eventpb.Event {
return &eventpb.Event{Event: &eventpb.Event_Weather{Weather: event.(*weatherpb.WeatherData)}}
}, &weatherpb.WeatherData{}, setSortAndID("identifier", "identifier")),
}, &weatherpb.WeatherData{}, setSortAndID("detectionTime", "eventClass")),
Topic: topics.Topic_WEATHER_DATA,
}

1 change: 1 addition & 0 deletions core/BUILD
@@ -11,6 +11,7 @@ MAIN_DEPS = [
"//core/src/main/java/org/bptlab/cepta/serialization:java_default_library",
"//core/src/main/java/org/bptlab/cepta/utils/types:java_default_library",
"//core/src/main/java/org/bptlab/cepta/utils/functions:java_default_library",
"//core/src/main/java/org/bptlab/cepta/utils/database:java_default_library",
"//models/constants:topic_java_proto",
"//models/events:live_train_data_java_proto",
"//models/events:count_of_trains_at_station_event_java_proto",
47 changes: 19 additions & 28 deletions core/src/main/java/org/bptlab/cepta/Main.java
@@ -18,7 +18,7 @@

package org.bptlab.cepta;

import java.util.Optional;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -53,6 +53,8 @@
import org.bptlab.cepta.patterns.StaysInStationPattern;
import org.bptlab.cepta.serialization.GenericBinaryProtoDeserializer;
import org.bptlab.cepta.serialization.GenericBinaryProtoSerializer;
import org.bptlab.cepta.utils.database.Mongo;
import org.bptlab.cepta.utils.database.Mongo.IndexContainer;
import org.bptlab.cepta.utils.functions.StreamUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -71,7 +73,6 @@
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

@Command(
name = "cepta core",
@@ -221,34 +222,40 @@ public LocationData map(Event event) throws Exception{
* ++++++++++++++++++++++++
* Begin - Weather/Locations
* ------------------------*/
locationDataStream.map(new DataToPostgresDatabase<LocationData>("location",postgresConfig));
//Create indexes to speed up queries against the inserted documents
List<IndexContainer> locationIndex = Mongo.makeIndexContainerList(Arrays.asList("latitude","longitude"));

DataStream<Tuple2<WeatherData, Integer>> weatherLocationStream = AsyncDataStream
.unorderedWait(weatherDataStream, new WeatherLocationCorrelationFunction(postgresConfig),
//The returned stream is not required downstream; it passes through all events regardless of whether the upload succeeded
DataStream<LocationData> uploadedLocationStream = AsyncDataStream
.unorderedWait(locationDataStream, new DataToMongoDB<LocationData>("location",locationIndex,mongoConfig),
100000, TimeUnit.MILLISECONDS, 1);

DataStream<Tuple2<WeatherData, Long>> weatherLocationStream = AsyncDataStream
.unorderedWait(weatherDataStream, new WeatherLocationCorrelationMongoFunction("location",mongoConfig),
100000, TimeUnit.MILLISECONDS, 1);

//this is a bit weird compared to the other operators
DataStream<NotificationOuterClass.Notification> delayFromWeatherStream = WeatherLiveTrainJoinFunction.delayFromWeather(weatherLocationStream,liveTrainDataStream);

delayFromWeatherStream.addSink(trainDelayNotificationProducer);

// delayFromWeatherStream.print();
/*-------------------------
* End - Weather
* ++++++++++++++++++++++++
* Begin - MongoDelayShift
* ------------------------*/

List<IndexContainer> plannedTrainDataIndex = Mongo.makeIndexContainerList(Arrays.asList("trainSectionId","endStationId","plannedArrivalTimeEndStation","plannedEventTime"));
//The returned stream is not required downstream; it passes through all events regardless of whether the upload succeeded
DataStream<PlannedTrainData> plannedTrainDataStreamUploaded = AsyncDataStream
.unorderedWait(plannedTrainDataStream, new DataToMongoDB("plannedTrainData", mongoConfig),
.unorderedWait(plannedTrainDataStream, new DataToMongoDB("plannedTrainData",plannedTrainDataIndex, mongoConfig),
100000, TimeUnit.MILLISECONDS, 1);

DataStream<Notification> notificationFromDelayShift = AsyncDataStream
.unorderedWait(liveTrainDataStream, new DelayShiftFunctionMongo(mongoConfig),
100000, TimeUnit.MILLISECONDS, 1);

notificationFromDelayShift.addSink(trainDelayNotificationProducer);
// notificationFromDelayShift.print();
notificationFromDelayShift.addSink(trainDelayNotificationProducer);
/*-------------------------
* End - MongoDelayShift
* ++++++++++++++++++++++++
@@ -267,9 +274,7 @@ public LocationData map(Event event) throws Exception{
* Begin - CountOfTrainsAtStation
* ------------------------*/

DataStream<NotificationOuterClass.Notification> delayShiftNotifications = AsyncDataStream
.unorderedWait(liveTrainDataStream, new DelayShiftFunction(postgresConfig),
100000, TimeUnit.MILLISECONDS, 1);
DataStream<CountOfTrainsAtStationEvent> countOfTrainsAtStationDataStream = CountOfTrainsAtStationFunction.countOfTrainsAtStation(liveTrainDataStream);

// countOfTrainsAtStationDataStream.print();

@@ -283,16 +288,6 @@ public LocationData map(Event event) throws Exception{
.unorderedWait(liveTrainDataStream, new LivePlannedCorrelationFunctionMongo( mongoConfig),
100000, TimeUnit.MILLISECONDS, 1);

// LivePlannedCorrelationFunction Postgre
//TODO!!
//This might be Very Slot, maybe Too slow for LivePlannedCorrelationFunction!!
// plannedTrainDataStream.map(new DataToPostgresDatabase<PlannedTrainData>("planned",postgresConfig));

// DataStream<Tuple2<LiveTrainData, PlannedTrainData>> matchedLivePlannedStream =
// AsyncDataStream
// .unorderedWait(liveTrainDataStream, new LivePlannedCorrelationFunction(postgresConfig),
// 100000, TimeUnit.MILLISECONDS, 1);

// DetectStationArrivalDelay
DataStream<NotificationOuterClass.Notification> trainDelayNotificationDataStream = matchedLivePlannedStream
.process(new DetectStationArrivalDelay()).name("train-delays");
@@ -312,21 +307,17 @@ public LocationData map(Event event) throws Exception{
patternStream.process(NoMatchingPlannedTrainDataPattern.generateNMPTDEventsFunc());

//TODO add consumer for these Events

/*-------------------------
* End - matchedLivePlanned
* ++++++++++++++++++++++++
* Begin - SumOfDelaysAtStation
* ------------------------*/
//TODO Discuss Has this to be this way?
SumOfDelayAtStationFunction sumOfDelayAtStationFunction = new SumOfDelayAtStationFunction();
//TODO Decided about input (Stream and events Notification VS DelayNotification) and Window

int sumOfDelayWindow = 4;
DataStream<Tuple2<Long, Double>> sumOfDelayAtStationStream = sumOfDelayAtStationFunction.SumOfDelayAtStation(trainDelayNotificationDataStream, sumOfDelayWindow );
DataStream<Tuple2<Long, Long>> sumOfDelayAtStationStream = SumOfDelayAtStationFunction.sumOfDelayAtStation(trainDelayNotificationDataStream, sumOfDelayWindow );

//TODO Make Sink/Producer

// sumOfDelayAtStationStream.print();
/*-------------------------
* End - SumOfDelaysAtStation
* ++++++++++++++++++++++++
42 changes: 38 additions & 4 deletions core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java
@@ -1,6 +1,7 @@
package org.bptlab.cepta.operators;

import com.google.protobuf.Message;
import com.mongodb.client.model.Indexes;
import com.mongodb.client.result.InsertOneResult;
import com.mongodb.reactivestreams.client.MongoClient;

@@ -14,16 +15,13 @@
import org.apache.flink.streaming.api.functions.async.ResultFuture;

import org.bptlab.cepta.config.MongoConfig;
import org.bptlab.cepta.utils.database.Util;
import org.bptlab.cepta.utils.database.Util.ProtoKeyValues;

import org.bson.Document;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import org.slf4j.Logger;
@@ -40,16 +38,31 @@ public class DataToMongoDB<T extends Message> extends RichAsyncFunction<T, T> {
private MongoConfig mongoConfig = new MongoConfig();
private transient MongoClient mongoClient;
private final Logger log = LoggerFactory.getLogger(DataToMongoDB.class);
private ArrayList<Mongo.IndexContainer> indices = new ArrayList<>();

public DataToMongoDB(String collection_name, MongoConfig mongoConfig){
this.collection_name = collection_name;
this.mongoConfig = mongoConfig;
}

public DataToMongoDB(String collection_name, List<Mongo.IndexContainer> createIndexFor, MongoConfig mongoConfig){
this.collection_name = collection_name;
this.mongoConfig = mongoConfig;
this.indices.addAll(createIndexFor);
}

@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception{
super.open(parameters);
this.mongoClient = Mongo.getMongoClient(mongoConfig);
try{
if (!indices.isEmpty()) {
createIndices();
}
} catch (NullPointerException e) {
//no index list provided; skip index creation
}

log.info("Mongo Connection established");
}

@@ -101,4 +114,25 @@ public Boolean get() {
}
}

private void createIndices(){
MongoDatabase database = mongoClient.getDatabase(mongoConfig.getName());
MongoCollection<Document> collection = database.getCollection(collection_name);
SubscriberHelpers.OperationSubscriber<String> indexSubscriber = new SubscriberHelpers.OperationSubscriber<>();
for (Mongo.IndexContainer index : indices) {
switch (index.getOrderIndicator()){
default:
collection.createIndex(Indexes.ascending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber);
collection.createIndex(Indexes.descending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber);
break;
case 1:
collection.createIndex(Indexes.ascending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber);
break;
case -1:
collection.createIndex(Indexes.descending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber);
break;
}
}
//TODO acknowledge?
indexSubscriber.get();
}
}
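A hedged usage sketch of the new index-aware constructor, mirroring the wiring in Main.java above (collection name, attribute list, and async I/O parameters are taken from that diff; variable names are illustrative). If makeIndexContainerList leaves the order indicator at neither 1 nor -1, the default branch of createIndices builds both an ascending and a descending index per attribute.

// Describe which attributes should be indexed before inserting documents.
List<Mongo.IndexContainer> plannedTrainDataIndex =
    Mongo.makeIndexContainerList(Arrays.asList(
        "trainSectionId", "endStationId",
        "plannedArrivalTimeEndStation", "plannedEventTime"));

// Insert each PlannedTrainData element asynchronously; the operator creates
// the indexes once in open() and then passes every element through unchanged.
DataStream<PlannedTrainData> plannedTrainDataStreamUploaded = AsyncDataStream
    .unorderedWait(plannedTrainDataStream,
        new DataToMongoDB<>("plannedTrainData", plannedTrainDataIndex, mongoConfig),
        100000, TimeUnit.MILLISECONDS, 1);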
@@ -74,11 +74,11 @@ the resultFuture is where the outgoing element(s) will be
SubscriberHelpers.OperationSubscriber<Document> findMultipleSubscriber = new SubscriberHelpers.OperationSubscriber<>();
plannedTrainDataCollection.find(
and(
eq("train_section_id",dataset.getTrainSectionId()),
eq("end_station_id",dataset.getEndStationId()),
eq("planned_arrival_time_end_station",dataset.getPlannedArrivalTimeEndStation())
eq("trainSectionId",dataset.getTrainSectionId()),
eq("endStationId",dataset.getEndStationId()),
eq("plannedArrivalTimeEndStation",dataset.getPlannedArrivalTimeEndStation())
)
).sort(ascending("planned_event_time")).subscribe(findMultipleSubscriber);
).sort(ascending("plannedEventTime")).subscribe(findMultipleSubscriber);

CompletableFuture<Void> queryFuture = CompletableFuture.supplyAsync(new Supplier<List<Document>>() {
@Override
@@ -25,8 +25,8 @@ public class SumOfDelayAtStationFunction {
The window is a fixed-size count window.
It returns a stream of Tuple2 holding the location ID and the summed delay.
*/
public DataStream<Tuple2<Long, Double>> SumOfDelayAtStation(DataStream<NotificationOuterClass.Notification> inputStream, int windowSize) {
DataStream<Tuple2<Long, Double>> resultStream = inputStream
public static DataStream<Tuple2<Long, Long>> sumOfDelayAtStation(DataStream<NotificationOuterClass.Notification> inputStream, int windowSize) {
DataStream<Tuple2<Long, Long>> resultStream = inputStream
.keyBy(
new KeySelector<NotificationOuterClass.Notification, Integer>(){
Integer key = 0;
@@ -44,30 +44,31 @@ public Integer getKey(NotificationOuterClass.Notification event){
);
return resultStream;
};

public static ProcessWindowFunction<NotificationOuterClass.Notification, Tuple2<Long, Double>, Integer, GlobalWindow> sumOfDelayAtStationWindowProcessFunction() {
return new ProcessWindowFunction<NotificationOuterClass.Notification, Tuple2<Long, Double>, Integer, GlobalWindow>() {
//TODO Change Delay to Long
public static ProcessWindowFunction<NotificationOuterClass.Notification, Tuple2<Long, Long>, Integer, GlobalWindow> sumOfDelayAtStationWindowProcessFunction() {
return new ProcessWindowFunction<NotificationOuterClass.Notification, Tuple2<Long, Long>, Integer, GlobalWindow>() {
@Override
public void process(Integer key, Context context, Iterable<NotificationOuterClass.Notification> input, Collector<Tuple2<Long, Double>> out) throws Exception {
HashMap<Long, Double> sums = new HashMap<Long, Double>();
public void process(Integer key, Context context, Iterable<NotificationOuterClass.Notification> input, Collector<Tuple2<Long, Long>> out) throws Exception {
//TODO CHANGE this to use Flink State eg. MapState
HashMap<Long, Long> sums = new HashMap<Long, Long>();
for (NotificationOuterClass.Notification outer_in: input) {
NotificationOuterClass.DelayNotification in = outer_in.getDelay();
Long trainId = Long.valueOf(in.getTransportId().getId());
Long locationId = Long.valueOf(in.getStationId().getId());
Double delay = (double) in.getDelay().getDelta().getSeconds();
Long delay = in.getDelay().getDelta().getSeconds();
if (!sums.containsKey(locationId)) {
sums.put(locationId, delay);
} else {
double tmp;
Long tmp;
tmp = sums.get(locationId);
sums.replace(locationId, (tmp + delay));
}
}

for (Long location: sums.keySet()) {

Double delay = sums.get(location);
out.collect(new Tuple2<Long, Double>(location, delay) );
Long delay = sums.get(location);
out.collect(new Tuple2<Long, Long>(location, delay) );
}
}
};
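For reference, a minimal sketch of how the now-static helper is invoked, matching the call in Main.java (the window size of 4 notifications comes from that diff; variable names are illustrative):

// Sum up delay seconds per station over a count window of 4 notifications.
int sumOfDelayWindow = 4;
DataStream<Tuple2<Long, Long>> sumOfDelayAtStationStream =
    SumOfDelayAtStationFunction.sumOfDelayAtStation(
        trainDelayNotificationDataStream, sumOfDelayWindow);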
@@ -12,44 +12,44 @@
import org.bptlab.cepta.models.internal.notifications.notification.NotificationOuterClass;
import org.bptlab.cepta.models.events.train.LiveTrainDataOuterClass.LiveTrainData;
import org.bptlab.cepta.models.internal.types.ids.Ids;
import org.bptlab.cepta.utils.notification.NotificationHelper;

public class WeatherLiveTrainJoinFunction {
public static DataStream<NotificationOuterClass.Notification> delayFromWeather(DataStream<Tuple2<WeatherData, Integer>> weather, DataStream<LiveTrainData> train){
public static DataStream<NotificationOuterClass.Notification> delayFromWeather(DataStream<Tuple2<WeatherData, Long>> weather, DataStream<LiveTrainData> train){
return weather.join(train)
.where(new KeySelector<Tuple2<WeatherData, Integer>, Integer>() {
.where(new KeySelector<Tuple2<WeatherData, Long>, Long>() {
@Override
public Integer getKey(Tuple2<WeatherData, Integer> weatherDataIntegerTuple2) throws Exception {
public Long getKey(Tuple2<WeatherData, Long> weatherDataIntegerTuple2) throws Exception {
return weatherDataIntegerTuple2.f1;
}
}).equalTo(new KeySelector<LiveTrainData, Integer>() {
}).equalTo(new KeySelector<LiveTrainData, Long>() {
@Override
public Integer getKey(LiveTrainData liveTrainData) throws Exception {
return (int) liveTrainData.getStationId();
public Long getKey(LiveTrainData liveTrainData) throws Exception {
return liveTrainData.getStationId();
}
})
.window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(60)))
.apply(new RichJoinFunction<Tuple2<WeatherData, Integer>, LiveTrainData, NotificationOuterClass.Notification>() {
.apply(new RichJoinFunction<Tuple2<WeatherData, Long>, LiveTrainData, NotificationOuterClass.Notification>() {
@Override
public NotificationOuterClass.Notification join(Tuple2<WeatherData, Integer> weatherDataIntegerTuple2,
public NotificationOuterClass.Notification join(Tuple2<WeatherData, Long> weatherDataIntegerTuple2,
LiveTrainData liveTrainData) throws Exception {
return NotificationOuterClass.Notification.newBuilder().setDelay(
NotificationOuterClass.DelayNotification.newBuilder()
.setDelay(DelayOuterClass.Delay.newBuilder().setDelta(delayFromWeather(weatherDataIntegerTuple2.f0)).build())
.setStationId(Ids.CeptaStationID.newBuilder().setId(String.valueOf(liveTrainData.getStationId())).build())
.build()
).build();
return NotificationHelper.getTrainDelayNotificationFrom(
String.valueOf(liveTrainData.getTrainSectionId()),
delayFromWeather(weatherDataIntegerTuple2.f0),
"Delay caused by weather: "+weatherDataIntegerTuple2.f0.getEventClass().toString(),
liveTrainData.getStationId());
}
});
}

private static Duration delayFromWeather(WeatherData weather){
private static Long delayFromWeather(WeatherData weather){
String eventClass = weather.getEventClass().toString();
int delay;
Long delayedSeconds;
switch (eventClass){
case "Clear_night": delay = 0; break;
case "rain": delay = 10; break;
default: delay = 0;
case "Clear_night": delayedSeconds = 0L; break;
case "rain": delayedSeconds = 600L; break;
default: delayedSeconds = 0L;
}
return Duration.newBuilder().setSeconds((long) delay).build();
return delayedSeconds;
}
}
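A short usage sketch of the join as wired in Main.java (stream and producer names come from that diff): weather tuples keyed by station id are joined with live train events for the same station within a 60-second window, and the resulting delay notifications go to the notification producer sink.

// Correlate weather (already mapped to a station id) with live train data
// and forward the generated delay notifications to the notification producer.
DataStream<NotificationOuterClass.Notification> delayFromWeatherStream =
    WeatherLiveTrainJoinFunction.delayFromWeather(weatherLocationStream, liveTrainDataStream);
delayFromWeatherStream.addSink(trainDelayNotificationProducer);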