《通信數據分析與實戰》課件-第三章 Spark RDD_第1頁
《通信數據分析與實戰》課件-第三章 Spark RDD_第2頁
《通信數據分析與實戰》課件-第三章 Spark RDD_第3頁
《通信數據分析與實戰》課件-第三章 Spark RDD_第4頁
《通信數據分析與實戰》課件-第三章 Spark RDD_第5頁
已閱讀5頁,還剩71頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

通信數據分析與實戰SparkRDD第三章第1節2知道RDD的作用理解RDD的五大特征學習目標TARGETRDD的概述傳統的MapReduce雖然具有自動容錯、平衡負載和可拓展性的優點,但是其最大缺點是采用非循環式的數據流模型,使得在迭代計算式要進行大量的磁盤IO操作。Spark中的RDD可以很好的解決這一缺點。RDD(ResilientDistributedDataset),即彈性分布式數據集,是一個容錯的、并行的數據結構,可以讓用戶顯式地將數據存儲到磁盤和內存中,并且還能控制數據的分區。對于迭代式計算和交互式數據挖掘,RDD可以將中間計算的數據結果保存在內存中,若是后面需要中間結果參與計算時,則可以直接從內存中讀取,從而可以極大地提高計算速度。RDD是Spark提供的最重要的抽象概念,我們可以將RDD理解為一個分布式存儲在集群中的大型數據集合,不同RDD之間可以通過轉換操作形成依賴關系實現管道化,從而避免了中間結果的I/O操作,提高數據處理的速度和性能。RDD的五大特征

分區列表計算函數依賴其他RDDKV類型分區器優先位置列表RDD的五大特征分區列表每個RDD被分為多個分區(Partitions),這些分區運行在集群中的不同節點,每個分區都會被一個計算任務處理,分區數決定了并行計算的數量,創建RDD時可以指定RDD分區的個數。如果不指定分區數量,當RDD從集合創建時,默認分區數量為該程序所分配到的資源的CPU核數(每個Core可以承載2~4個Partition),如果是從HDFS文件創建,默認為文件的Block數。RDD的五大特征每個分區都有一個計算函數

Spark的RDD的計算函數是以分片為基本單位的,每個RDD都會實現compute函數,對具體的分片進行計算。RDD的五大特征依賴于其他RDD

RDD的每次轉換都會生成一個新的RDD,所以RDD之間就會形成類似于流水線一樣的前后依賴關系。在部分分區數據丟失時,Spark可以通過這個依賴關系重新計算丟失的分區數據,而不是對RDD的所有分區進行重新計算。RDD的五大特征K-V數據類型的RDD分區器當前Spark中實現了兩種類型的分區函數,一個是基于哈希的HashPartitioner,另外一個是基于范圍的RangePartitioner。只有對于(Key,Value)的RDD,才會有Partitioner(分區),非(Key,Value)的RDD的Parititioner的值是None。Partitioner函數不但決定了RDD本身的分區數量,也決定了parentRDDShuffle輸出時的分區數量。RDD的五大特征每個分區都有一個優先位置列表優先位置列表會存儲每個Partition的優先位置,對于一個HDFS文件來說,就是每個Partition塊的位置。按照“移動數據不如移動計算”的理念,Spark在進行任務調度的時候,會盡可能地將計算任務分配到其所要處理數據塊的存儲位置。10小結知道RDD的作用理解RDD的五大特征通信數據分析與實戰SparkRDD第三章第2節12掌握從本地系統創建RDD掌握并行集合創建RDD學習目標TARGETRDD的創建方式從本地-HDFS文件系統加載數據創建通過并行集合創建RDD從文件系統加載數據創建RDDSpark可以從Hadoop支持的任何存儲源中加載數據去創建RDD,包括本地文件系統和HDFS等文件系統。我們通過Spark中的SparkContext對象調用textFile()方法加載數據創建RDD。scala>valtest=sc.textFile("file:///export/data/test.txt")test:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[1]attextFileat<console>:241、從本地文件中加載數據創建RDD從文件系統加載數據創建RDDscala>valtestRDD=sc.textFile("/data/test.txt")testRDD:org.apache.spark.rdd.RDD[String]=/data/test.txtMapPartitionsRDD[1]attextFileat<console>:242、從HDFS中加載數據創建RDD通過并行集合創建RDDscala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valarrRDD=sc.parallelize(array)arrRDD:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[6]atparallelizeat<console>:26Spark可以通過并行集合創建RDD。即從一個已經存在的集合、數組上,通過SparkContext對象調用parallelize()方法創建RDD。17小結掌握從本地系統創建RDD掌握并行集合創建RDD通信數據分析與實戰SparkRDD第三章第3節19知道RDD的處理過程熟悉RDD的轉換算子熟悉RDD的行動算子學習目標TARGETRDD的處理過程Spark用Scala語言實現了RDD的API,程序開發者可以通過調用API對RDD進行操作處理。RDD經過一系列的“轉換”操作,每一次轉換都會產生不同的RDD,以供給下一次“轉換”操作使用,直到最后一個RDD經過“行動”操作才會被真正計算處理,并輸出到外部數據源中,若是中間的數據結果需要復用,則可以進行緩存處理,將數據緩存到內存中。RDD的轉換算子RDD處理過程中的“轉換”操作主要用于根據已有RDD創建新的RDD,每一次通過Transformation算子計算后都會返回一個新RDD,供給下一個轉換算子使用。下面,通過一張表來列舉一些常用轉換算子操作的API,具體如下。RDD的轉換算子下面,我們通過結合具體的示例對這些轉換算子API進行詳細講解。filter(func)操作會篩選出滿足函數func的元素,并返回一個新的數據集。假設,有一個文件test.txt,下面,通過一張圖來描述如何通過filter算子操作,篩選出包含單詞“spark”的元素。test.txtRDD(lines)RDD(linesWithSpark)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparksparkyuminyuanlongpingsparksc.textFile()lines.filter()RDD的轉換算子

通過從test.txt文件中加載數據的方式創建RDD,然后通過filter操作篩選出滿足條件的元素,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:

scala>vallines=sc.textFile("file:///export/data/test.txt")

lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txt

MapPartitionsRDD[1]attextFileat<console>:24

scala>vallinesWithSpark=lines.filter(line=>line.contains("spark"))

linesWithSpark:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[2]at

filterat<console>:25RDD的轉換算子

map(func)操作將每個元素傳遞到函數func中,并將結果返回為一個新的數據集。有一個文件test.txt,接下來,通過一張圖來描述如何通過map算子操作把文件內容拆分成一個個的單詞并封裝在數組對象中,具體過程如下

test.txtRDD(lines)RDD(words)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshiArray(“Hadoop”,“spark”)Array(“tuyouyou”,”yuanshi”)Array(“spark”,”yumin”)Array(“yuanlongping”,“spark”)Array(“zhongnanshan”,“yuanshi”)sc.textFile()lines.map()RDD的轉換算子通過從test.txt文件中加載數據的方式創建RDD,然后通過map操作將文件的每一行內容都拆分成一個個的單詞元素,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[4]attextFileat<console>:24

scala>valwords=lines.map(line=>line.split(""))words:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[13]amapat<console>:25RDD的轉換算子flatMap(func)與map(func)相似,但是每個輸入的元素都可以映射到0或者多個輸出的結果。有一個文件test.txt,接下來,通過一張圖來描述如何通過flatMap算子操作,把文件內容拆分成一個個的單詞test.txtRDD(lines)RDD(words)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshiArray(“Hadoop”,“spark”)Array(“tuyouyou”,”yuanshi”)Array(“spark”,”yumin”)Array(“yuanlongping”,“spark”)Array(“zhongnanshan”,“yuanshi”)sc.textFile()lines.map()hadoopsparkTuyouyouyuanshiSparkyuminyuanlongpingsparkZhongnanshanyuanshilines.flatmap()lines.flat()RDD的轉換算子通過從test.txt文件中加載數據的方式創建RDD,然后通過flatMap操作將文件的每一行內容都拆分成一個個的單詞元素,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[5]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split(""))words:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[14]atmapat<console>:25RDD的轉換算子

groupByKey()主要用于(Key,Value)鍵值對的數據集,將具有相同Key的Value進行分組,會返回一個新的(Key,Iterable)形式的數據集。同樣以文件test.txt為例,接下來,通過一張圖來描述如何通過groupByKey算子操作,將文件內容中的所有單詞進行分組.

RDD(words)RDD(groupWords)(“Hadoop”,1)(“spark”,1)(“tuyouyou”,1)(“yuanshi”,1)(“spark”,1)(“yumin”,1)(“yuanlongping”,1)(“spark”,1)(“zhongnanshan”,1)(“yuanshi”,1)(“hadoop”,1)(“spark”,(1,1,1))(“tuyouyou”,1)(“yuanshi”,(1,1))(“yumin”,1)(“yaunlongping”,1)(“zhongnanshan”,1)Words.groupByKey()RDD的轉換算子通過groupByKey操作把(Key,Value)鍵值對類型的RDD,按單詞將單詞出現的次數進行分組,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[6]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split("")).map(word=>(word,1))ords:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[15]atmapat<console>:25

scala>valgroupWords=words.groupByKey()groupWords:org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[16]atgroupByKeyat<console>:25RDD的轉換算子

