日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時(shí)間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營(yíng)銷解決方案
SparkStreaming與Kafka整合遇到的問題及解決方案

SparkStreaming與Kafka整合遇到的問題及解決方案

作者:佚名 2017-08-03 09:37:35

大數(shù)據(jù)

Kafka

Spark 最近工作中是做日志分析的平臺(tái),采用了sparkstreaming+kafka,采用kafka主要是看中了它對(duì)大數(shù)據(jù)量處理的高性能,處理日志類應(yīng)用再好不過(guò)了,采用了sparkstreaming的流處理框架 主要是考慮到它本身是基于spark核心的,以后的批處理可以一站式服務(wù),并且可以提供準(zhǔn)實(shí)時(shí)服務(wù)到elasticsearch中,可以實(shí)現(xiàn)準(zhǔn)實(shí)時(shí)定位系統(tǒng)日志。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)建站!專注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開發(fā)、小程序開發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶創(chuàng)新互聯(lián)還提供了臨滄免費(fèi)建站歡迎大家使用!

前言

最近工作中是做日志分析的平臺(tái),采用了sparkstreaming+kafka,采用kafka主要是看中了它對(duì)大數(shù)據(jù)量處理的高性能,處理日志類應(yīng)用再好不過(guò)了,采用了sparkstreaming的流處理框架 主要是考慮到它本身是基于spark核心的,以后的批處理可以一站式服務(wù),并且可以提供準(zhǔn)實(shí)時(shí)服務(wù)到elasticsearch中,可以實(shí)現(xiàn)準(zhǔn)實(shí)時(shí)定位系統(tǒng)日志。

實(shí)現(xiàn)

Spark-Streaming獲取kafka數(shù)據(jù)的兩種方式-Receiver與direct的方式。

一. 基于Receiver方式

這種方式使用Receiver來(lái)獲取數(shù)據(jù)。Receiver是使用Kafka的高層次Consumer API來(lái)實(shí)現(xiàn)的。receiver從Kafka中獲取的數(shù)據(jù)都是存儲(chǔ)在Spark Executor的內(nèi)存中的,然后Spark Streaming啟動(dòng)的job會(huì)去處理那些數(shù)據(jù)。代碼如下:

  
 
 
 
  1. SparkConf sparkConf = new SparkConf().setAppName("log-etl").setMaster("local[4]"); 
  2.     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 
  3.     int numThreads = Integer.parseInt("4"); 
  4.     Map topicMap = new HashMap(); 
  5.     topicMap.put("group-45", numThreads); 
  6.      //接收的參數(shù)分別是JavaStreamingConetxt,zookeeper連接地址,groupId,kafak的topic  
  7.     JavaPairReceiverInputDStream messages = 
  8.     KafkaUtils.createStream(jssc, "172.16.206.27:2181,172.16.206.28:2181,172.16.206.29:2181", "1", topicMap); 

剛開始的時(shí)候系統(tǒng)正常運(yùn)行,沒有發(fā)現(xiàn)問題,但是如果系統(tǒng)異常重新啟動(dòng)sparkstreaming程序后,發(fā)現(xiàn)程序會(huì)重復(fù)處理已經(jīng)處理過(guò)的數(shù)據(jù),這種基于receiver的方式,是使用Kafka的高階API來(lái)在ZooKeeper中保存消費(fèi)過(guò)的offset的。這是消費(fèi)Kafka數(shù)據(jù)的傳統(tǒng)方式。這種方式配合著WAL機(jī)制可以保證數(shù)據(jù)零丟失的高可靠性,但是卻無(wú)法保證數(shù)據(jù)被處理一次且僅一次,可能會(huì)處理兩次。因?yàn)镾park和ZooKeeper之間可能是不同步的。官方現(xiàn)在也已經(jīng)不推薦這種整合方式,官網(wǎng)相關(guān)地址 http://spark.apache.org/docs/latest/streaming-kafka-integration.html ,下面我們使用官網(wǎng)推薦的第二種方式kafkaUtils的createDirectStream()方式。

二.基于Direct的方式

這種新的不基于Receiver的直接方式,是在Spark 1.3中引入的,從而能夠確保更加健壯的機(jī)制。替代掉使用Receiver來(lái)接收數(shù)據(jù)后,這種方式會(huì)周期性地查詢Kafka,來(lái)獲得每個(gè)topic+partition的***的offset,從而定義每個(gè)batch的offset的范圍。當(dāng)處理數(shù)據(jù)的job啟動(dòng)時(shí),就會(huì)使用Kafka的簡(jiǎn)單consumer api來(lái)獲取Kafka指定offset范圍的數(shù)據(jù)。

