沉沙
2018-09-27
来源 :
阅读 1236
评论 0
摘要:本篇教程介绍了大数据应用 flume整合spark streaming详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据应用的理解更加深入。
本篇教程介绍了大数据应用 flume整合spark streaming详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据应用的理解更加深入。
<
我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111)。Spark streaming的处理是直接输出有几个events。
一、配置文件
Flume 配置文件如下:example5.properties注意要加上a1.sinks.k1.avro.useLocalTimeStamp = true,这一句,否则,总报这样的错误:“ Unable to deliver event. Exception follows.org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp”
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.0.10
a1.sources.r1.port = 22222
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.0.10
a1.sinks.k1.port = 11111
a1.sinks.k1.avro.useLocalTimeStamp = true
二、编写处理代码
//创建StreamingContext,10秒一个批次
val ssc = new StreamingContext(sparkConf, Seconds(10))
val hostname = args(0)
val port = args(1).toInt
val storageLevel = StorageLevel.MEMORY_ONLY
val flumeStream = FlumeUtils.createStream(ssc, hostname, port)
flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()
//开始运行
ssc.start()
//计算完毕退出
ssc.awaitTermination()
ssc.stop()
这里的一个坑就是老是报找不到FlumeUtils,其实他在spark-examples-1.6.1-hadoop2.6.0.jar这个包里,
我通过源代码加入了包啊,就是不行
val sparkConf = new SparkConf().setAppName("AdClickedStreamingStats")
.setMaster("local[5]").setJars( List(
"/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar",
"/lib/kafka-0.10.0/kafka-clients-0.10.0.1.jar",
"/lib/kafka-0.10.0/kafka_2.10-0.10.0.1.jar",
"/lib/spark-1.6.1/spark-streaming_2.10-1.6.1.jar",
"/lib/kafka-0.10.0/metrics-core-2.2.0.jar",
"/lib/kafka-0.10.0/zkclient-0.8.jar",
"/lib/spark-1.6.1/mysql-connector-java-5.1.13-bin.jar",
"/lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar",
"/opt/spark-1.5.0-bin-hadoop2.6/SparkApps.jar"))
没办法,霸王硬上弓,还是硬上吧,
bin/spark-submit --class com.dt.spark.flume.SparkStreamingFlume
--jars /lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar
--master local[5] SparkApps.jar 192.168.0.10 11111
这些歇菜了吧!
三、运行测试
1、先提交在spark中提交,产生11111监听* in/spark-submit --class com.dt.spark.flume.SparkStreamingFlume --jars /lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar --master local[5] SparkApps.jar 192.168.0.10 11111否则会连接不上11111端口。
2、flume启动$ bin/flume-ng agent --conf conf --conf-file example5.properties --name a1 -Dflume.root.logger=INFO,console因为avro的方式,会输出到11111端口,然后启动22222端口监听中间有个坑就是报 Unable to create Rpc client using hostname: 192.168.0.10, port: 11111这样的错误,原来是bin/flume-ng agent --conf conf --conf-file conf/example5.properties --name a1 -Dflume.root.logger=INFO,console中的name搞错了
3、触发数据:telnet localhost 22222输入字符串,然后会在flume的控制台出现效果。
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据应用频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号