From 7931efd4ddd6e3db44887710e0b88e41ca0f41cf Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 16 May 2020 13:09:41 +0200 Subject: [PATCH 01/19] [REF] Changed Delay from Double to Long --- core/src/main/java/org/bptlab/cepta/Main.java | 6 ++--- .../SumOfDelayAtStationFunction.java | 23 ++++++++++--------- .../java/org/bptlab/cepta/providers/BUILD | 1 + .../SumOfDelayAtStationTests.java | 16 ++++++------- 4 files changed, 24 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/Main.java b/core/src/main/java/org/bptlab/cepta/Main.java index 3ca52df1..6b8390ca 100644 --- a/core/src/main/java/org/bptlab/cepta/Main.java +++ b/core/src/main/java/org/bptlab/cepta/Main.java @@ -315,14 +315,14 @@ public LocationData map(Event event) throws Exception{ * Begin - SumOfDelaysAtStation * ------------------------*/ //TODO Discuss Has this to be this way? - SumOfDelayAtStationFunction sumOfDelayAtStationFunction = new SumOfDelayAtStationFunction(); +// SumOfDelayAtStationFunction sumOfDelayAtStationFunction = new SumOfDelayAtStationFunction(); //TODO Decided about input (Stream and events Notification VS DelayNotification) and Window int sumOfDelayWindow = 4; - DataStream> sumOfDelayAtStationStream = sumOfDelayAtStationFunction.SumOfDelayAtStation(trainDelayNotificationDataStream, sumOfDelayWindow ); + DataStream> sumOfDelayAtStationStream = SumOfDelayAtStationFunction.sumOfDelayAtStation(trainDelayNotificationDataStream, sumOfDelayWindow ); //TODO Make Sink/Producer - + sumOfDelayAtStationStream.print(); /*------------------------- * End - SumOfDelaysAtStation * ++++++++++++++++++++++++ diff --git a/core/src/main/java/org/bptlab/cepta/operators/SumOfDelayAtStationFunction.java b/core/src/main/java/org/bptlab/cepta/operators/SumOfDelayAtStationFunction.java index c6c10f41..7ab5856d 100644 --- a/core/src/main/java/org/bptlab/cepta/operators/SumOfDelayAtStationFunction.java +++ b/core/src/main/java/org/bptlab/cepta/operators/SumOfDelayAtStationFunction.java @@ -25,8 +25,8 @@ public class SumOfDelayAtStationFunction { The window is a fixed event number window. It will return a Stream of Tuple2 with the location Id and the sum of delay. */ - public DataStream> SumOfDelayAtStation(DataStream inputStream, int windowSize) { - DataStream> resultStream = inputStream + public static DataStream> sumOfDelayAtStation(DataStream inputStream, int windowSize) { + DataStream> resultStream = inputStream .keyBy( new KeySelector(){ Integer key = 0; @@ -44,21 +44,22 @@ public Integer getKey(NotificationOuterClass.Notification event){ ); return resultStream; }; - - public static ProcessWindowFunction, Integer, GlobalWindow> sumOfDelayAtStationWindowProcessFunction() { - return new ProcessWindowFunction, Integer, GlobalWindow>() { + //TODO Change Delay to Long + public static ProcessWindowFunction, Integer, GlobalWindow> sumOfDelayAtStationWindowProcessFunction() { + return new ProcessWindowFunction, Integer, GlobalWindow>() { @Override - public void process(Integer key, Context context, Iterable input, Collector> out) throws Exception { - HashMap sums = new HashMap(); + public void process(Integer key, Context context, Iterable input, Collector> out) throws Exception { + //TODO CHANGE this to use Flink State eg. MapState + HashMap sums = new HashMap(); for (NotificationOuterClass.Notification outer_in: input) { NotificationOuterClass.DelayNotification in = outer_in.getDelay(); Long trainId = Long.valueOf(in.getTransportId().getId()); Long locationId = Long.valueOf(in.getStationId().getId()); - Double delay = (double) in.getDelay().getDelta().getSeconds(); + Long delay = in.getDelay().getDelta().getSeconds(); if (!sums.containsKey(locationId)) { sums.put(locationId, delay); } else { - double tmp; + Long tmp; tmp = sums.get(locationId); sums.replace(locationId, (tmp + delay)); } @@ -66,8 +67,8 @@ public void process(Integer key, Context context, Iterable(location, delay) ); + Long delay = sums.get(location); + out.collect(new Tuple2(location, delay) ); } } }; diff --git a/core/src/test/java/org/bptlab/cepta/providers/BUILD b/core/src/test/java/org/bptlab/cepta/providers/BUILD index 19d6b4cd..7848d2b8 100644 --- a/core/src/test/java/org/bptlab/cepta/providers/BUILD +++ b/core/src/test/java/org/bptlab/cepta/providers/BUILD @@ -5,6 +5,7 @@ java_library( srcs = glob(["*.java"]), visibility = ["//visibility:public"], deps = [ + "//core/src/main/java/org/bptlab/cepta/utils/notification:java_default_library", "//core/src/test/java/org/bptlab/cepta/containers:java_default_library", "//models/events:live_train_data_java_proto", "//models/events:planned_train_data_java_proto", diff --git a/core/src/test/java/org/bptlab/cepta/sum-of-delay-at-station/SumOfDelayAtStationTests.java b/core/src/test/java/org/bptlab/cepta/sum-of-delay-at-station/SumOfDelayAtStationTests.java index 8eedbc33..26e9f886 100644 --- a/core/src/test/java/org/bptlab/cepta/sum-of-delay-at-station/SumOfDelayAtStationTests.java +++ b/core/src/test/java/org/bptlab/cepta/sum-of-delay-at-station/SumOfDelayAtStationTests.java @@ -24,10 +24,10 @@ public void TestSumOfDelaysAtStation() throws IOException { boolean pass = true; Long expectedStation1 = 1L; Long expectedStation2 = 2L; - Double expectedDelayAtStation1 = 25.0; - Double expectedDelayAtStation2 = 13.0; + Long expectedDelayAtStation1 = 25L; + Long expectedDelayAtStation2 = 13L; - SumOfDelayAtStationFunction sumOfDelayAtStationFunction = new SumOfDelayAtStationFunction(); +// SumOfDelayAtStationFunction sumOfDelayAtStationFunction = new SumOfDelayAtStationFunction(); // the provider provides four TrainDelayNotification elements // element 1 has stationId 1, trainId 1, delay 10 // element 2 has stationId 2, trainId 2, delay 5 @@ -35,15 +35,15 @@ public void TestSumOfDelaysAtStation() throws IOException { // element 4 has stationId 2, trainId 1, delay 8 DataStream delayNotificationStream = TrainDelayNotificationDataProvider.NotificationDataStream(); - DataStream> locationAndDelayStream = sumOfDelayAtStationFunction.SumOfDelayAtStation(delayNotificationStream, 4); - ArrayList> locationAndDelayArray = new ArrayList<>(); - Iterator> iterator = DataStreamUtils.collect(locationAndDelayStream); + DataStream> locationAndDelayStream = SumOfDelayAtStationFunction.sumOfDelayAtStation(delayNotificationStream, 4); + ArrayList> locationAndDelayArray = new ArrayList<>(); + Iterator> iterator = DataStreamUtils.collect(locationAndDelayStream); while(iterator.hasNext()){ - Tuple2 tuple = iterator.next(); + Tuple2 tuple = iterator.next(); locationAndDelayArray.add(tuple); } - for (Tuple2 tuple : locationAndDelayArray) { + for (Tuple2 tuple : locationAndDelayArray) { if (tuple.f0.equals(expectedStation1)) { if (!tuple.f1.equals(expectedDelayAtStation1)) { pass = false; From b1c1044f700ddd63162640df08f38890f267eade Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 16 May 2020 14:19:28 +0200 Subject: [PATCH 02/19] [FIX] Bazel Synch: Commented Out WIP Code --- .../bptlab/cepta/patterns/RBDelayPatternTest.java | 8 ++++---- .../cepta/providers/RBDelayPatternProvider.java | 6 +++--- .../bptlab/cepta/providers/ReplayerProvider.java | 15 +++++++++++++-- 3 files changed, 20 insertions(+), 9 deletions(-) diff --git a/core/src/test/java/org/bptlab/cepta/patterns/RBDelayPatternTest.java b/core/src/test/java/org/bptlab/cepta/patterns/RBDelayPatternTest.java index 9a63a64c..bb062957 100644 --- a/core/src/test/java/org/bptlab/cepta/patterns/RBDelayPatternTest.java +++ b/core/src/test/java/org/bptlab/cepta/patterns/RBDelayPatternTest.java @@ -30,9 +30,9 @@ private int countOfMatchesIn(DataStream input) throws Exception{ } */ - @Test - public void TestStaysInStationDirectly() throws Exception { - Assert.assertTrue(RBDelayPatternProvider.receiveEventsFromReplayer()); - } +// @Test +// public void TestStaysInStationDirectly() throws Exception { +// Assert.assertTrue(RBDelayPatternProvider.receiveEventsFromReplayer()); +// } } \ No newline at end of file diff --git a/core/src/test/java/org/bptlab/cepta/providers/RBDelayPatternProvider.java b/core/src/test/java/org/bptlab/cepta/providers/RBDelayPatternProvider.java index 1e90580c..06280a45 100644 --- a/core/src/test/java/org/bptlab/cepta/providers/RBDelayPatternProvider.java +++ b/core/src/test/java/org/bptlab/cepta/providers/RBDelayPatternProvider.java @@ -15,7 +15,7 @@ public class RBDelayPatternProvider { public static void getDataEvent() { - RedBullExampleProvider.getDataEvent(); +// RedBullExampleProvider.getDataEvent(); // return } @@ -33,13 +33,13 @@ public static DataStream receiveEventsFromReplayer(){ QueryOptions options = RedBullExampleProvider.getQueryOptions(); - DataStream eventStream = provider.query(options); +// DataStream eventStream = provider.query(options); java.lang.System.out.println("Successful querying"); java.lang.System.out.println(provider.query(options)); - return eventStream; + return null/*eventStream*/; /* diff --git a/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java b/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java index a919f90a..33a68792 100644 --- a/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java +++ b/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java @@ -41,9 +41,20 @@ public Iterator query(QueryOptions options) throws StatusRuntimeE */ // return blockingStub.query(options); + //TODO Replace Placeholder >>>> + Iterator I = new Iterator() { + @Override + public boolean hasNext() { + return false; + } - Iterator - + @Override + public ReplayedEvent next() { + return null; + } + }; + return I; + //TODO Replace Placeholder <<<< /* try { From e53bd114d11254a6a365f1581971db3d08492035 Mon Sep 17 00:00:00 2001 From: Christopher Date: Sat, 16 May 2020 15:50:56 +0200 Subject: [PATCH 03/19] [FEAT] Optional Index creation for DataToMongo Operator --- .../bptlab/cepta/operators/DataToMongoDB.java | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java b/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java index a0cab8ed..b008350b 100644 --- a/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java +++ b/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java @@ -1,12 +1,14 @@ package org.bptlab.cepta.operators; import com.google.protobuf.Message; +import com.mongodb.client.model.Indexes; import com.mongodb.client.result.InsertOneResult; import com.mongodb.reactivestreams.client.MongoClient; import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; +import org.apache.flink.api.java.tuple.Tuple2; import org.bptlab.cepta.utils.database.Mongo; import org.bptlab.cepta.utils.database.mongohelper.SubscriberHelpers; @@ -14,16 +16,13 @@ import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.bptlab.cepta.config.MongoConfig; -import org.bptlab.cepta.utils.database.Util; -import org.bptlab.cepta.utils.database.Util.ProtoKeyValues; import org.bson.Document; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; import java.util.function.Supplier; import static org.bptlab.cepta.utils.database.Mongo.protoToBson; @@ -36,16 +35,26 @@ public class DataToMongoDB extends RichAsyncFunction { private String collection_name; private MongoConfig mongoConfig = new MongoConfig(); private transient MongoClient mongoClient; + private ArrayList> indices = new ArrayList<>(); public DataToMongoDB(String collection_name, MongoConfig mongoConfig){ this.collection_name = collection_name; this.mongoConfig = mongoConfig; } + public DataToMongoDB(String collection_name, List> createIndexFor, MongoConfig mongoConfig){ + this.collection_name = collection_name; + this.mongoConfig = mongoConfig; + this.indices.addAll(createIndexFor); + } + @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception{ super.open(parameters); this.mongoClient = Mongo.getMongoClient(mongoConfig); + if (!indices.isEmpty()) { + createIndices(); + } } @Override @@ -86,4 +95,18 @@ public Boolean get() { } } + private void createIndices(){ + MongoDatabase database = mongoClient.getDatabase(mongoConfig.getName()); + MongoCollection coll = database.getCollection(collection_name); + SubscriberHelpers.OperationSubscriber indexSubscriber = new SubscriberHelpers.OperationSubscriber<>(); + for (Tuple2 index : indices) { + if (index.f1 > 0){ + coll.createIndex(Indexes.ascending(index.f0)).subscribe(indexSubscriber); + } else { + coll.createIndex(Indexes.descending(index.f0)).subscribe(indexSubscriber); + } + } + //TODO acknowledge? + indexSubscriber.get(); + } } From 36fbb1dafed890fb39b2a500e405737f33765bd6 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 15:14:44 +0200 Subject: [PATCH 04/19] [REF] WeatherJoin now uses NotifactionHelper --- .../WeatherLiveTrainJoinFunction.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java b/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java index 78edbcca..7747e920 100644 --- a/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java +++ b/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java @@ -12,6 +12,7 @@ import org.bptlab.cepta.models.internal.notifications.notification.NotificationOuterClass; import org.bptlab.cepta.models.events.train.LiveTrainDataOuterClass.LiveTrainData; import org.bptlab.cepta.models.internal.types.ids.Ids; +import org.bptlab.cepta.utils.notification.NotificationHelper; public class WeatherLiveTrainJoinFunction { public static DataStream delayFromWeather(DataStream> weather, DataStream train){ @@ -32,24 +33,23 @@ public Integer getKey(LiveTrainData liveTrainData) throws Exception { @Override public NotificationOuterClass.Notification join(Tuple2 weatherDataIntegerTuple2, LiveTrainData liveTrainData) throws Exception { - return NotificationOuterClass.Notification.newBuilder().setDelay( - NotificationOuterClass.DelayNotification.newBuilder() - .setDelay(DelayOuterClass.Delay.newBuilder().setDelta(delayFromWeather(weatherDataIntegerTuple2.f0)).build()) - .setStationId(Ids.CeptaStationID.newBuilder().setId(String.valueOf(liveTrainData.getStationId())).build()) - .build() - ).build(); + return NotificationHelper.getTrainDelayNotificationFrom( + String.valueOf(liveTrainData.getTrainSectionId()), + delayFromWeather(weatherDataIntegerTuple2.f0), + "Delay caused by weather: "+weatherDataIntegerTuple2.f0.getEventClass().toString(), + liveTrainData.getStationId()); } }); } - private static Duration delayFromWeather(WeatherData weather){ + private static Long delayFromWeather(WeatherData weather){ String eventClass = weather.getEventClass().toString(); - int delay; + Long delayedSeconds; switch (eventClass){ - case "Clear_night": delay = 0; break; - case "rain": delay = 10; break; - default: delay = 0; + case "Clear_night": delayedSeconds = 0L; break; + case "rain": delayedSeconds = 600L; break; + default: delayedSeconds = 0L; } - return Duration.newBuilder().setSeconds((long) delay).build(); + return delayedSeconds; } } From 3a6484a294428449eba6a289a57d5126ecf787b2 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 15:15:43 +0200 Subject: [PATCH 05/19] [FEAT] Implementation of WeatherLocationCorrelationMongoFunction --- core/src/main/java/org/bptlab/cepta/Main.java | 9 +- ...atherLocationCorrelationMongoFunction.java | 120 ++++++++++++++++++ 2 files changed, 125 insertions(+), 4 deletions(-) create mode 100644 core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java diff --git a/core/src/main/java/org/bptlab/cepta/Main.java b/core/src/main/java/org/bptlab/cepta/Main.java index 6b8390ca..c0443d24 100644 --- a/core/src/main/java/org/bptlab/cepta/Main.java +++ b/core/src/main/java/org/bptlab/cepta/Main.java @@ -218,10 +218,13 @@ public LocationData map(Event event) throws Exception{ * ++++++++++++++++++++++++ * Begin - Weather/Locations * ------------------------*/ - locationDataStream.map(new DataToPostgresDatabase("location",postgresConfig)); + //The Stream is not necessary it passes through all events independent from a successful upload + DataStream uploadedLocationStream = AsyncDataStream + .unorderedWait(locationDataStream, new DataToMongoDB("location",mongoConfig), + 100000, TimeUnit.MILLISECONDS, 1); DataStream> weatherLocationStream = AsyncDataStream - .unorderedWait(weatherDataStream, new WeatherLocationCorrelationFunction(postgresConfig), + .unorderedWait(weatherDataStream, new WeatherLocationCorrelationMongoFunction("location",mongoConfig), 100000, TimeUnit.MILLISECONDS, 1); //this is a bit weird compared to the other operators @@ -314,8 +317,6 @@ public LocationData map(Event event) throws Exception{ * ++++++++++++++++++++++++ * Begin - SumOfDelaysAtStation * ------------------------*/ - //TODO Discuss Has this to be this way? -// SumOfDelayAtStationFunction sumOfDelayAtStationFunction = new SumOfDelayAtStationFunction(); //TODO Decided about input (Stream and events Notification VS DelayNotification) and Window int sumOfDelayWindow = 4; diff --git a/core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java b/core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java new file mode 100644 index 00000000..5960727b --- /dev/null +++ b/core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java @@ -0,0 +1,120 @@ +package org.bptlab.cepta.operators; + +import com.github.jasync.sql.db.ConnectionPoolConfigurationBuilder; +import com.github.jasync.sql.db.QueryResult; +import com.github.jasync.sql.db.RowData; +import com.github.jasync.sql.db.pool.ConnectionPool; +import com.github.jasync.sql.db.postgresql.PostgreSQLConnection; +import com.github.jasync.sql.db.postgresql.PostgreSQLConnectionBuilder; +import com.mongodb.reactivestreams.client.MongoClient; +import com.mongodb.reactivestreams.client.MongoCollection; +import com.mongodb.reactivestreams.client.MongoDatabase; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.async.ResultFuture; +import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; +import org.bptlab.cepta.config.MongoConfig; +import org.bptlab.cepta.models.events.train.LiveTrainDataOuterClass; +import org.bptlab.cepta.models.events.train.PlannedTrainDataOuterClass; +import org.bptlab.cepta.models.events.weather.WeatherDataOuterClass.WeatherData; +import org.bptlab.cepta.models.internal.notifications.notification.NotificationOuterClass; +import org.bptlab.cepta.utils.database.Mongo; +import org.bptlab.cepta.utils.database.mongohelper.SubscriberHelpers; +import org.bptlab.cepta.utils.notification.NotificationHelper; +import org.bson.Document; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Supplier; + +import static com.mongodb.client.model.Filters.*; +import static com.mongodb.client.model.Sorts.ascending; + +public class WeatherLocationCorrelationMongoFunction extends + RichAsyncFunction > { + + private MongoConfig mongoConfig = new MongoConfig(); + private transient MongoClient mongoClient; + private String tableName; + + public WeatherLocationCorrelationMongoFunction(String tableName,MongoConfig mongoConfig) { + this.mongoConfig = mongoConfig; + this.tableName = tableName; + } + + public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { + /* + the Configuration class must be from flink, it will give errors when jasync's Configuration is taken + open should be called before methods like map() or join() are executed + this must be set to transient, as flink will otherwise try to serialize it which it is not + */ + super.open(parameters); + this.mongoClient = Mongo.getMongoClient(mongoConfig); + } + + @Override + public void close() throws Exception { + this.mongoClient.close(); + } + + @Override + public void asyncInvoke(WeatherData weatherEvent, + final ResultFuture> resultFuture) throws Exception { + /* + asyncInvoke will be called for each incoming element + the resultFuture is where the outgoing element(s) will be + */ + MongoDatabase database = mongoClient.getDatabase(mongoConfig.getName()); + MongoCollection locationDataCollection = database.getCollection(tableName); + + //The new AsyncMongo Driver now uses Reactive Streams, + // so we need Subscribers to get the Query executed and Results received. + // For further details consider the following links: + //http://mongodb.github.io/mongo-java-driver/4.0/driver-reactive/tutorials/connect-to-mongodb/ + //https://github.com/mongodb/mongo-java-driver/blob/eac754d2eed76fe4fa07dbc10ad3935dfc5f34c4/driver-reactive-streams/src/examples/reactivestreams/tour/QuickTour.java + //https://github.com/mongodb/mongo-java-driver/blob/eac754d2eed76fe4fa07dbc10ad3935dfc5f34c4/driver-reactive-streams/src/examples/reactivestreams/helpers/SubscriberHelpers.java#L53 + //https://github.com/reactive-streams/reactive-streams-jvm/tree/v1.0.3#2-subscriber-code + SubscriberHelpers.OperationSubscriber findMultipleSubscriber = new SubscriberHelpers.OperationSubscriber<>(); + locationDataCollection.find( + and( + /* 0.02 is about 2 kilometers */ + gte("lat", weatherEvent.getLatitude() - 0.02) + , lte("lat", weatherEvent.getLatitude() + 0.02) + , gte("lon", weatherEvent.getLongitude() - 0.02) + , lte("lon", weatherEvent.getLongitude() + 0.02) + ) + ).subscribe(findMultipleSubscriber); + + CompletableFuture queryFuture = CompletableFuture.supplyAsync(new Supplier>() { + @Override + public List get() { + return findMultipleSubscriber.get(); + } + }).thenAccept(result -> { + //only generate Events if there are matching Stations in the DB + if (!result.isEmpty()) { + resultFuture.complete(generateWeatherStationEvents(weatherEvent,result)); + } else { + // Empty Collection will result into no generation of output events + resultFuture.complete(new ArrayList<>()); + } + }); + queryFuture.get(); + } + + private Collection> generateWeatherStationEvents(WeatherData weatherData, List matchingStations) { + Collection> events = new ArrayList<>(); + for ( Document matchedStation : matchingStations) { + try { + events.add(new Tuple2<>(weatherData,matchedStation.getInteger("id"))); + } catch ( Exception e) { + e.printStackTrace(); + } + } + return events; + } +} + From 237d6ad6936a534eddeaec9582edaef9eaf0a19a Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 19:39:31 +0200 Subject: [PATCH 06/19] [FIX] Configured WeatherData for correct replay --- auxiliary/producers/replayer/replayers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auxiliary/producers/replayer/replayers.go b/auxiliary/producers/replayer/replayers.go index 0c81d1ee..f22487ac 100644 --- a/auxiliary/producers/replayer/replayers.go +++ b/auxiliary/producers/replayer/replayers.go @@ -168,7 +168,7 @@ func (s *ReplayerServer) Setup(ctx context.Context) error { SourceName: "weather", Extractor: extractors.NewMongoExtractor(s.mongo, func(event proto.Message) *eventpb.Event { return &eventpb.Event{Event: &eventpb.Event_Weather{Weather: event.(*weatherpb.WeatherData)}} - }, &weatherpb.WeatherData{}, setSortAndID("identifier", "identifier")), + }, &weatherpb.WeatherData{}, setSortAndID("detectionTime", "eventClass")), Topic: topics.Topic_WEATHER_DATA, } From d3d94aa0336c3b04bfaa14122f5c6f887ace2337 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 19:40:35 +0200 Subject: [PATCH 07/19] [FEAT] Adds Apache.commons.text for CaseUtil --- WORKSPACE | 1 + 1 file changed, 1 insertion(+) diff --git a/WORKSPACE b/WORKSPACE index 90e66189..9cfd8120 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -160,6 +160,7 @@ load("@io_grpc_grpc_java//:repositories.bzl", "IO_GRPC_GRPC_JAVA_OVERRIDE_TARGET maven_install( artifacts = [ "org.apache.commons:commons-lang3:3.9", + "org.apache.commons:commons-text:1.8", "org.javatuples:javatuples:1.2", "junit:junit:4.13", "org.testcontainers:testcontainers:1.14.1", From 84bd12c4df4d1d994e9f6dd093c62c98d299a5f7 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 19:43:00 +0200 Subject: [PATCH 08/19] [CHANGE] ProtoToBson attributenames->lowerCamelCase instead lower_under --- .../main/java/org/bptlab/cepta/utils/database/Util.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/utils/database/Util.java b/core/src/main/java/org/bptlab/cepta/utils/database/Util.java index 69f73d1d..35ca0ff6 100644 --- a/core/src/main/java/org/bptlab/cepta/utils/database/Util.java +++ b/core/src/main/java/org/bptlab/cepta/utils/database/Util.java @@ -1,5 +1,6 @@ package org.bptlab.cepta.utils.database; +import org.apache.commons.text.CaseUtils; import com.google.protobuf.Descriptors.FieldDescriptor; import com.google.protobuf.Message; // import com.google.protobuf.Timestamp; @@ -7,7 +8,9 @@ import java.util.ArrayList; import java.util.Map; import java.util.Objects; -import org.javatuples.Triplet; + + + public class Util { @@ -18,7 +21,7 @@ public static ProtoInfoStrings getInfosOfProtoMessageAsStrings(Message dataSet) for (Map.Entry entry : dataSet.getAllFields().entrySet()) { // System.out.println(entry.getKey() + "/" + entry.getValue()); - columnNames.add(entry.getKey().getName()); + columnNames.add( CaseUtils.toCamelCase(entry.getKey().getName(),false,'_')); if(entry.getValue() instanceof com.google.protobuf.Timestamp){ values.add(String.format("'%s'", ProtoTimestampToSqlTimestamp((com.google.protobuf.Timestamp)entry.getValue()).toString())); @@ -46,7 +49,7 @@ public static ProtoKeyValues getKeyValuesOfProtoMessage(Message dataSet) throws ArrayList values = new ArrayList(); for (Map.Entry entry : dataSet.getAllFields().entrySet()) { // System.out.println(entry.getKey() + "/" + entry.getValue()); - columnNames.add(entry.getKey().getName()); + columnNames.add( CaseUtils.toCamelCase(entry.getKey().getName(),false,'_')); values.add(entry.getValue()); } ProtoKeyValues protoInfo = new ProtoKeyValues(columnNames,values); From 539955c547d5d4e0609b5ad9cda4ed1bf283dad1 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 19:44:48 +0200 Subject: [PATCH 09/19] [FEAT] Adds IndexContainers --- .../org/bptlab/cepta/utils/database/BUILD | 1 + .../bptlab/cepta/utils/database/Mongo.java | 48 ++++++++++++++++++- 2 files changed, 47 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/utils/database/BUILD b/core/src/main/java/org/bptlab/cepta/utils/database/BUILD index 61942ec2..4d50fe7a 100644 --- a/core/src/main/java/org/bptlab/cepta/utils/database/BUILD +++ b/core/src/main/java/org/bptlab/cepta/utils/database/BUILD @@ -9,6 +9,7 @@ java_library( "//core/src/main/java/org/bptlab/cepta/utils/database/mongohelper:java_default_library", "//models/events:planned_train_data_java_proto", "@maven//:com_google_protobuf_protobuf_java", + "@maven//:org_apache_commons_commons_text", "@maven//:org_javatuples_javatuples", "@maven//:org_mongodb_bson", "@maven//:org_mongodb_mongodb_driver_core", diff --git a/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java b/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java index 7839f5c0..b83abc74 100644 --- a/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java +++ b/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java @@ -4,7 +4,6 @@ import com.google.protobuf.Timestamp; import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; -import com.mongodb.reactivestreams.client.*; //import com.mongodb.reactivestreams.client.MongoClients; import com.mongodb.client.MongoClient; @@ -20,12 +19,13 @@ import org.bson.codecs.*; import org.bson.codecs.configuration.CodecRegistry; +import java.io.Serializable; import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; import java.util.*; -import org.bptlab.cepta.models.events.train.PlannedTrainDataOuterClass.PlannedTrainData; +import org.javatuples.Pair; import static org.bson.codecs.configuration.CodecRegistries.*; @@ -146,5 +146,49 @@ public static List getUpcomingPlannedTrainDataStartingFromStat return new ArrayList(); } } + + public static class IndexContainer implements Serializable { + private String indexAttributeNameOrCompound; + private Integer orderIndicator; + + //orderIndicator = 0 indicates both ascending and descending indices will be created + public IndexContainer(String indexAttributeNameOrCompound){ + this.indexAttributeNameOrCompound = indexAttributeNameOrCompound; + this.orderIndicator = 0; + } + + public IndexContainer(String indexAttributeNameOrCompound, Integer orderIndicator){ + this.indexAttributeNameOrCompound = indexAttributeNameOrCompound; + this.orderIndicator = orderIndicator; + } + + public Pair get(){ + return new Pair<>(indexAttributeNameOrCompound, orderIndicator); + } + + public String getIndexAttributeNameOrCompound() { + return indexAttributeNameOrCompound; + } + + public Integer getOrderIndicator() { + return orderIndicator; + } + } + + public static List makeIndexContainerListFromPairs(List> indicesToBeCreated){ + ArrayList indexList = new ArrayList(); + for (Pair indexPair : indicesToBeCreated) { + indexList.add(new IndexContainer(indexPair.getValue0(),indexPair.getValue1()) ); + } + return indexList; + } + + public static List makeIndexContainerList(List indicesToBeCreated){ + ArrayList indexList = new ArrayList(); + for (String indexAttributeOrCompound : indicesToBeCreated) { + indexList.add(new IndexContainer(indexAttributeOrCompound) ); + } + return indexList; + } } From 69aaa5066a5ab0a50580e25f723cecaeafa014fb Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 19:46:55 +0200 Subject: [PATCH 10/19] [FIX] Changes stationId from Integer to Long in --- .../operators/WeatherLiveTrainJoinFunction.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java b/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java index 7747e920..f8d0b4f6 100644 --- a/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java +++ b/core/src/main/java/org/bptlab/cepta/operators/WeatherLiveTrainJoinFunction.java @@ -15,23 +15,23 @@ import org.bptlab.cepta.utils.notification.NotificationHelper; public class WeatherLiveTrainJoinFunction { - public static DataStream delayFromWeather(DataStream> weather, DataStream train){ + public static DataStream delayFromWeather(DataStream> weather, DataStream train){ return weather.join(train) - .where(new KeySelector, Integer>() { + .where(new KeySelector, Long>() { @Override - public Integer getKey(Tuple2 weatherDataIntegerTuple2) throws Exception { + public Long getKey(Tuple2 weatherDataIntegerTuple2) throws Exception { return weatherDataIntegerTuple2.f1; } - }).equalTo(new KeySelector() { + }).equalTo(new KeySelector() { @Override - public Integer getKey(LiveTrainData liveTrainData) throws Exception { - return (int) liveTrainData.getStationId(); + public Long getKey(LiveTrainData liveTrainData) throws Exception { + return liveTrainData.getStationId(); } }) .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(60))) - .apply(new RichJoinFunction, LiveTrainData, NotificationOuterClass.Notification>() { + .apply(new RichJoinFunction, LiveTrainData, NotificationOuterClass.Notification>() { @Override - public NotificationOuterClass.Notification join(Tuple2 weatherDataIntegerTuple2, + public NotificationOuterClass.Notification join(Tuple2 weatherDataIntegerTuple2, LiveTrainData liveTrainData) throws Exception { return NotificationHelper.getTrainDelayNotificationFrom( String.valueOf(liveTrainData.getTrainSectionId()), From d345b306a81e3cbd3ceca14d3a7b357d213cf5e4 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 19:48:20 +0200 Subject: [PATCH 11/19] [FIX] Indexing for DataToMongoDB --- .../bptlab/cepta/operators/DataToMongoDB.java | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java b/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java index 88655811..4bd9f137 100644 --- a/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java +++ b/core/src/main/java/org/bptlab/cepta/operators/DataToMongoDB.java @@ -8,7 +8,6 @@ import com.mongodb.reactivestreams.client.MongoCollection; import com.mongodb.reactivestreams.client.MongoDatabase; -import org.apache.flink.api.java.tuple.Tuple2; import org.bptlab.cepta.utils.database.Mongo; import org.bptlab.cepta.utils.database.mongohelper.SubscriberHelpers; @@ -39,14 +38,14 @@ public class DataToMongoDB extends RichAsyncFunction { private MongoConfig mongoConfig = new MongoConfig(); private transient MongoClient mongoClient; private final Logger log = LoggerFactory.getLogger(DataToMongoDB.class); - private ArrayList> indices = new ArrayList<>(); + private ArrayList indices = new ArrayList<>(); public DataToMongoDB(String collection_name, MongoConfig mongoConfig){ this.collection_name = collection_name; this.mongoConfig = mongoConfig; } - public DataToMongoDB(String collection_name, List> createIndexFor, MongoConfig mongoConfig){ + public DataToMongoDB(String collection_name, List createIndexFor, MongoConfig mongoConfig){ this.collection_name = collection_name; this.mongoConfig = mongoConfig; this.indices.addAll(createIndexFor); @@ -56,9 +55,14 @@ public DataToMongoDB(String collection_name, List> create public void open(org.apache.flink.configuration.Configuration parameters) throws Exception{ super.open(parameters); this.mongoClient = Mongo.getMongoClient(mongoConfig); - if (!indices.isEmpty()) { - createIndices(); + try{ + if (!indices.isEmpty()) { + createIndices(); + } + } catch (NullPointerException e) { + //no indexing provided } + log.info("Mongo Connection established"); } @@ -76,7 +80,7 @@ public void asyncInvoke(T dataset, ResultFuture resultFuture) throws Exceptio the resultFuture is where the outgoing element(s) will be */ MongoDatabase database = mongoClient.getDatabase(mongoConfig.getName()); - MongoCollection coll = database.getCollection(collection_name); + MongoCollection collection = database.getCollection(collection_name); Document document = protoToBson(dataset); //The new AsyncMongo Driver now uses Reactive Streams, @@ -88,7 +92,7 @@ the resultFuture is where the outgoing element(s) will be //https://github.com/reactive-streams/reactive-streams-jvm/tree/v1.0.3#2-subscriber-code SubscriberHelpers.OperationSubscriber insertOneSubscriber = new SubscriberHelpers.OperationSubscriber<>(); - coll.insertOne(document).subscribe(insertOneSubscriber); + collection.insertOne(document).subscribe(insertOneSubscriber); //start the subscriber -> start querying timeout defaults to 60seconds CompletableFuture queryFuture = CompletableFuture.supplyAsync(new Supplier() { @@ -112,13 +116,20 @@ public Boolean get() { private void createIndices(){ MongoDatabase database = mongoClient.getDatabase(mongoConfig.getName()); - MongoCollection coll = database.getCollection(collection_name); + MongoCollection collection = database.getCollection(collection_name); SubscriberHelpers.OperationSubscriber indexSubscriber = new SubscriberHelpers.OperationSubscriber<>(); - for (Tuple2 index : indices) { - if (index.f1 > 0){ - coll.createIndex(Indexes.ascending(index.f0)).subscribe(indexSubscriber); - } else { - coll.createIndex(Indexes.descending(index.f0)).subscribe(indexSubscriber); + for (Mongo.IndexContainer index : indices) { + switch (index.getOrderIndicator()){ + default: + collection.createIndex(Indexes.ascending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber); + collection.createIndex(Indexes.descending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber); + break; + case 1: + collection.createIndex(Indexes.ascending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber); + break; + case -1: + collection.createIndex(Indexes.descending(index.getIndexAttributeNameOrCompound())).subscribe(indexSubscriber); + break; } } //TODO acknowledge? From 26dbcca183ac3e43f93498726d963b52ab630464 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 19:49:30 +0200 Subject: [PATCH 12/19] [FIX] correct Query and Long FIX for StationId --- ...eatherLocationCorrelationMongoFunction.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java b/core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java index 5960727b..ecb3e416 100644 --- a/core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java +++ b/core/src/main/java/org/bptlab/cepta/operators/WeatherLocationCorrelationMongoFunction.java @@ -34,7 +34,7 @@ import static com.mongodb.client.model.Sorts.ascending; public class WeatherLocationCorrelationMongoFunction extends - RichAsyncFunction > { + RichAsyncFunction > { private MongoConfig mongoConfig = new MongoConfig(); private transient MongoClient mongoClient; @@ -62,7 +62,7 @@ public void close() throws Exception { @Override public void asyncInvoke(WeatherData weatherEvent, - final ResultFuture> resultFuture) throws Exception { + final ResultFuture> resultFuture) throws Exception { /* asyncInvoke will be called for each incoming element the resultFuture is where the outgoing element(s) will be @@ -81,10 +81,10 @@ the resultFuture is where the outgoing element(s) will be locationDataCollection.find( and( /* 0.02 is about 2 kilometers */ - gte("lat", weatherEvent.getLatitude() - 0.02) - , lte("lat", weatherEvent.getLatitude() + 0.02) - , gte("lon", weatherEvent.getLongitude() - 0.02) - , lte("lon", weatherEvent.getLongitude() + 0.02) + gte("latitude", weatherEvent.getLatitude() - 0.02) + , lte("latitude", weatherEvent.getLatitude() + 0.02) + , gte("longitude", weatherEvent.getLongitude() - 0.02) + , lte("longitude", weatherEvent.getLongitude() + 0.02) ) ).subscribe(findMultipleSubscriber); @@ -105,11 +105,11 @@ public List get() { queryFuture.get(); } - private Collection> generateWeatherStationEvents(WeatherData weatherData, List matchingStations) { - Collection> events = new ArrayList<>(); + private Collection> generateWeatherStationEvents(WeatherData weatherData, List matchingStations) { + Collection> events = new ArrayList<>(); for ( Document matchedStation : matchingStations) { try { - events.add(new Tuple2<>(weatherData,matchedStation.getInteger("id"))); + events.add(new Tuple2<>(weatherData,matchedStation.getLong("stationId"))); } catch ( Exception e) { e.printStackTrace(); } From 5002290e795c8c0203d41d80d11ce09e0582fafc Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 20:28:33 +0200 Subject: [PATCH 13/19] [FEAT] adds Indexing for MongoToDB --- core/BUILD | 1 + 1 file changed, 1 insertion(+) diff --git a/core/BUILD b/core/BUILD index b8ff079e..3c743a65 100644 --- a/core/BUILD +++ b/core/BUILD @@ -11,6 +11,7 @@ MAIN_DEPS = [ "//core/src/main/java/org/bptlab/cepta/serialization:java_default_library", "//core/src/main/java/org/bptlab/cepta/utils/types:java_default_library", "//core/src/main/java/org/bptlab/cepta/utils/functions:java_default_library", + "//core/src/main/java/org/bptlab/cepta/utils/database:java_default_library", "//models/constants:topic_java_proto", "//models/events:live_train_data_java_proto", "//models/events:count_of_trains_at_station_event_java_proto", From 8e57695cfd4c7bacaade54a29ec7538b7df3db43 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 20:33:53 +0200 Subject: [PATCH 14/19] [FIX] lower_underscore to lowerCamelCase change --- .../operators/DelayShiftFunctionMongo.java | 8 ++-- .../bptlab/cepta/utils/database/Mongo.java | 40 ++++++++++--------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/operators/DelayShiftFunctionMongo.java b/core/src/main/java/org/bptlab/cepta/operators/DelayShiftFunctionMongo.java index ca477725..57556693 100644 --- a/core/src/main/java/org/bptlab/cepta/operators/DelayShiftFunctionMongo.java +++ b/core/src/main/java/org/bptlab/cepta/operators/DelayShiftFunctionMongo.java @@ -74,11 +74,11 @@ the resultFuture is where the outgoing element(s) will be SubscriberHelpers.OperationSubscriber findMultipleSubscriber = new SubscriberHelpers.OperationSubscriber<>(); plannedTrainDataCollection.find( and( - eq("train_section_id",dataset.getTrainSectionId()), - eq("end_station_id",dataset.getEndStationId()), - eq("planned_arrival_time_end_station",dataset.getPlannedArrivalTimeEndStation()) + eq("trainSectionId",dataset.getTrainSectionId()), + eq("endStationId",dataset.getEndStationId()), + eq("plannedArrivalTimeEndStation",dataset.getPlannedArrivalTimeEndStation()) ) - ).sort(ascending("planned_event_time")).subscribe(findMultipleSubscriber); + ).sort(ascending("plannedEventTime")).subscribe(findMultipleSubscriber); CompletableFuture queryFuture = CompletableFuture.supplyAsync(new Supplier>() { @Override diff --git a/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java b/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java index b83abc74..b14bb651 100644 --- a/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java +++ b/core/src/main/java/org/bptlab/cepta/utils/database/Mongo.java @@ -103,22 +103,26 @@ public static Document protoToBson(Message dataset ) { public static PlannedTrainData documentToPlannedTrainData(Document doc){ PlannedTrainData.Builder builder = PlannedTrainData.newBuilder(); - builder.setId((Long)doc.get("id")); - builder.setTrainSectionId((Long)doc.get("train_section_id")); - builder.setStationId((Long)doc.get("station_id")); - builder.setPlannedEventTime((Timestamp)doc.get("planned_event_time")); - builder.setStatus((Long)doc.get("status")); - builder.setFirstTrainId((Long)doc.get("first_train_id")); - builder.setTrainId((Long)doc.get("train_id")); - builder.setPlannedDepartureTimeStartStation((Timestamp)doc.get("planned_departure_time_start_station")); - builder.setPlannedArrivalTimeEndStation((Timestamp)doc.get("planned_arrival_time_end_station")); - builder.setRuId((Long)doc.get("ru_id")); - builder.setEndStationId((Long)doc.get("end_station_id")); - builder.setImId((Long)doc.get("im_id")); - builder.setFollowingImId((Long)doc.get("following_im_id")); - builder.setMessageStatus((Long)doc.get("message_status")); - builder.setIngestionTime((Timestamp)doc.get("ingestion_time")); - builder.setOriginalTrainId((Long)doc.get("original_train_id")); + try { + builder.setId((Long) doc.get("id")); + builder.setTrainSectionId((Long) doc.get("trainSectionId")); + builder.setStationId((Long) doc.get("stationId")); + builder.setPlannedEventTime((Timestamp) doc.get("plannedEventTime")); + builder.setStatus((Long) doc.get("status")); + builder.setFirstTrainId((Long) doc.get("firstTrainId")); + builder.setTrainId((Long) doc.get("trainId")); + builder.setPlannedDepartureTimeStartStation((Timestamp) doc.get("plannedDepartureTimeStartStation")); + builder.setPlannedArrivalTimeEndStation((Timestamp) doc.get("plannedArrivalTimeEndStation")); + builder.setRuId((Long) doc.get("ruId")); + builder.setEndStationId((Long) doc.get("endStationId")); + builder.setImId((Long) doc.get("imId")); + builder.setFollowingImId((Long) doc.get("followingImId")); + builder.setMessageStatus((Long) doc.get("messageStatus")); + builder.setIngestionTime((Timestamp) doc.get("ingestionTime")); + builder.setOriginalTrainId((Long) doc.get("originalTrainId")); + }catch (Exception e) { + e.printStackTrace(); + } return builder.build(); } @@ -127,7 +131,7 @@ public static List getUpcomingPlannedTrainDataStartingFromStat boolean hasReferenceStation = false; try { for (int backwardsIterator = documentList.size()-1; backwardsIterator >= 0; backwardsIterator--) { - long plannedStationId = (long) documentList.get(backwardsIterator).get("station_id"); + long plannedStationId = (long) documentList.get(backwardsIterator).get("stationId"); if (currentStationId != plannedStationId ) { plannedTrainDataList.add(documentToPlannedTrainData(documentList.get(backwardsIterator))); } else { @@ -137,7 +141,7 @@ public static List getUpcomingPlannedTrainDataStartingFromStat } } } catch ( Exception e) { - // No element in DocumentList with station_id + // No element in DocumentList with stationId e.printStackTrace(); } if (hasReferenceStation) { From 12f9aa57351611542fe7890df8878056c9a86319 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 20:36:47 +0200 Subject: [PATCH 15/19] [FEAT] Adds Mongo Indexing, Change Weather processing to Mongo --- core/src/main/java/org/bptlab/cepta/Main.java | 34 +++++++------------ 1 file changed, 13 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/Main.java b/core/src/main/java/org/bptlab/cepta/Main.java index c0443d24..b85820d0 100644 --- a/core/src/main/java/org/bptlab/cepta/Main.java +++ b/core/src/main/java/org/bptlab/cepta/Main.java @@ -18,7 +18,7 @@ package org.bptlab.cepta; -import java.util.Optional; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.functions.FlatMapFunction; @@ -50,6 +50,8 @@ import org.bptlab.cepta.patterns.StaysInStationPattern; import org.bptlab.cepta.serialization.GenericBinaryProtoDeserializer; import org.bptlab.cepta.serialization.GenericBinaryProtoSerializer; +import org.bptlab.cepta.utils.database.Mongo; +import org.bptlab.cepta.utils.database.Mongo.IndexContainer; import org.bptlab.cepta.utils.functions.StreamUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +70,6 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; -import java.util.Properties; @Command( name = "cepta core", @@ -218,12 +219,15 @@ public LocationData map(Event event) throws Exception{ * ++++++++++++++++++++++++ * Begin - Weather/Locations * ------------------------*/ + //Instruct to create an Index to increase performance of queries after Insertion + List locationIndex = Mongo.makeIndexContainerList(Arrays.asList("latitude","longitude")); + //The Stream is not necessary it passes through all events independent from a successful upload DataStream uploadedLocationStream = AsyncDataStream - .unorderedWait(locationDataStream, new DataToMongoDB("location",mongoConfig), + .unorderedWait(locationDataStream, new DataToMongoDB("location",locationIndex,mongoConfig), 100000, TimeUnit.MILLISECONDS, 1); - DataStream> weatherLocationStream = AsyncDataStream + DataStream> weatherLocationStream = AsyncDataStream .unorderedWait(weatherDataStream, new WeatherLocationCorrelationMongoFunction("location",mongoConfig), 100000, TimeUnit.MILLISECONDS, 1); @@ -231,24 +235,24 @@ public LocationData map(Event event) throws Exception{ DataStream delayFromWeatherStream = WeatherLiveTrainJoinFunction.delayFromWeather(weatherLocationStream,liveTrainDataStream); delayFromWeatherStream.addSink(trainDelayNotificationProducer); - +// delayFromWeatherStream.print(); /*------------------------- * End - Weather * ++++++++++++++++++++++++ * Begin - MongoDelayShift * ------------------------*/ - + List plannedTrainDataIndex = Mongo.makeIndexContainerList(Arrays.asList("trainSectionId","endStationId","plannedArrivalTimeEndStation","plannedEventTime")); //The Stream is not necessary it passes through all events independent from a successful upload DataStream plannedTrainDataStreamUploaded = AsyncDataStream - .unorderedWait(plannedTrainDataStream, new DataToMongoDB("plannedTrainData", mongoConfig), + .unorderedWait(plannedTrainDataStream, new DataToMongoDB("plannedTrainData",plannedTrainDataIndex, mongoConfig), 100000, TimeUnit.MILLISECONDS, 1); DataStream notificationFromDelayShift = AsyncDataStream .unorderedWait(liveTrainDataStream, new DelayShiftFunctionMongo(mongoConfig), 100000, TimeUnit.MILLISECONDS, 1); - notificationFromDelayShift.addSink(trainDelayNotificationProducer); // notificationFromDelayShift.print(); + notificationFromDelayShift.addSink(trainDelayNotificationProducer); /*------------------------- * End - MongoDelayShift * ++++++++++++++++++++++++ @@ -282,16 +286,6 @@ public LocationData map(Event event) throws Exception{ .unorderedWait(liveTrainDataStream, new LivePlannedCorrelationFunctionMongo( mongoConfig), 100000, TimeUnit.MILLISECONDS, 1); - // LivePlannedCorrelationFunction Postgre - //TODO!! - //This might be Very Slot, maybe Too slow for LivePlannedCorrelationFunction!! -// plannedTrainDataStream.map(new DataToPostgresDatabase("planned",postgresConfig)); - -// DataStream> matchedLivePlannedStream = -// AsyncDataStream -// .unorderedWait(liveTrainDataStream, new LivePlannedCorrelationFunction(postgresConfig), -// 100000, TimeUnit.MILLISECONDS, 1); - // DetectStationArrivalDelay DataStream trainDelayNotificationDataStream = matchedLivePlannedStream .process(new DetectStationArrivalDelay()).name("train-delays"); @@ -311,19 +305,17 @@ public LocationData map(Event event) throws Exception{ patternStream.process(NoMatchingPlannedTrainDataPattern.generateNMPTDEventsFunc()); //TODO add consumer for these Events - /*------------------------- * End - matchedLivePlanned * ++++++++++++++++++++++++ * Begin - SumOfDelaysAtStation * ------------------------*/ //TODO Decided about input (Stream and events Notification VS DelayNotification) and Window - int sumOfDelayWindow = 4; DataStream> sumOfDelayAtStationStream = SumOfDelayAtStationFunction.sumOfDelayAtStation(trainDelayNotificationDataStream, sumOfDelayWindow ); //TODO Make Sink/Producer - sumOfDelayAtStationStream.print(); +// sumOfDelayAtStationStream.print(); /*------------------------- * End - SumOfDelaysAtStation * ++++++++++++++++++++++++ From 492618dd0d4935c94def9d2e907240a61cbf9d12 Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 19 May 2020 20:48:11 +0200 Subject: [PATCH 16/19] [FIX] Weather Test Integer->Long --- .../providers/LiveTrainDataProvider.java | 40 +++++++++---------- .../WeatherLiveTrainJoinTests.java | 12 +++--- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/src/test/java/org/bptlab/cepta/providers/LiveTrainDataProvider.java b/core/src/test/java/org/bptlab/cepta/providers/LiveTrainDataProvider.java index fdd0b3ec..825ef20e 100644 --- a/core/src/test/java/org/bptlab/cepta/providers/LiveTrainDataProvider.java +++ b/core/src/test/java/org/bptlab/cepta/providers/LiveTrainDataProvider.java @@ -109,7 +109,7 @@ public long extractAscendingTimestamp(LiveTrainData liveTrainData) { } // @DataProvider(name = "one-matching-live-train-weather-data-provider") - public static Pair, DataStream>> oneMatchingLiveTrainWeatherData() { + public static Pair, DataStream>> oneMatchingLiveTrainWeatherData() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ArrayList oneMatchingTrain = new ArrayList<>(); @@ -123,24 +123,24 @@ public long extractAscendingTimestamp(LiveTrainData liveTrainData) { return liveTrainData.getIngestionTime().getSeconds(); } }); - ArrayList> weather = new ArrayList<>(); + ArrayList> weather = new ArrayList<>(); weather.add(correlatedWeatherEventWithStationIdClass(1, "weather1")); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - DataStream> weatherStream = env.fromCollection(weather) + DataStream> weatherStream = env.fromCollection(weather) .assignTimestampsAndWatermarks( - new AscendingTimestampExtractor>() { + new AscendingTimestampExtractor>() { @Override public long extractAscendingTimestamp( - Tuple2 weatherDataIntegerTuple2) { + Tuple2 weatherDataIntegerTuple2) { return weatherDataIntegerTuple2.f0.getStartTime().getSeconds(); } }); - return new Pair, DataStream>>(liveTrainStream, weatherStream); + return new Pair, DataStream>>(liveTrainStream, weatherStream); } // @DataProvider(name = "several-matching-live-train-weather-data-provider") - public static Pair, DataStream>> multipleMatchingLiveTrainWeatherData() { + public static Pair, DataStream>> multipleMatchingLiveTrainWeatherData() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ArrayList matchingTrains = new ArrayList<>(); @@ -157,28 +157,28 @@ public long extractAscendingTimestamp(LiveTrainData liveTrainData) { return liveTrainData.getIngestionTime().getSeconds(); } }); - ArrayList> weather = new ArrayList<>(); + ArrayList> weather = new ArrayList<>(); weather.add(correlatedWeatherEventWithStationIdClass(1, "Clear_night")); weather.add(correlatedWeatherEventWithStationIdClass(2, "Clear_night")); weather.add(correlatedWeatherEventWithStationIdClass(3, "Clear_night")); weather.add(correlatedWeatherEventWithStationIdClass(4, "Clear_night")); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - DataStream> weatherStream = env.fromCollection(weather) + DataStream> weatherStream = env.fromCollection(weather) .assignTimestampsAndWatermarks( - new AscendingTimestampExtractor>() { + new AscendingTimestampExtractor>() { @Override public long extractAscendingTimestamp( - Tuple2 weatherDataIntegerTuple2) { + Tuple2 weatherDataIntegerTuple2) { return weatherDataIntegerTuple2.f0.getStartTime().getSeconds(); } }); - return new Pair, DataStream>>(liveTrainStream, weatherStream); + return new Pair, DataStream>>(liveTrainStream, weatherStream); // return new Object[][] { {liveTrainStream, weatherStream} }; } // @DataProvider(name = "not-matching-live-train-weather-data-provider") - public static Pair, DataStream>> noMatchingLiveTrainWeatherData(){ + public static Pair, DataStream>> noMatchingLiveTrainWeatherData(){ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); ArrayList oneMatchingTrain = new ArrayList<>(); @@ -192,20 +192,20 @@ public long extractAscendingTimestamp(LiveTrainData liveTrainData) { return liveTrainData.getIngestionTime().getSeconds(); } }); - ArrayList> weather = new ArrayList<>(); + ArrayList> weather = new ArrayList<>(); weather.add(correlatedWeatherEventWithStationIdClass(2, "weather1")); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); - DataStream> weatherStream = env.fromCollection(weather) + DataStream> weatherStream = env.fromCollection(weather) .assignTimestampsAndWatermarks( - new AscendingTimestampExtractor>() { + new AscendingTimestampExtractor>() { @Override public long extractAscendingTimestamp( - Tuple2 weatherDataIntegerTuple2) { + Tuple2 weatherDataIntegerTuple2) { return weatherDataIntegerTuple2.f0.getStartTime().getSeconds(); } }); - return new Pair, DataStream>>(liveTrainStream, weatherStream); + return new Pair, DataStream>>(liveTrainStream, weatherStream); } public static LiveTrainData trainEventWithEventTime( Timestamp timestamp ){ @@ -230,9 +230,9 @@ public static LiveTrainData trainEventWithTrainSectionIdStationId(int trainId, i .setTrainSectionId(trainId).setStationId(stationId).build(); } - public static Tuple2 correlatedWeatherEventWithStationIdClass(int stationId, String eventClass){ + public static Tuple2 correlatedWeatherEventWithStationIdClass(int stationId, String eventClass){ WeatherData weather = WeatherDataProvider.getDefaultWeatherEvent().toBuilder() .setEventClass(eventClass).build(); - return new Tuple2<>(weather, stationId); + return new Tuple2<>(weather, (long) stationId); } } diff --git a/core/src/test/java/org/bptlab/cepta/weather-tests/WeatherLiveTrainJoinTests.java b/core/src/test/java/org/bptlab/cepta/weather-tests/WeatherLiveTrainJoinTests.java index 24718917..8c8f3548 100644 --- a/core/src/test/java/org/bptlab/cepta/weather-tests/WeatherLiveTrainJoinTests.java +++ b/core/src/test/java/org/bptlab/cepta/weather-tests/WeatherLiveTrainJoinTests.java @@ -24,9 +24,9 @@ public class WeatherLiveTrainJoinTests { @Test public void testMatchesOne() throws IOException { - Pair, DataStream>> input = LiveTrainDataProvider.oneMatchingLiveTrainWeatherData(); + Pair, DataStream>> input = LiveTrainDataProvider.oneMatchingLiveTrainWeatherData(); DataStream liveTrainStream = input.getValue0(); - DataStream> correlatedWeatherStream = input.getValue1(); + DataStream> correlatedWeatherStream = input.getValue1(); DataStream trainDelayNotificationDataStream = WeatherLiveTrainJoinFunction.delayFromWeather(correlatedWeatherStream, liveTrainStream); @@ -38,9 +38,9 @@ public void testMatchesOne() throws IOException { @Test public void testMatchesMultiple() throws IOException { - Pair, DataStream>> input = LiveTrainDataProvider.multipleMatchingLiveTrainWeatherData(); + Pair, DataStream>> input = LiveTrainDataProvider.multipleMatchingLiveTrainWeatherData(); DataStream liveTrainStream = input.getValue0(); - DataStream> correlatedWeatherStream = input.getValue1(); + DataStream> correlatedWeatherStream = input.getValue1(); DataStream trainDelayNotificationDataStream = WeatherLiveTrainJoinFunction.delayFromWeather(correlatedWeatherStream, liveTrainStream); @@ -52,9 +52,9 @@ public void testMatchesMultiple() throws IOException { @Test public void testMatchesNone() throws IOException { - Pair, DataStream>> input = LiveTrainDataProvider.noMatchingLiveTrainWeatherData(); + Pair, DataStream>> input = LiveTrainDataProvider.noMatchingLiveTrainWeatherData(); DataStream liveTrainStream = input.getValue0(); - DataStream> correlatedWeatherStream = input.getValue1(); + DataStream> correlatedWeatherStream = input.getValue1(); DataStream trainDelayNotificationDataStream = WeatherLiveTrainJoinFunction.delayFromWeather(correlatedWeatherStream, liveTrainStream); From 3657fe5a5998c30e74391499ece46ce646c57a60 Mon Sep 17 00:00:00 2001 From: Christopher Date: Wed, 20 May 2020 17:01:19 +0200 Subject: [PATCH 17/19] [FIX] LivePlannedCorrelation test based on Postgre Int->bigint/Long --- .../PlannedTrainDataDatabaseConverter.java | 48 +++++++++---------- .../bptlab/cepta/live-planned-trains/BUILD | 2 + .../LivePlannedCorrelationTests.java | 34 ++++++++----- 3 files changed, 47 insertions(+), 37 deletions(-) diff --git a/core/src/main/java/org/bptlab/cepta/utils/converters/PlannedTrainDataDatabaseConverter.java b/core/src/main/java/org/bptlab/cepta/utils/converters/PlannedTrainDataDatabaseConverter.java index 0b0b6ba2..43dbd271 100644 --- a/core/src/main/java/org/bptlab/cepta/utils/converters/PlannedTrainDataDatabaseConverter.java +++ b/core/src/main/java/org/bptlab/cepta/utils/converters/PlannedTrainDataDatabaseConverter.java @@ -14,22 +14,22 @@ public class PlannedTrainDataDatabaseConverter extends DatabaseConverter Date: Tue, 26 May 2020 12:20:20 +0200 Subject: [PATCH 18/19] [FIX] Postgre Fix int->bigint/long --- .../delay-shift/DelayShiftFunctionTests.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/test/java/org/bptlab/cepta/delay-shift/DelayShiftFunctionTests.java b/core/src/test/java/org/bptlab/cepta/delay-shift/DelayShiftFunctionTests.java index 141f4804..3d0599e5 100644 --- a/core/src/test/java/org/bptlab/cepta/delay-shift/DelayShiftFunctionTests.java +++ b/core/src/test/java/org/bptlab/cepta/delay-shift/DelayShiftFunctionTests.java @@ -237,21 +237,21 @@ private String insertTrainWithSectionIdStationIdPlannedTimeQuery(long trainSecti private String createPlannedDatabaseQuery(){ return "CREATE TABLE public.planned ( " + - "id integer, " + - "train_section_id integer, " + - "station_id integer, " + + "id bigint, " + + "train_section_id bigint, " + + "station_id bigint, " + "planned_event_time timestamp, " + - "status integer, " + - "first_train_id integer, " + - "train_id integer, " + + "status bigint, " + + "first_train_id bigint, " + + "train_id bigint, " + "planned_departure_time_start_station timestamp, " + "planned_arrival_time_end_station timestamp, " + - "ru_id integer, " + - "end_station_id integer, " + - "im_id integer, " + - "following_im_id integer, " + - "message_status integer, " + + "ru_id bigint, " + + "end_station_id bigint, " + + "im_id bigint, " + + "following_im_id bigint, " + + "message_status bigint, " + "ingestion_time timestamp, " + - "original_train_id integer)"; + "original_train_id bigint)"; } } From b09d926ff678ca2868d709adce95173c1b588d37 Mon Sep 17 00:00:00 2001 From: Vincent Opitz Date: Tue, 26 May 2020 14:55:27 +0200 Subject: [PATCH 19/19] revert merge --- .../cepta/providers/ReplayerProvider.java | 30 +++++-------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java b/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java index d5fba0c0..33a68792 100644 --- a/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java +++ b/core/src/test/java/org/bptlab/cepta/providers/ReplayerProvider.java @@ -32,28 +32,14 @@ public ReplayerProvider(ManagedChannelBuilder channelBuilder) { this.asyncStub = ReplayerGrpc.newStub(channel); } -// public Iterator query(QueryOptions options) throws StatusRuntimeException { -// /* -// ReplayedEvent request = -// Rectangle.newBuilder() -// .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build()) -// .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build(); -// */ -// // return blockingStub.query(options); -// -// -// //Iterator -// -// -// /* -// try { -// features = blockingStub.listFeatures(request); -// } catch (StatusRuntimeException ex) { -// logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus()); -// return; -// } -// */ -// } + public Iterator query(QueryOptions options) throws StatusRuntimeException { + /* + ReplayedEvent request = + Rectangle.newBuilder() + .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build()) + .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build(); + */ + // return blockingStub.query(options); //TODO Replace Placeholder >>>> Iterator I = new Iterator() {