大数据应用 spark streaming消费flume采集的kafka数据Directf方式
沉沙 2018-09-27 来源 : 阅读 1604 评论 0

摘要:也这样大数据应用 spark streaming消费flume采集的kafka数据Directf方式

也这样大数据应用 spark streaming消费flume采集的kafka数据Directf方式

<

一、基本背景

    Spark-Streaming获取kafka数据的两种方式Receiver与Direct的方式,本文介绍Direct的方式。具体的流程是这样的:

   1、Direct方式是直接连接到kafka的节点上获取数据了。

   2、基于Direct的方式:周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。

   3、当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

   这种方式有如下优点:

   1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。;

   2、高性能:不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复;

   3、一次且仅一次的事务机制:Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。

   Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

   二、配置文件及编码

     flume版本:1.6.0,此版本直接支持到kafka,不用在单独安装插件。

     kafka版本2.10-0.8.2.1,必须是0.8.2.1,刚开始我用的是0.10,结果出现了下

      四、各类错误大全的第2个错误。

     spark版本:1.6.1。

     

    

      kafka配文件:producer.properties,红色文字为特别要注意的配置坑,呵呵

    

#agentsection 

producer.sources= s 

producer.channels= c

producer.sinks= r


#sourcesection

producer.sources.s.type= exec

producer.sources.s.command= tail -f -n+1 /opt/test/test.log

producer.sources.s.channels= c


# Eachsink‘s type must be defined

producer.sinks.r.type= org.apache.flume.plugins.KafkaSink

producer.sinks.r.metadata.broker.list=192.168.0.10:9092

producer.sinks.r.partition.key=0

producer.sinks.r.partitioner.class=org.apache.flume.plugins.SinglePartition

producer.sinks.r.serializer.class=kafka.serializer.StringEncoder

producer.sinks.r.request.required.acks=0

producer.sinks.r.max.message.size=1000000

producer.sinks.r.producer.type=sync

producer.sinks.r.custom.encoding=UTF-8

producer.sinks.r.custom.topic.name=flume2kafka2streaming930

#Specifythe channel the sink should use

producer.sinks.r.channel= c


# Eachchannel‘s type is defined. 

producer.channels.c.type= memory

producer.channels.c.capacity= 1000

producer.channels.c.transactionCapacity= 100


核心代码如下:

 SparkConf conf = SparkConf().setMaster().
              setAppName()
              .setJars(String[] {
                      })Map kafkaParameters = HashMap()kafkaParameters.put()Set topics =  HashSet()topics.add()JavaPairInputDStream lines = KafkaUtils.(jscString.String.StringDecoder.StringDecoder.kafkaParameterstopics)JavaDStream words = lines.flatMap(FlatMapFunction<Tuple2String>() { Iterable (Tuple2 tuple) Exception {
              Arrays.(tuple..split())}
      })JavaPairDStream pairs = words.mapToPair(PairFunction() {

          Tuple2 (String word) Exception {
              Tuple2(word)}
      })JavaPairDStream wordsCount = pairs.reduceByKey(Function2() { Integer (Integer v1Integer v2) Exception {
              v1 + v2}
      })wordsCount.print()jsc.start()jsc.awaitTermination()jsc.close()



    三、启动脚本

启动zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties &

启动kafka broker

bin/kafka-server-start.sh config/server.properties &


创建topic

bin/kafka-topics.sh --create --zookeeper 192.168.0.10:2181 --replication-factor 1 --partitions 1 --topic flume2kafka2streaming930


启动flume

bin/flume-ng agent --conf conf/  -f conf/producer.properties  -n producer -Dflume.root.logger=INFO,console


bin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected  --jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-


0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar 


echo "hadoop spark hive storm spark hadoop hdfs" >> /opt/test/test.log 

echo "hive storm " >> /opt/test/test.log

echo "hdfs" >> /opt/test/test.log

echo "hadoop spark hive storm spark hadoop hdfs" >> /opt/test/test.log


    输出结果如下:

* 结果如下:
* -------------------------------------------
* Time: 1475282360000 ms
* -------------------------------------------
*(spark,8)
*(storm,4)
*(hdfs,4)
*(hive,4)
*(hadoop,8)



    四、各类错误大全

    1、Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/spark/streaming/kafka/KafkaUtils
        at com.dt.spark.SparkApps.SparkStreaming.SparkStreamingOnKafkaDirected.main

        一概是没有提交jar包,一概会报错,无法执行,一概在submit脚本里添加:

       

bin/spark-submit --class com.dt.spark.sparkstreaming.SparkStreamingOnKafkaDirected  --jars /lib/kafka_2.10-0.8.2.1/kafka-clients-0.8.2.1.jar,/lib/kafka_2.10-


0.8.2.1/kafka_2.10-0.8.2.1.jar,/lib/kafka_2.10-0.8.2.1/metrics-core-2.2.0.jar,/lib/spark-1.6.1/spark-streaming-kafka_2.10-1.6.1.jar --master local[5] SparkApps.jar  

   2、Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker。

         上stackoverflow.com及spark官网查询,这个是因为版本不兼容引起。官网提供的版本:Spark Streaming 1.6.1 is compatible with Kafka 0.8.2.1    

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据应用频道!

本文由 @沉沙 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved