大数据应用 flume整合spark streaming详解
沉沙 2018-09-27 来源 : 阅读 1236 评论 0

摘要:本篇教程介绍了大数据应用 flume整合spark streaming详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据应用的理解更加深入。

本篇教程介绍了大数据应用 flume整合spark streaming详解,希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据应用的理解更加深入。

<

我的思路是这样的,flume产生数据,然后输出到spark streaming,flume的源数据是netcat(地址:localhost,端口22222),输出是avro(地址:localhost,端口是11111)。Spark streaming的处理是直接输出有几个events。
一、配置文件
Flume 配置文件如下:example5.properties注意要加上a1.sinks.k1.avro.useLocalTimeStamp = true,这一句,否则,总报这样的错误:“ Unable to deliver event. Exception follows.org.apache.flume.EventDeliveryException: java.lang.NullPointerException: Expected timestamp”
a1.channels = c1
a1.sinks = k1
  
a1.sources.r1.type = netcat
a1.sources.r1.bind = 192.168.0.10
a1.sources.r1.port = 22222
a1.sources.r1.channels = c1
  
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
  
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.0.10
a1.sinks.k1.port = 11111
a1.sinks.k1.avro.useLocalTimeStamp = true


二、编写处理代码

//创建StreamingContext,10秒一个批次
val ssc = new StreamingContext(sparkConf, Seconds(10))

val hostname = args(0)
val port = args(1).toInt
val storageLevel = StorageLevel.MEMORY_ONLY
val flumeStream = FlumeUtils.createStream(ssc, hostname, port)

flumeStream.count().map(cnt => "Received " + cnt + " flume events." ).print()

//开始运行
ssc.start()
//计算完毕退出
ssc.awaitTermination()

ssc.stop()

    这里的一个坑就是老是报找不到FlumeUtils,其实他在spark-examples-1.6.1-hadoop2.6.0.jar这个包里,
我通过源代码加入了包啊,就是不行
val sparkConf = new SparkConf().setAppName("AdClickedStreamingStats")
  .setMaster("local[5]").setJars( List(
  "/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar",
  "/lib/kafka-0.10.0/kafka-clients-0.10.0.1.jar",
  "/lib/kafka-0.10.0/kafka_2.10-0.10.0.1.jar",
  "/lib/spark-1.6.1/spark-streaming_2.10-1.6.1.jar",
  "/lib/kafka-0.10.0/metrics-core-2.2.0.jar",
  "/lib/kafka-0.10.0/zkclient-0.8.jar",
  "/lib/spark-1.6.1/mysql-connector-java-5.1.13-bin.jar",
  "/lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar",
  "/opt/spark-1.5.0-bin-hadoop2.6/SparkApps.jar"))
  
  没办法,霸王硬上弓,还是硬上吧,
  bin/spark-submit --class com.dt.spark.flume.SparkStreamingFlume 
  --jars /lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar  
  --master local[5] SparkApps.jar 192.168.0.10 11111
  
  这些歇菜了吧!
  
三、运行测试
1、先提交在spark中提交,产生11111监听* in/spark-submit --class com.dt.spark.flume.SparkStreamingFlume   --jars /lib/spark-1.6.1/spark-examples-1.6.1-hadoop2.6.0.jar   --master local[5] SparkApps.jar 192.168.0.10 11111否则会连接不上11111端口。
2、flume启动$ bin/flume-ng agent --conf conf --conf-file example5.properties --name a1 -Dflume.root.logger=INFO,console因为avro的方式,会输出到11111端口,然后启动22222端口监听中间有个坑就是报 Unable to create Rpc client using hostname: 192.168.0.10, port: 11111这样的错误,原来是bin/flume-ng agent --conf conf --conf-file conf/example5.properties --name a1 -Dflume.root.logger=INFO,console中的name搞错了
3、触发数据:telnet localhost 22222输入字符串,然后会在flume的控制台出现效果。    

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据应用频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved