當前位置:網站首頁>Spark累加器和廣播變量

Spark累加器和廣播變量

2022-06-24 06:59:48Angryshark_128

累加器

累加器有些類似Redis的計數器,但要比計數器强大,不僅可以用於計數,還可以用來累加求和、累加合並元素等。

假設我們有一個word.txt文本,我們想要統計該文本中單詞“sheep”的行數,我們可以直接讀取文本filter過濾然後計數。

sc.textFile("word.txt").filter(_.contains("sheep")).count()

假設我們想分別統計文本中單詞"sheep""wolf"的行數,如果按照上述方法需要計算兩次

sc.textFile("word.txt").filter(_.contains("sheep")).count()
sc.textFile("word.txt").filter(_.contains("wolf")).count()

如果要分別統計100個單詞的行數,則要計算100次

如果使用累加器,則只需要讀一次即可

val count1=sc.acccumlator(0)
val count2=sc.acccumlator(0)
...

def processLine(line:String):Unit{
    
   if(line.contains("sheep")){
       count1+=1
   }
   
   if(line.contains("wolf")){
       count2+=1
   }
   
   ...
}


sc.textFile("word.txt").foreach(processLine(_))

不僅Int類型可以累加,Long、Double、Collection也可以累加,還可以進行自定義,而且這個變量可以在Spark的WebUI界面看到。

注意:累加器只能在Driver端定義和讀取,不能在Executor端讀取

廣播變量

廣播變量允許緩存一個只讀的變量在每臺機器(worker)上面,而不是每個任務(task)保存一份備份。利用廣播變量能够以一種更有效率的方式將一個大數據量輸入集合的副本分配給每個節點。

廣播變量通過兩個方面提高數據共享效率:

(1)集群中每個節點(物理機器)只有一個副本,默認的閉包是每個任務一個副本;

(2)廣播傳輸是通過BT下載模式實現的,也就是P2P下載,在集群多的情况下,可以極大地提高數據傳輸速率。廣播變量修改後,不會反饋到其他節點。

val list=sc.parallize(0 to 10)
val brdList=sc.broadcast(list)

sc.textFile("test.txt").filter(brdList.value.contains(_.toInt)).foreach(println)

使用時,需注意:

(1)適用於小變量分發,對於動則幾十M的變量,每個任務都發送一次既消耗內存,也浪費時間

(2)廣播變量只能在driver端定義,在Executor端讀取,Executor不能修改

版權聲明
本文為[Angryshark_128]所創,轉載請帶上原文鏈接,感謝
https://cht.chowdera.com/2022/175/202206240050384857.html

隨機推薦