Skip to content

Commit

Permalink
增加 ConcurrentJobsDemo 代码
Browse files Browse the repository at this point in the history
  • Loading branch information
lw-lin authored Aug 10, 2016
1 parent d1b4e7d commit 7f9fa7c
Showing 1 changed file with 50 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,56 @@ Spark Core 的 Job, Stage, Task 就是我们“日常”谈论 Spark 任务时
</tr>
</table>

## 附录

```scala
import java.util.concurrent.{Executors, TimeUnit}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf

/**
 * Demo of `spark.streaming.concurrentJobs`: two always-blocking foreachRDD
 * outputs are defined on one input stream, so with concurrentJobs = 10 the
 * scheduler can run jobs from several batches at the same time.
 */
object ConcurrentJobsDemo {

  def main(args: Array[String]): Unit = {

    val BLOCK_INTERVAL = 1   // in seconds
    val BATCH_INTERVAL = 5   // in seconds
    val CONCURRENT_JOBS = 10 // value for spark.streaming.concurrentJobs

    val conf = new SparkConf()
    conf.setAppName(this.getClass.getSimpleName)
    conf.setMaster("local[2]")
    conf.set("spark.streaming.blockInterval", s"${BLOCK_INTERVAL}s")
    conf.set("spark.streaming.concurrentJobs", s"${CONCURRENT_JOBS}")
    val ssc = new StreamingContext(conf, Seconds(BATCH_INTERVAL))

    // DStream DAG definition begins
    val inputStream = ssc.receiverStream(new MyReceiver)
    // Both outputs block forever, so each batch's jobs never finish;
    // queued jobs pile up and demonstrate concurrent job scheduling.
    inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 1
    inputStream.foreachRDD(_ => Thread.sleep(Int.MaxValue)) // output 2
    // DStream DAG definition ends

    ssc.start()
    ssc.awaitTermination()
  }

  /** A receiver that stores the string "str" on a fixed 10 ms schedule. */
  class MyReceiver() extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    // Kept as a field so onStop() can shut the scheduler down; the original
    // leaked this thread pool because it was never referenced again.
    @volatile private var scheduler: java.util.concurrent.ScheduledExecutorService = _

    override def onStart() {
      // invoke store("str") every 10 ms
      scheduler = Executors.newScheduledThreadPool(1)
      scheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = store("str")
      }, 0, 10, TimeUnit.MILLISECONDS)
    }

    override def onStop() {
      // Stop the scheduled task so the receiver does not leak its thread
      // across restarts.
      if (scheduler != null) {
        scheduler.shutdownNow()
        scheduler = null
      }
    }
  }

}
```

<br/>
<br/>
Expand Down

0 comments on commit 7f9fa7c

Please sign in to comment.