代碼如下:

  
 
 
 
  1. SparkConf sparkConf = new SparkConf().setAppName("log-etl"); 
  2. JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2)); 
  3.  
  4. HashSet topicsSet = new HashSet(Arrays.asList(topics.split(","))); 
  5. HashMap kafkaParams = new HashMap(); 
  6. kafkaParams.put("metadata.broker.list", brokers); 
  7. // Create direct kafka stream with brokers and topics 
  8. JavaPairInputDStream messages = KafkaUtils.createDirectStream( 
  9.     jssc, 
  10.     String.class, 
  11.     String.class, 
  12.     StringDecoder.class, 
  13.     StringDecoder.class, 
  14.     kafkaParams, 
  15.     topicsSet 
  16. ); 

這種direct方式的優(yōu)點(diǎn)如下:

1.簡(jiǎn)化并行讀?。喝绻x取多個(gè)partition,不需要?jiǎng)?chuàng)建多個(gè)輸入DStream然后對(duì)它們進(jìn)行union操作。Spark會(huì)創(chuàng)建跟Kafka partition一樣多的RDD partition,并且會(huì)并行從Kafka中讀取數(shù)據(jù)。所以在Kafka partition和RDD partition之間,有一個(gè)一對(duì)一的映射關(guān)系。

2.一次且僅一次的事務(wù)機(jī)制:基于receiver的方式,在spark和zk中通信,很有可能導(dǎo)致數(shù)據(jù)的不一致。

3.高效率:在receiver的情況下,如果要保證數(shù)據(jù)的不丟失,需要開啟wal機(jī)制,這種方式下,為、數(shù)據(jù)實(shí)際上被復(fù)制了兩份,一份在kafka自身的副本中,另外一份要復(fù)制到wal中, direct方式下是不需要副本的。

三.基于Direct方式丟失消息的問題

貌似這種方式很***,但是還是有問題的,當(dāng)業(yè)務(wù)需要重啟sparkstreaming程序的時(shí)候,業(yè)務(wù)日志依然會(huì)打入到kafka中,當(dāng)job重啟后只能從***的offset開始消費(fèi)消息,造成重啟過(guò)程中的消息丟失。kafka中的offset如下圖(使用kafkaManager實(shí)時(shí)監(jiān)控隊(duì)列中的消息):

當(dāng)停止業(yè)務(wù)日志的接受后,先重啟spark程序,但是發(fā)現(xiàn)job并沒有將先前打入到kafka中的數(shù)據(jù)消費(fèi)掉。這是因?yàn)橄]有經(jīng)過(guò)zk,topic的offset也就沒有保存

四.解決消息丟失的處理方案

一般有兩種方式處理這種問題,可以先spark streaming 保存offset,使用spark checkpoint機(jī)制,第二種是程序中自己實(shí)現(xiàn)保存offset邏輯,我比較喜歡第二種方式,以為這種方式可控,所有主動(dòng)權(quán)都在自己手中。

