From 7f9fa7cbfed341e78033c0ff688a485d949cac1e Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Wed, 10 Aug 2016 14:05:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=20ConcurrentJobsDemo=20?= =?UTF-8?q?=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ..., Job, JobSet \350\257\246\350\247\243.md" | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" index 38ac089..b4af47d 100644 --- "a/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" +++ "b/Spark Streaming \346\272\220\347\240\201\350\247\243\346\236\220\347\263\273\345\210\227/2.1 JobScheduler, Job, JobSet \350\257\246\350\247\243.md" @@ -206,6 +206,56 @@ Spark Core 的 Job, Stage, Task 就是我们“日常”谈论 Spark 任务时 +## 附录 + +```scala +import java.util.concurrent.{Executors, TimeUnit} + +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receiver.Receiver +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.SparkConf + +object ConcurrentJobsDemo { + + def main(args: Array[String]) { + + // 完整代码可见本文最后的附录 + val BLOCK_INTERVAL = 1 // in seconds + val BATCH_INTERVAL = 5 // in seconds + val CURRENT_JOBS = 10 + + val conf = new SparkConf() + conf.setAppName(this.getClass.getSimpleName) + conf.setMaster("local[2]") + conf.set("spark.streaming.blockInterval", s"${BLOCK_INTERVAL}s") + conf.set("spark.streaming.concurrentJobs", s"${CURRENT_JOBS}") + val ssc = new StreamingContext(conf, Seconds(BATCH_INTERVAL)) + + // DStream DAG 定义开始 + val inputStream = ssc.receiverStream(new MyReceiver) + 
inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 1 + inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 2 + // DStream DAG 定义结束 + + ssc.start() + ssc.awaitTermination() + } + + class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) { + + override def onStart() { + // invoke store("str") every 10ms + Executors.newScheduledThreadPool(1).scheduleAtFixedRate(new Runnable { + override def run(): Unit = store("str") + }, 0, 10, TimeUnit.MILLISECONDS) + } + + override def onStop() {} + } + +} +```