A small producer & consumer example
Flume collects the data and acts as the producer, Kafka sits in the middle as the message broker, and Spark Streaming acts as the consumer that processes the data. [Prerequisite: a Linux environment with Flume + Kafka already set up; there are plenty of tutorials online for that, so we skip the setup and go straight to the main content.]
1. Prepare the Flume configuration file
apache-flume-1.9.0-bin/options/exec2kafka.conf:
## flume-ng agent -n a1 -c options/ -f exec2kafka.conf -Dflume.root.logger=INFO,console
##Define the names of agent a1's three components
a1.sources = r1
a1.sinks = k1
a1.channels = c1
##Define the source type
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/baidu.log
##Define the channel type
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
##Define the sink type
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = baidu
a1.sinks.k1.kafka.bootstrap.servers = 192.168.88.101:9092,192.168.88.102:9092,192.168.88.103:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
##Wire the source, channel, and sink together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Write the Spark Streaming consumer
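The consumer below relies on the spark-streaming-kafka-0-10 integration, so the project needs it (plus Spark core and streaming) on the classpath. A minimal build.sbt sketch, assuming a Spark 2.4.x / Scala 2.11 build; the exact versions are assumptions and should match your cluster:

// build.sbt -- illustrative versions only; align with the Spark and Scala versions on your cluster
scalaVersion := "2.11.12"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"                 % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming"            % "2.4.8" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.4.8"
)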
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}

object KafkaSpark {
  def main(args: Array[String]): Unit = {
    // Create the streaming context with a 1-second batch interval
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("Hello07Kafka")
    val streamingContext = new StreamingContext(sparkConf, Seconds(1))
    // Kafka consumer configuration
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "node01:9092,node02:9092,node03:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "yjx_kafka",
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    // Topic(s) to subscribe to
    val topics = Array("baidu")
    // Create the direct Kafka stream
    val linesDStream: InputDStream[ConsumerRecord[String, String]] =
      KafkaUtils.createDirectStream(streamingContext, PreferConsistent, Subscribe[String, String](topics, kafkaParams))
    // Print each record's value
    linesDStream.map(_.value()).foreachRDD(rdd => rdd.foreach(println))
    streamingContext.start()
    streamingContext.awaitTermination()
  }
}
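Note that with enable.auto.commit set to false, the code above never stores the consumed offsets anywhere, so after a restart the group simply falls back to auto.offset.reset. If you want to commit offsets back to Kafka yourself, the spark-streaming-kafka-0-10 integration exposes HasOffsetRanges and CanCommitOffsets; a minimal sketch that would replace the foreachRDD line above (a sketch only, not part of the original example):

// Sketch: manual offset commit after each batch (not in the original example)
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

linesDStream.foreachRDD { rdd =>
  // Grab this batch's offset ranges before transforming the RDD
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // Process the batch (here: just print the values, as the original example does)
  rdd.map(_.value()).foreach(println)
  // Commit the offsets back to Kafka once the batch has been handled
  linesDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}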
3. Start the environment
① Start the ZooKeeper cluster (on all three nodes)
【1】【2】【3】zkServer.sh start
② Start the Kafka cluster
#Start the Kafka brokers (on all three nodes)
【1】【2】【3】kafka-server-start.sh /opt/yjx/kafka_2.12-0.11.0.3/config/server.properties
#Create the topic
【1】kafka-topics.sh --zookeeper node01:2181 --create --replication-factor 2 --partitions 3 --topic baidu
③ Start Flume
#Flume start command
【1】flume-ng agent -n a1 -f /opt/yjx/apache-flume-1.9.0-bin/options/exec2kafka.conf -Dflume.root.logger=INFO,console
Finally, start the Spark Streaming consumer code.
4. Test with data
① Test with ping to Baidu
#Append the ping output to baidu.log
ping www.baidu.com >> /root/baidu.log 2>&1 &
② Test by appending lines to the file
[root@node01 ~]# echo helloworld>>/root/baidu.log
[root@node01 ~]# echo helloworld1>>/root/baidu.log
[root@node01 ~]# echo helloworld2>>/root/baidu.log
If the pipeline is wired up correctly, each appended line shows up in the Spark Streaming console output within about one batch interval (1 second).