先看下大體流程圖,

  
 
 
 
  1. SparkConf sparkConf = new SparkConf().setMaster("local[2]").setAppName("log-etl"); 
  2.  Set topicSet = new HashSet(); 
  3.         topicSet.add("group-45"); 
  4.         kafkaParam.put("metadata.broker.list", "172.16.206.17:9092,172.16.206.31:9092,172.16.206.32:9092"); 
  5.         kafkaParam.put("group.id", "simple1"); 
  6.  
  7.         // transform java Map to scala immutable.map 
  8.         scala.collection.mutable.Map testMap = JavaConversions.mapAsScalaMap(kafkaParam); 
  9.         scala.collection.immutable.Map scalaKafkaParam = 
  10.                 testMap.toMap(new Predef.$less$colon$less, Tuple2>() { 
  11.                     public Tuple2 apply(Tuple2 v1) { 
  12.                         return v1; 
  13.                     } 
  14.                 }); 
  15.  
  16.         // init KafkaCluster 
  17.         kafkaCluster = new KafkaCluster(scalaKafkaParam); 
  18.  
  19.         scala.collection.mutable.Set mutableTopics = JavaConversions.asScalaSet(topicSet); 
  20.         immutableTopics = mutableTopics.toSet(); 
  21.         scala.collection.immutable.Set topicAndPartitionSet2 = kafkaCluster.getPartitions(immutableTopics).right().get(); 
  22.  
  23.         // kafka direct stream 初始化時(shí)使用的offset數(shù)據(jù) 
  24.         Map consumerOffsetsLong = new HashMap(); 
  25.  
  26.         // 沒有保存offset時(shí)(該group***消費(fèi)時(shí)), 各個(gè)partition offset 默認(rèn)為0 
  27.         if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).isLeft()) { 
  28.  
  29.             System.out.println(kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet2).left().get()); 
  30.  
  31.             Set topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); 
  32.  
  33.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { 
  34.                 consumerOffsetsLong.put(topicAndPartition, 0L); 
  35.             } 
  36.  
  37.         } 
  38.         // offset已存在, 使用保存的offset 
  39.         else { 
  40.  
  41.             scala.collection.immutable.Map consumerOffsetsTemp = kafkaCluster.getConsumerOffsets("simple1", topicAndPartitionSet2).right().get(); 
  42.  
  43.             Map consumerOffsets = JavaConversions.mapAsJavaMap((scala.collection.immutable.Map)consumerOffsetsTemp); 
  44.  
  45.             Set topicAndPartitionSet1 = JavaConversions.setAsJavaSet((scala.collection.immutable.Set)topicAndPartitionSet2); 
  46.  
  47.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) { 
  48.                 Long offset = (Long)consumerOffsets.get(topicAndPartition); 
  49.                 consumerOffsetsLong.put(topicAndPartition, offset); 
  50.             } 
  51.  
  52.         } 
  53.  
  54.         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000)); 
  55.         kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam); 
  56.  
  57.         // create direct stream 
  58.         JavaInputDStream message = KafkaUtils.createDirectStream( 
  59.                 jssc, 
  60.                 String.class, 
  61.                 String.class, 
  62.                 StringDecoder.class, 
  63.                 StringDecoder.class, 
  64.                 String.class, 
  65.                 kafkaParam, 
  66.                 consumerOffsetsLong, 
  67.                 new Function, String>() { 
  68.                     public String call(MessageAndMetadata v1) throws Exception { 
  69.                         System.out.println("接收到的數(shù)據(jù)《《==="+v1.message()); 
  70.                         return v1.message(); 
  71.                     } 
  72.                 } 
  73.         ); 
  74.  
  75.         // 得到rdd各個(gè)分區(qū)對(duì)應(yīng)的offset, 并保存在offsetRanges中 
  76.         final AtomicReference offsetRanges = new AtomicReference(); 
  77.  
  78.         JavaDStream javaDStream = message.transform(new Function, JavaRDD>() { 
  79.             public JavaRDD call(JavaRDD rdd) throws Exception { 
  80.                 OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); 
  81.                 offsetRanges.set(offsets); 
  82.                 return rdd; 
  83.             } 
  84.         }); 
  85.  
  86.         // output 
  87.         javaDStream.foreachRDD(new Function, Void>() { 
  88.  
  89.             public Void call(JavaRDD v1) throws Exception { 
  90.                 if (v1.isEmpty()) return null; 
  91.  
  92.                 List list = v1.collect(); 
  93.                 for(String s:list){ 
  94.                     System.out.println("數(shù)據(jù)==="+s); 
  95.                 } 
  96.  
  97.                 for (OffsetRange o : offsetRanges.get()) { 
  98.  
  99.                     // 封裝topic.partition 與 offset對(duì)應(yīng)關(guān)系 java Map 
  100.                     TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition()); 
  101.                     Map topicAndPartitionObjectMap = new HashMap(); 
  102.                     topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset()); 
  103.  
  104.                     // 轉(zhuǎn)換java map to scala immutable.map 
  105.                     scala.collection.mutable.Map testMap = 
  106.                             JavaConversions.mapAsScalaMap(topicAndPartitionObjectMap); 
  107.                     scala.collection.immutable.Map scalatopicAndPartitionObjectMap = 
  108.                             testMap.toMap(new Predef.$less$colon$less, Tuple2>() { 
  109.                                 public Tuple2 apply(Tuple2 v1) { 
  110.                                     return v1; 
  111.                                 } 
  112.                             }); 
  113.  
  114.                     // 更新offset到kafkaCluster 
  115.                     kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"), scalatopicAndPartitionObjectMap); 
  116.                        System.out.println("原數(shù)據(jù)====》"+o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset() 
  117.                     ); 
  118.                 } 
  119.                 return null; 
  120.             } 
  121.         }); 
  122.  
  123.         jssc.start(); 
  124.         jssc.awaitTermination(); 
  125.     } 

基本使用這種方式就可以解決數(shù)據(jù)丟失的問題。


網(wǎng)站標(biāo)題:SparkStreaming與Kafka整合遇到的問題及解決方案
本文地址:http://www.5511xx.com/article/coddcie.html