大数据系列-SPARK-STREAMING流数据receiver
package com.test
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.util.Random
//采集器
object SparkStreamingReceiver {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SparkStreamingReceiver").setMaster("local[*]")
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
streamingContext.receiverStream(new CustomReceiver).print()
streamingContext.start()
streamingContext.awaitTermination()
}
class CustomReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {
private var flag = true
override def onStart(): Unit = {
new Thread(() => {
while (flag) {
val message = "message:" + new Random().nextInt(500).toString
store(message)
Thread.sleep(500)
}
}).start()
}
override def onStop(): Unit = {
flag = false
}
}
}
文章评论