reduceByKey()主要用于(Key,Value)鍵值對的數據集,返回的是一個新的(Key,Iterable)形式的數據集,該數據集是每個Key傳遞給函數func進行聚合運算后得到的結果。同樣以文件test.txt,接下來,通過一張圖來描述如何通過reduceByKey算子操作統計單詞出現的次數。RDD(words)RDD(reduceWords)(“hadoop”,1)(“spark”,1)(“tuyouyou”,1)(“yuanshi”,1)(“spark”,1)(“yumin”,1)(“yuanlongping”,1)(“spark”,1)(“zhongnanshan”,1)(“yuanshi”,1)(“hadoop”,1)(“spark”,3)(“tuyouyou”,1)(“yuanshi”,2)(“yumin”,1)(“yaunlongping”,1)(“zhongnanshan”,1)Words.reduceByKey()RDD的轉換算子通過reduceByKey操作把(Key,Value)鍵值對類型的RDD,按單詞Key將單詞出現的次數Value進行聚合,這些元素組成的集合是一個新的RDD。接下來,通過代碼來進行演示,具體代碼如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[7]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split("")).map(word=>(word,1))words:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[16]atmapat<console>:25

scala>valreduceWords=words.reduceByKey((a,b)=>a+b)reduceWords:org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[17]atreduceByKeyat<console>:25RDD的行動算子行動算子主要是將在數據集上運行計算后的數值返回到驅動程序,從而觸發真正的計算。下面,通過一張表來列舉一些常用行動算子操作的API,具體如下。RDD的行動算子count()主要用于返回數據集中的元素個數。假設,現有一個arrRdd,如果要統計arrRdd元素的個數,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.count()es0:Long=5RDD的行動算子first()主要用于返回數組的第一個元素。現有一個arrRdd,如果要獲取arrRdd中第一個元素,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.first()res1:Int=1RDD的行動算子

take()主要用于以數組的形式返回數組集中的前n個元素。現有一個arrRdd,如果要獲取arrRdd中的前三個元素,示例代碼如下scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.take(3)res2:Array[Int]=Array(1,2,3)RDD的行動算子reduce()主要用于通過函數func(輸入兩個參數并返回一個值)聚合數據集中的元素。現有一個arrRdd,如果要對arrRdd中的元素進行聚合,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.reduce((a,b)=>a+b)res3:Int=15RDD的行動算子collect()主要用于以數組的形式返回數據集中的所有元素。現有一個rdd,如果希望rdd中的元素以數組的形式輸出,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.collect()res4:Array[Int]=Array(1,2,3,4,5)RDD的行動算子

foreach()主要用于將數據集中的每個元素傳遞到函數func中運行。現有一個arrRdd,如果希望遍歷輸出arrRdd中的元素,示例代碼如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.foreach(x=>println(x))

1

2

3

4

