-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathSparkStreamingConsumer.java
116 lines (99 loc) · 4.41 KB
/
SparkStreamingConsumer.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import com.google.gson.Gson;
import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Milliseconds;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple5;
import java.io.Serializable;
import java.util.*;
/**
 * Benchmark consumer that reads JSON messages from a Kafka topic with Spark Streaming,
 * burns a configurable amount of CPU per message, and prints one summary per window.
 *
 * <p>The streaming context uses a batch interval of {@code TIME / 2} milliseconds and a
 * window whose length and slide are both {@code TIME} milliseconds. Every record in a
 * window is reduced into a single tuple that keeps the first operand's payload and
 * timestamp and sums the per-record counters.
 */
public class SparkStreamingConsumer {

    /** Kafka topic the direct stream subscribes to. */
    public static final String KAFKA_TOPIC = "spark-streaming-topic";

    /** Window length and slide interval, in milliseconds. */
    public static final Long TIME = 10 * 1000L; //milliseconds

    /** Number of passes over the uid bytes used to emulate per-message work (0 disables it). */
    public static final int delayFactor = 10000; // 0..

    /** Shared JSON codec; static, so it is not captured by the lambdas passed to Spark. */
    private static final Gson gson = new Gson();

    /**
     * Builds the streaming pipeline, starts it and blocks until the context terminates.
     *
     * @param args command-line arguments (not used)
     * @throws InterruptedException if the wait for termination is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        SparkConf sparkConf = new SparkConf()
                .setAppName("BenchmarkSpark")
                .set("spark.streaming.backpressure.enabled", "true")
                // uncomment it to set physical limits of processing
                // .set("spark.streaming.receiver.maxRate", "10000")
                // .set("spark.streaming.kafka.maxRatePerPartition", "10000")
                .setMaster("local");

        JavaStreamingContext jssc =
                new JavaStreamingContext(sparkConf, Milliseconds.apply(TIME / 2));

        // see: http://spark.apache.org/docs/latest/streaming-kafka-integration.html
        Set<String> topics = new HashSet<>(Collections.singleton(KAFKA_TOPIC));

        // Plain HashMap instead of double-brace initialization, which would create a
        // needless anonymous HashMap subclass.
        Map<String, String> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        //kafkaParams.put("auto.offset.reset", "smallest");
        kafkaParams.put("auto.offset.reset", "largest");

        JavaPairInputDStream<String, String> messages =
                KafkaUtils.createDirectStream(jssc,
                        String.class,
                        String.class,
                        StringDecoder.class,
                        StringDecoder.class,
                        kafkaParams,
                        topics);

        messages
                .map(record -> toTimedRecord(record._2()))
                .window(Milliseconds.apply(TIME), Milliseconds.apply(TIME))
                // Keep the first operand's payload/timestamp/count; only the message counter is summed.
                .reduce((left, right) -> new Tuple5<String, Message, Long, Long, Integer>(
                        left._1(),
                        left._2(),
                        left._3(),
                        left._4() + right._4(),
                        left._5()))
                .foreachRDD(rdd -> rdd.foreachPartition(
                        partitionOfRecords -> partitionOfRecords.forEachRemaining(
                                summary -> printWindowSummary(summary))));

        jssc.start();
        jssc.awaitTermination();
    }

    /**
     * Parses one Kafka value, performs the emulated work and stamps the result.
     *
     * @param json raw message value as read from Kafka
     * @return tuple of (raw json, parsed message, timestamp taken after the emulated work,
     *         message counter initialised to 1, match count produced by the emulated work)
     */
    private static Tuple5<String, Message, Long, Long, Integer> toTimedRecord(String json) {
        Message message = gson.fromJson(json, Message.class);
        // fromJson yields null for an empty/"null" payload; don't let one bad record kill the job.
        int count = emulateDelay(message == null ? null : message.getUid());
        return new Tuple5<String, Message, Long, Long, Integer>(
                json, message, System.currentTimeMillis(), 1L, count);
    }

    /**
     * Time delay emulation: scans the uid bytes {@code delayFactor} times, counting the
     * bytes equal to the first one.
     *
     * @param uid message uid; {@code null} is treated as "no work"
     * @return total number of matches over all passes, or 0 for a null or empty uid
     */
    private static int emulateDelay(String uid) {
        if (uid == null) {
            return 0;
        }
        byte[] bytes = uid.getBytes();
        int count = 0;
        for (int pass = 0; pass < delayFactor; pass++) {
            for (int i = 0; i < bytes.length; i++) {
                if (bytes[0] == bytes[i]) {
                    count++;
                }
            }
        }
        return count;
    }

    /**
     * Prints the summary of one reduced window to standard output.
     *
     * @param summary reduced tuple for the window
     */
    private static void printWindowSummary(Tuple5<String, Message, Long, Long, Integer> summary) {
        System.out.println(
                "***************************************************************************"
                        + "\nProcessing time: " + Long.toString(System.currentTimeMillis() - summary._3())
                        + "\nExpected time: " + Long.toString(TIME)
                        + "\nProcessed messages: " + Long.toString(summary._4())
                        + "\nMessage example: " + summary._1()
                        + "\nRecovered json: " + summary._2()
        );
    }

    /**
     * JSON payload carried by each Kafka message.
     *
     * <p>Declared {@code static} so that instances hold no hidden reference to an enclosing
     * {@code SparkStreamingConsumer}; the class is instantiated by Gson and passed through
     * Spark, neither of which has an enclosing instance to supply.
     */
    public static class Message implements Serializable {
        private static final long serialVersionUID = 1L;

        private Long message;
        private String uid;

        /** @return the numeric payload, or {@code null} if it was not set */
        public Long getMessage() {
            return message;
        }

        /** @param message the numeric payload */
        public void setMessage(Long message) {
            this.message = message;
        }

        /** @return the message uid, or {@code null} if it was not set */
        public String getUid() {
            return uid;
        }

        /** @param uid the message uid */
        public void setUid(String uid) {
            this.uid = uid;
        }

        /** @return this message rendered as JSON by the shared Gson instance */
        @Override
        public String toString() {
            return gson.toJson(this);
        }
    }
}