#!/bin/bash
Help()
{
    # Display Help
    echo "Structured Streaming single burst workload"
    echo
    echo "Parameters:"
    echo "-h Print this Help."
    echo
}

# Get the options
while getopts "h" option; do
    case $option in
        h) # display Help
            Help
            exit;;
    esac
done
# hardcoded env vars
echo "Job Configuration:"
export FRAMEWORK="STRUCTUREDSTREAMING"; echo "- FRAMEWORK = $FRAMEWORK"
export MODE="single-burst"; echo "- MODE = $MODE"
export DATA_VOLUME=15; echo "- DATA_VOLUME = $DATA_VOLUME"
export JAR_PATH=`cat ../benchmark-jars-path`
export JAR_NAME=$JAR_PATH/structured-streaming-benchmark-assembly-3.0.jar; echo "- JAR_NAME = $JAR_NAME"
export PUBLISHER_COUNT=1
export KAFKA_AUTO_OFFSET_RESET_STRATEGY="earliest"
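# "earliest" makes the streaming job read the topic from the beginning, so it consumes
# the full burst that the publishers write before the Spark job is deployed.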
export INPUT_DATA_PATH=`cat ../benchmark-input-data-path`
export AMT_WORKERS=5; echo "- AMT_WORKERS = $AMT_WORKERS"
export WORKER_CPU=4; echo "- WORKER_CPU = $WORKER_CPU"
export NUM_PARTITIONS=20
export NUM_SQL_PARTITIONS=20
export AWS_ACCESS_KEY=`cat ../AWS_ACCESS_KEY`
export AWS_SECRET_KEY=`cat ../AWS_SECRET_KEY`
if [ -z "$AWS_ACCESS_KEY" ] || [ -z "$AWS_SECRET_KEY" ] ; then
echo 'Missing AWS_ACCESS_KEY and/or AWS_SECRET_KEY. Fill them in in the AWS_ACCESS_KEY and AWS_SECRET_KEY files in the automation_scripts folder.' >&2
exit 1
fi
# depending env vars
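# Derived settings: 5 GB of container memory per worker CPU, with the executor heap set to
# ~90% of that minus 1 GB (presumably headroom for off-heap and OS overhead), e.g.
# WORKER_CPU=4 -> 20 GB container, 17 GB heap. NUM_PARTITIONS is recomputed as total cores
# (AMT_WORKERS * WORKER_CPU) and overrides the value set above; ConcGCThreads is half the
# cores per worker.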
export WORKER_MEM=$(($WORKER_CPU*5))
export WORKER_HEAP_MEM=$(($WORKER_MEM*9/10-1))
export SPARK_EXECUTOR_MEMORY="${WORKER_HEAP_MEM}g"
export NUM_PARTITIONS=$(($WORKER_CPU*$AMT_WORKERS))
export SPARK_CORES_MAX=$(($AMT_WORKERS*$WORKER_CPU))
export CONC_GC_THREADS=$(($WORKER_CPU/2))
echo
echo
eval $(ssh-agent -s)
ssh-add ~/.ssh/id_rsa_benchmark
## start spark
cd ..
./start-spark-cluster.sh $AMT_WORKERS $WORKER_CPU
cd ../aws_marathon_files
# get the hosts of the jmx containers
SPARK_CONTAINER_NAME=($(dcos task spark | awk '{ print $1 }' | grep spark))
SPARK_CONTAINER_IP=($(dcos task spark | awk '{ print $2 }' | grep 10))
export JMX_HOSTS=""
for i in $(seq 0 $AMT_WORKERS)
do
JMX_HOSTS="${JMX_HOSTS}${SPARK_CONTAINER_NAME[$i]}:${SPARK_CONTAINER_IP[$i]},"
done
JMX_HOSTS=${JMX_HOSTS::-1} # remove the last comma
echo $JMX_HOSTS
#### HDFS name nodes
HDFS_NAME_NODE_1=$(dcos task name-0-node | awk '{ print $2 }' | grep 10)
HDFS_NAME_NODE_2=$(dcos task name-1-node | awk '{ print $2 }' | grep 10)
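# Query each NameNode's NameNodeStatus JMX bean and keep whichever reports "active";
# the exported host:9001 address is presumably picked up by the Spark job templates for HDFS access.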
STATE_NAME_NODE_1=$(curl "http://$HDFS_NAME_NODE_1:9002/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus" | jq '.beans[0].State')
STATE_NAME_NODE_2=$(curl "http://$HDFS_NAME_NODE_2:9002/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus" | jq '.beans[0].State')
if [[ $STATE_NAME_NODE_1 == *"active"* ]]; then
export ACTIVE_HDFS_NAME_NODE="$HDFS_NAME_NODE_1:9001"
else
export ACTIVE_HDFS_NAME_NODE="$HDFS_NAME_NODE_2:9001"
fi
# Look up the values of the environment variables required by the ndw publisher and the output consumer
## get the Kafka brokers
BOOTSTRAP_SERVER_LIST=($(dcos task kafka-brokers | awk '{ print $2 }' | grep 10))
BROKER_LIST_STRING="${BOOTSTRAP_SERVER_LIST[*]}"
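# Join the broker IPs with commas and append the broker port (10000) to each entry,
# e.g. "10.0.0.1 10.0.0.2" becomes "10.0.0.1:10000,10.0.0.2:10000".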
export KAFKA_BOOTSTRAP_SERVERS=$(echo "${BROKER_LIST_STRING//${IFS:0:1}/,}" | sed -E "s/([^,]+)/\1:10000/g")
echo "Kafka bootstrap servers configured as: $KAFKA_BOOTSTRAP_SERVERS"
# DCOS IP (for jmx exporter)
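# The DC/OS master address is read from the CloudFormation stack output labelled "Master"
# (region and stack name are hardcoded for this deployment); jq returns the value quoted,
# so the quotes are stripped before building CLUSTER_URL.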
DCOS_DNS_ADDRESS=$(aws cloudformation describe-stacks --region eu-west-1 --stack-name=streaming-benchmark | jq '.Stacks[0].Outputs | .[] | select(.Description=="Master") | .OutputValue' | awk '{print tolower($0)}')
export CLUSTER_URL=http://${DCOS_DNS_ADDRESS//\"}
echo $CLUSTER_URL
# DCOS access token (for jmx exporter)
export DCOS_ACCESS_TOKEN=$(dcos config show core.dcos_acs_token)
echo $DCOS_ACCESS_TOKEN
topicnames=()
begintimes=()
endtimes=()
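# Only stage 3 is run here (seq 3 3 yields a single value); widening the range would
# benchmark multiple LAST_STAGE values with the same loop.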
stages=($(seq 3 3))
for LAST_STAGE in "${stages[@]}"
do
export LAST_STAGE=$LAST_STAGE
################## RUN PREPARATION ######################
# Create a new topic for the metrics of this job
# Do this by generating a UUID and using this as the topicname for the Kafka metrics topic
# as well as the output filename for the output consumer and evaluator
export TOPICNAME=$(uuidgen)
export FLOWTOPIC=flow-$TOPICNAME
export SPEEDTOPIC=speed-$TOPICNAME
cd ../automation_scripts
./create-kafka-topic.sh $TOPICNAME $NUM_PARTITIONS
./create-kafka-topic.sh $FLOWTOPIC $NUM_PARTITIONS
./create-kafka-topic.sh $SPEEDTOPIC $NUM_PARTITIONS
# Add the topic name to an output topic list that will be used later on to start an output consumer and evaluator per topic
topicnames+=("$TOPICNAME")
begintime=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
echo "begintime of $TOPICNAME - $begintime"
begintimes+=($begintime)
sleep 10
cd ../aws_marathon_files
echo "topic $TOPICNAME created"
echo "topic $FLOWTOPIC created"
echo "topic $SPEEDTOPIC created"
################## BENCHMARK RUN ######################
# Start publisher for five minutes
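# Split DATA_VOLUME over the publishers using ceiling division so the combined volume
# is at least DATA_VOLUME even when it does not divide evenly.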
export VOLUME_PER_PUBLISHER=$((($DATA_VOLUME+($PUBLISHER_COUNT-1))/$PUBLISHER_COUNT))
echo "adding publishers"
for PUBLISHER_NB in $(seq 1 $PUBLISHER_COUNT)
do
export PUBLISHER_NB=$PUBLISHER_NB
envsubst < aws-publisher-with-env.json > aws-publisher-without-env-$PUBLISHER_NB.json
dcos marathon app add aws-publisher-without-env-$PUBLISHER_NB.json
done
for PUBLISHER_NB in $(seq 1 $PUBLISHER_COUNT)
do
export PUBLISHER_NB=$PUBLISHER_NB
dcos marathon app start /benchmark/$MODE-publisher-$PUBLISHER_NB
done
# wait 5 min
for k in {1..5}
do
sleep 1m
echo "publisher pubbing for $k minutes"
done
# Spark job deployment
envsubst < spark-submit-with-env.json > spark-submit.json
dcos marathon app add spark-submit.json
sleep 30
# get the hosts of the jmx containers
SPARK_CONTAINER_NAME=($(dcos task spark | awk '{ print $1 }' | grep spark))
SPARK_CONTAINER_IP=($(dcos task spark | awk '{ print $2 }' | grep 10))
export JMX_HOSTS=""
# we also need to monitor spark submit
export NUM_ENTRIES=$(($AMT_WORKERS+1))
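# seq is zero-based here, so AMT_WORKERS+2 entries are taken from the task list
# (presumably the workers, the Spark master, and the newly added spark-submit container).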
for i in $(seq 0 $NUM_ENTRIES)
do
JMX_HOSTS="${JMX_HOSTS}${SPARK_CONTAINER_NAME[$i]}:${SPARK_CONTAINER_IP[$i]},"
done
JMX_HOSTS=${JMX_HOSTS::-1} # remove the last comma
echo $JMX_HOSTS
echo "cadvisor hosts of spark containers"
export CADVISOR_HOSTS=""
for i in $(seq 0 $NUM_ENTRIES)
do
CADVISOR_HOSTS="${CADVISOR_HOSTS}${SPARK_CONTAINER_IP[$i]}:8888,"
done
CADVISOR_HOSTS=${CADVISOR_HOSTS::-1} # remove the last comma
echo $CADVISOR_HOSTS
# Start up the jmx metrics gathering
echo "Start up JMX metrics exporter"
envsubst < jmx-exporter-with-env.json > jmx-exporter-without-env.json
dcos marathon app add jmx-exporter-without-env.json
# wait 10 min
for k in {1..10}
do
sleep 1m
echo "benchmark running for $k minutes"
done
################## END OF RUN ######################
echo "Killing jmx-exporter for stage $LAST_STAGE"
dcos marathon app stop jmx-exporter
dcos marathon app remove jmx-exporter
# kill spark job
dcos marathon app stop spark-submit
dcos marathon app remove spark-submit
endtime=$(date -u +"%Y-%m-%dT%H:%M:%SZ")
echo "endtime of $TOPICNAME - $endtime"
endtimes+=($endtime)
# kill ndw-publisher
echo "Killing ndw-publisher for stage $LAST_STAGE"
for PUBLISHER_NB in $(seq 1 $PUBLISHER_COUNT)
do
export PUBLISHER_NB=$PUBLISHER_NB
dcos marathon app stop /benchmark/$MODE-publisher-$PUBLISHER_NB
dcos marathon app remove /benchmark/$MODE-publisher-$PUBLISHER_NB
done
done
### EVALUATION
echo "all jobs finished: ${topicnames[@]} with begintimes ${begintimes[@]} and endtimes ${endtimes[@]}"
cd ../automation_scripts
for topic in "${topicnames[@]}"; do
echo "starting output consumer for $topic"
./run-output-consumer.sh $FRAMEWORK $MODE "$topic"
done
echo "starting evaluators for topics: ${topicnames[@]}"
for i in "${!topicnames[@]}"; do
echo "starting evaluator for ${topicnames[$i]}"
echo "starting evaluator for begintimes ${begintimes[$i]} and endtimes ${endtimes[$i]}"
./run-evaluator.sh $FRAMEWORK $MODE "${stages[$i]}" "${topicnames[$i]}" ${begintimes[$i]} ${endtimes[$i]} $AMT_WORKERS $WORKER_CPU $WORKER_MEM $SPARK_EXECUTOR_MEMORY
done
sleep 30
./remove-spark-cluster.sh
echo "BENCHMARK FINISHED"
( speaker-test -t sine -f 500 )& pid=$! ; sleep 0.1s ; kill -9 $pid