539小結知道RDD的處理過程熟悉RDD的轉換算子熟悉RDD的行動算子通信數據分析與實戰SparkRDD第三章第4節41知道RDD的分區作用熟悉RDD的分區方式了解RDD的自定義分區學習目標TARGETRDD的分區在分布式程序中,網絡通信的開銷是很大的,因此控制數據分布以獲得最少的網絡傳輸可以極大的提升程序的整體性能,Spark程序可以通過控制RDD分區方式來減少通信開銷。Spark中所有的RDD都可以進行分區,系統會根據一個針對鍵的函數對元素進行分區。雖然Spark不能控制每個鍵具體劃分到哪個節點上,但是可以確保相同的鍵出現在同一個分區上。RDD分區的作用RDD的分區RDD分區的默認數目Mesos模式RDD的分區原則是分區的個數盡量等于集群中的CPU核心(Core)數目。對于不同的Spark部署模式而言,都可以通過設置spark.default.parallelism這個參數值來配置默認的分區數目。默認為本地機器的CPU數目,若設置了local[N],則默認為NLocal模式Standalone/yarn模式默認的分區數是8。在“集群中所有CPU核數總和”和“2”這兩者中取較大值作為默認值RDD的分區RDD分區的方式Spark框架為RDD提供了兩種分區方式,分別是哈希分區(HashPartitioner)和范圍分區(RangePartitioner)。哈希分區是根據哈希值進行分區;范圍分區是將一定范圍的數據映射到一個分區中。這兩種分區方式已經可以滿足大多數應用場景的需求。與此同時,Spark也支持自定義分區方式,即通過一個自定義的Partitioner對象來控制RDD的分區,從而進一步減少通信開銷。RDD的分區RDD分區的方式需要注意的是,RDD的分區函數是針對(Key,Value)類型的RDD,分區函數根據Key對RDD元素進行分區。因此,當需要對一些非(Key,Value)類型的RDD進行自定義分區時,需要先把RDD元素轉換為(Key,Value)類型,再通過分區函數進行分區操作。如果想要實現自定義分區,就需要定義一個類,使得這個自定義的類繼承org.apache.spark.Partitioner類,并實現其中的3個方法,具體如下:(1).defnumPartitions:Int:用于返回創建的分區個數。(2).defgetPartition(Key:Any):用于對輸入的Key做處理,并返回該Key的分區ID,分區ID的范圍是0~numPartitions-1。(3).equals(other:Any):用于Spark判斷自定義的Partitioner對象和其他的Partitioner對象是否相同,從而判斷兩個RDD的分區方式是否相同。46小結知道RDD的分區作用熟悉RDD的分區方式了解RDD的自定義分區通信數據分析與實戰SparkRDD第三章第5節48知道RDD的窄依賴關系知道RDD的寬依賴關系學習目標TARGETRDD的依賴關系寬依賴窄依賴RDD的依賴關系窄依賴是指父RDD的每一個分區最多被一個子RDD的分區使用,即OneToOneDependencies。窄依賴的表現一般分為兩類,第一類表現為一個父RDD的分區對應于一個子RDD的分區;第二類表現為多個父RDD的分區對應于一個子RDD的分區。一個父RDD的一個分區不可能對應一個子RDD的多個分區。為了便于理解,我們通常把窄依賴形象的比喻為獨生子女。窄依賴RDD的依賴關系RDD做map、filter和union算子操作時,是屬于窄依賴的第一類表現;而RDD做join算子操作(對輸入進行協同劃分)時,是屬于窄依賴表現的第二類。輸入協同劃分是指多個父RDD的某一個分區的所有Key,被劃分到子RDD的同一分區。當子RDD做算子操作,因為某個分區操作失敗導致數據丟失時,只需要重新對父RDD中對應的分區做算子操作即可恢復數據。窄依賴RDD的依賴關系寬依賴是指子RDD的每一個分區都會使用所有父RDD的所有分區或多個分區,即OneToManyDependecies。為了便于理解,我們通常把寬依賴形象的比喻為超生。寬依賴父RDD做groupByKey和join(輸入未協同劃分)算子操作時,子RDD的每一個分區都會依賴于所有父RDD的所有分區。當子RDD做算子操作,因為某個分區操作失敗導致數據丟失時,則需要重新對父RDD中的所有分區進行算子操作才能恢復數據。RDD的依賴關系Join算子操作既可以屬于窄依賴,也可以屬于寬依賴.

當join算子操作后,分區數量沒有變化則為窄依賴(如joinwithinputsco-partitioned,輸入協同劃分)當join算子操作后,分區數量發生變化則為寬依賴(如joinwithinputsnotco-partitioned,輸入非協同劃分)寬窄依賴的注意點54小結知道RDD的窄依賴關系知道RDD的寬依賴關系通信數據分析與實戰SparkRDD第三章第6節56熟悉RDD的持久化機制知道RDD的容錯機制學習目標TARGETRDD的機制容錯機制持久化機制RDD的機制在Spark中,RDD是采用惰性求值,即每次調用行動算子操作,都會從頭開始計算,這對迭代計算來說代價很大,因為迭代計算經常需要多次重復的使用同一組數據集,所以為了避免重復計算的開銷,讓Spark對數據集進行持久化操作。RDD的持久化操作有兩種方法,分別是cache()方法和persist()方法。persist()方法的存儲級別是通過StorageLevel對象設置的。cache()方法的存儲級別是使用默認的存儲級別(即StorageLevel.MEMORY_ONLY)。持久化機制RDD的機制持久化機制RDD的機制持久化機制使用persist()方法對RDD進行持久化RDD的機制持久化機制使用cache()方法對RDD進行持久化RDD的機制容錯機制持久化機制RDD的機制容錯機制當Spark集群中的某一個節點由于宕機導致數據丟失,則可以通過Spark中的RDD進行容錯恢復已經丟失的數據。RDD提供了兩種故障恢復的方式,分別是血統(Lineage)方式和設置檢查點(checkpoint)方式。RDD的機制容錯機制血統方式(Lineage)根據RDD之間依賴關系對丟失數據的RDD進行數據恢復。若丟失數據的子RDD進行窄依賴運算,則只需要把丟失數據的父RDD的對應分區進行重新計算,不依賴其他節點,并且在計算過程中不存在冗余計算;若丟失數據的RDD進行寬依賴運算,則需要父RDD所有分區都要進行從頭到尾計算,計算過程中存在冗余計算。RDD的機制容錯機制設置檢查點(checkPoint)方式本質是將RDD寫入磁盤存儲。當RDD進行寬依賴運算時,只要在中間階段設置一個進行檢查點容錯,即Spark中的sparkContext調用setCheckpoint()方法,設置容錯文件系統目錄作為檢查點checkpoint,將checkpoint的數據寫入之前設置的容錯文件系統中進行持久化存儲,若后面有節點宕機導致分區數據丟失,則以從做

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論