Spark大數(shù)據(jù)分析與實戰(zhàn)(第二版) 項目3 教案_第1頁
Spark大數(shù)據(jù)分析與實戰(zhàn)(第二版) 項目3 教案_第2頁
Spark大數(shù)據(jù)分析與實戰(zhàn)(第二版) 項目3 教案_第3頁
Spark大數(shù)據(jù)分析與實戰(zhàn)(第二版) 項目3 教案_第4頁
Spark大數(shù)據(jù)分析與實戰(zhàn)(第二版) 項目3 教案_第5頁
已閱讀5頁,還剩6頁未讀 繼續(xù)免費閱讀

下載本文檔

版權(quán)說明:本文檔由用戶提供并上傳,收益歸屬內(nèi)容提供方,若內(nèi)容存在侵權(quán),請進行舉報或認領(lǐng)

文檔簡介

課程名稱Spark大數(shù)據(jù)分析選用教材Spark大數(shù)據(jù)分析與實戰(zhàn)(第2版)出版社西安電子科技大學出版社章節(jié)項目3SparkRDD分析交通違章記錄教學內(nèi)容借助成熟的SparkRDD技術(shù),分析交通違章記錄文件中的數(shù)據(jù)。授課學時授課班級****專業(yè)*****班授課日期授課地點教學目標了解RDD的特性及運算的原理,了解RDD的執(zhí)行流程;熟悉各種數(shù)據(jù)源創(chuàng)建RDD的算子,多種方法查看RDD的元素(2)熟練使用算子完成RDD的轉(zhuǎn)換、排序、過濾、去重等操作;(3)能夠完成鍵值對RDD的生成、轉(zhuǎn)換等操作;(4)根據(jù)業(yè)務需求,能將RDD中數(shù)據(jù)輸出到文件系統(tǒng)中。重點難點RDD的生成(內(nèi)存數(shù)據(jù)、文件等生成)RDD的map、filter、sortBy等常用算子;鍵值對RDD的key、value相關(guān)操作,鍵值對RDD排序等;兩個RDD的相關(guān)操作:join、union、zip等。教學方法R講授£討論或座談£問題導向?qū)W習£分組合作學習£案例教學£任務驅(qū)動R項目教學£情景教學£演示匯報£實踐教學£參觀訪問£引導文教學£其他(--)教學準備(教師)教材:《Spark大數(shù)據(jù)分析與實戰(zhàn)(第2版)》硬件設(shè)備:內(nèi)存8G(或以上)的計算機(2)教學資源:課件PPT、教學日歷、相關(guān)軟件等教學準備(學生)教材:《Spark大數(shù)據(jù)分析與實戰(zhàn)(第2版)》硬件設(shè)備:內(nèi)存8G(或以上)的計算機(3)教學資源:課件PPT、相關(guān)軟件等教學環(huán)節(jié)教學內(nèi)容與過程(教學內(nèi)容、教學方法、組織形式、教學手段)課前組織教師通過課程教學平臺或班級群發(fā)布學習預習任務及課程資源;學生提前預習相關(guān)內(nèi)容,并完成課前自測等。課程內(nèi)容描述任務3.1根據(jù)交通違章數(shù)據(jù)創(chuàng)建RDD認識RDDRDD就是一個分布在集群多節(jié)點中存放數(shù)據(jù)的集合;雖然一個數(shù)據(jù)集分散于集群多個節(jié)點,但邏輯上仍然是一個整體(即RDD),數(shù)據(jù)處理人員只需對這個整體進行處理,而無需關(guān)注底層邏輯與實現(xiàn)方法,從而極大降低了大數(shù)據(jù)編程的難度。其計算流程如下:內(nèi)存數(shù)據(jù)創(chuàng)建RDD針對程序中已有的數(shù)據(jù)集合(List、Array、Tuple等),Spark提供了兩個方法:parallelize和makeRDD,它們均可復制數(shù)據(jù)集合的元素后,創(chuàng)建一個可并行計算的分布式數(shù)據(jù)集RDD。parallelize方式適用于做簡單的Spark程序測試、Spark學習;下面演示根據(jù)列表數(shù)據(jù)創(chuàng)建RDD:scala>valnums=List(1,2,3,4,5)//包含5個整數(shù)的列表nums:List[Int]=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根據(jù)列表nums,創(chuàng)建一個RDD(numsRDD)numsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1]atparallelizeat<console>:26scala>valcars=Array("比亞迪","長安","奇瑞","廣汽")cars:Array[String]=Array(比亞迪,長安,奇瑞,廣汽)scala>valcarsRDD=sc.parallelize(cars)//根據(jù)數(shù)組cars,創(chuàng)建一個RDD(carsRDD)carsRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[2]atparallelizeat<console>:26外部文件創(chuàng)建RDD由文件創(chuàng)建RDD,采用sc.textFile(“文件路徑”)方式,路徑前面需要加入“file://”以表示本地文件(Spark-shell環(huán)境下,要求所有節(jié)點的相同位置均保存該文件)。現(xiàn)有本地文件“/home/hadoop/data/guide.txt”,借助textFile()方法,可以生成RDD,演示代碼如下:scala>valfileRDD=sc.textFile("file:///home/hadoop/data/guide.txt")//注意路徑的寫法fileRDD:org.apache.spark.rdd.RDD[String]=file:///home/hadoop/data/guide.txtMapPartitionsRDD[11]attextFileat<console>:25scala>fileRDD.count()//使用count方法查看RDD的元素數(shù)量,即guide.txt文件的行數(shù)。res14:Long=4任務3.2找出扣分最高的交通違法條目查看RDD的元素在學習或測試代碼時,為了便于掌控計算過程、及時發(fā)現(xiàn)問題,可以使用collect操作查看RDD內(nèi)元素的值;collect操作會將RDD的所有元素組成一個數(shù)組并返回給Driver端;其用法示例如下:scala>valnums=List(1,2,3,4,5)nums:List[Int]=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根據(jù)列表nums,創(chuàng)建RDDnumsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[4]atparallelizeat<console>:26scala>numsRDD.collect()//查看RDD的元素值res5:Array[Int]=Array(1,2,3,4,5)Map操作map操作是最常用的轉(zhuǎn)換操作,該操作接收一個函數(shù)作為參數(shù),進而將RDD中的每個元素作為參數(shù)傳入某個函數(shù),函數(shù)處理完后的返回值組成一個新的RDD;其目的是根據(jù)現(xiàn)有的RDD,經(jīng)過函數(shù)處理,最終得到一個新的RDD。用法示例如下:scala>valdata=List(1,2,3,4,5,6)data:List[Int]=List(1,2,3,4,5,6)scala>valdataRDD=sc.parallelize(data)dataRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[10]atparallelizeat<console>:26scala>valnewDataRDD=dataRDD.map(x=>x*2)newDataRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[11]atmapat<console>:25scala>newDataRDD.collect()res10:Array[Int]=Array(2,4,6,8,10,12)scala>valpeoples=List("tom","jerry","petter","ken")peoples:List[String]=List(tom,jerry,petter,ken)scala>valpeoplesRDD=sc.makeRDD(peoples)peoplesRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[7]atmakeRDDat<console>:26scala>valnewPeoplesRDD=peoplesRDD.map(x=>x.toUpperCase())newPeoplesRDD:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[8]atmapat<console>:25scala>newPeoplesRDD.collect()res8:Array[String]=Array(TOM,JERRY,PETTER,KEN)RDD的排序sortBy操作可以對RDD元素進行排序,并返回排好序的新RDD;sortBy有3個參數(shù),其用法說明如下;defsortBy[K](f:(T)?

K,

ascending:

Boolean

=

true,

numPartitions:

Int

=

this.partitions.length):

\o"org.apache.spark.rdd.RDD"RDD[T]ReturnthisRDDsortedbythegivenkeyfunction參數(shù)1:f:(T)?K,左邊為要排序的RDD的每一個元素,右邊返回要進行排序的值。參數(shù)2:ascending(可選項),升序或降序排列標識,默認為true、升序排列,若要降序排列則需寫false。參數(shù)3:numPartitions(可選項),排序后新RDD的分區(qū)數(shù)量,默認分區(qū)數(shù)量與原RDD相同。針對某個RDD,將RDD的元素數(shù)據(jù)交給“f:(T)?K”函數(shù)進行處理;而后按照函數(shù)運算后的返回值進行排序,默認為升序排列。數(shù)值型RDD的統(tǒng)計對于數(shù)值元素組成的RDD,Spark提供了max、min、sum等若干統(tǒng)計算子,可以完成簡單的統(tǒng)計分析;相關(guān)示例如下:scala>valdata=sc.makeRDD(List(8,10,7,4,1,9,6,3,5,2))data:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]atmakeRDDat<console>:25scala>data.max()//返回RDD中的最大值res9:Int=10scala>data.min()//返回RDD中的最小值res10:Int=1任務3.3查找某車輛的違章記錄1.filter操作filter是一個轉(zhuǎn)換操作,可用于篩選出滿足特定條件元素,返回一個新的RDD;其用法說明如下:deffilter(f:(T)?

Boolean):

\o"org.apache.spark.rdd.RDD"RDD[T]ReturnanewRDDcontainingonlytheelementsthatsatisfyapredicate.其應用示例如下:scala>valnumsRDD=sc.makeRDD(List(3,1,2,9,10,5,8,4,7,6))numsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[27]atmakeRDDat<console>:25scala>valrdd1=numsRDD.filter(x=>x%2==0)//過濾出偶數(shù)元素,組成一個新RDD并返回rdd1:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[28]atfilterat<console>:25scala>rdd1.collect()res5:Array[Int]=Array(2,10,8,4,6)scala>valtextsRDD=sc.makeRDD(List("IlikeSpark","HelikeHadoop","ShelikeSpark"))textsRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[29]atmakeRDDat<console>:26scala>valrdd2=textsRDD.filter(x=>x.contains("Spark"))//過濾出含有字符串“Spark”的元素rdd2:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[30]atfilterat<console>:25scala>rdd2.collect()res6:Array[String]=Array(IlikeSpark,ShelikeSpark)2.distinct操作RDD的元素可能存在重復情況,當我們需要去掉重復元素時,可以使用distinct方法。scala>valdataRDD=sc.makeRDD(List(3,5,7,3,4,8,5))//dataRDD內(nèi)有重復元素3、5dataRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]atmakeRDDat<console>:25scala>valnewDataRDD=dataRDD.distinct()//去除重復元素newDataRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[3]atdistinctat<console>:25scala>newDataRDD.collect()//檢查是否成功去重res2:Array[Int]=Array(4,8,5,3,7)3.union等操作union方法可將兩個RDD的元素合并為一個新RDD,即得到兩個RDD的并集intersection可以求兩個RDD的交集,即兩個RDD的相同元素類似于數(shù)學中集合的差集運算,可以使用subtract來求兩個RDD的差集cartesian用于求兩個RDD的笛卡爾積,將兩個集合元素組合成一個新的RDD任務3.4查找違章次數(shù)3次以上車輛鍵值對RDD鍵值對RDD(PairRDD)是指每個RDD元素都是(Key,Value)鍵值類型(即二元組);普通RDD里面存儲的數(shù)據(jù)類型是Int、String等,而“鍵值對RDD”里面存儲的數(shù)據(jù)類型是“鍵值對”。下面代碼中,我們首先定義一個列表scores,scores的每個元素為二元組,記錄學生的姓名及考試成績;接下來,使用parallelize方法生成鍵值對RDD(scoresRDD),scoresRDD元素的類型為二元組。scala>valscores=List(("張小帥",84),("孫田",80),("馬莉",92))//scores的元素為二元組,例如("張小帥",84)scores:List[(String,Int)]=List((張小帥,84),(孫田,80),(馬莉,92))scala>valscoresRDD=sc.parallelize(scores)//scoresRDD即為鍵值對RDDscoresRDD:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[0]atparallelizeat<console>:26scala>scoresRDD.collect()//scoresRDD的元素為二元組res2:Array[(String,Int)]=Array((張小帥,84),(孫田,80),(馬莉,92))Lookup查找value鍵值對RDD的元素為(key,value)形式的二元組,keys操作可以獲取鍵值對RDD中所有的key,組成一個新的RDD并返回;values操作會把鍵值對RDD中的所有value返回,形成一個新的RDD;兩個操作的用法示例如下:scala>valdata=List(("Spark",1),("Hadoop",2),("Flink",3),("kafka",4))data:List[(String,Int)]=List((Spark,1),(Hadoop,2),(Flink,3),(kafka,4))scala>valpairRDD=sc.makeRDD(data)pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[9]atmakeRDDat<console>:26scala>valkeysRDD=pairRDD.keys//獲取所有的key,組成新RDDkeysRDD:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[10]atkeysat<console>:25scala>keysRDD.collect()res6:Array[String]=Array(Spark,Hadoop,Flink,kafka)scala>valvaluesRDD=pairRDD.values//獲取所有的value,組成新的RDDvaluesRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[12]atvaluesat<console>:25scala>valuesRDD.collect()res7:Array[Int]=Array(1,2,3,4)ByKey相關(guān)操作對于鍵值對RDD,Spark提供了groupByKey、sortByKey、reduceByKey等若干ByKey相關(guān)操作;其中,groupByKey是根據(jù)key值,對value進行分組;用法演示如下:fruits:List[(String,Double)]=List((apple,5.5),(orange,3.0),(apple,8.2),(banana,2.7),(orange,4.2))scala>valfruitsRDD=sc.makeRDD(fruits)fruitsRDD:org.apache.spark.rdd.RDD[(String,Double)]=ParallelCollectionRDD[23]atmakeRDDat<console>:26scala>valgroupedRDD=fruitsRDD.groupByKey()//按照Key,對value進行分組groupedRDD:org.apache.spark.rdd.RDD[(String,Iterable[Double])]=ShuffledRDD[25]atgroupByKeyat<console>:25scala>groupedRDD.collect()res22:Array[(String,Iterable[Double])]=Array((banana,CompactBuffer(2.7)),(orange,CompactBuffer(3.0,4.2)),(apple,CompactBuffer(5.5,8.2)))mapValue操作實際業(yè)務中,可能遇到只對鍵值對RDD的value部分進行處理,而保持value不變的需求;這時,可以使用mapValues(func),它的功能是將RDD元組中的value交給函數(shù)func處理。任務3.5查找累計扣12分以上車輛信息1.zip操作將兩個RDD組合成鍵值對RDD除了使用makeRDD等方式創(chuàng)建鍵值對RDD,還可以使用zip操作(亦稱為“拉鏈操作”)將兩個元素數(shù)量相同、分區(qū)數(shù)相同的普通RDD組合成一個鍵值對RDD。下面代碼中,rdd1由3個元素(分區(qū)數(shù)量默認),rdd2也有3個元素;代碼rdd1.zip(rdd2)將前述兩個RDD組合成一個鍵值對新的RDD。scala>valrdd1=sc.makeRDD(List("東岳","西岳","南岳","北岳","中岳"))rdd1:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[1]atmakeRDDat<console>:25scala>valrdd2=sc.makeRDD(List("泰山","華山","衡山","恒山","嵩山"))rdd2:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[2]atmakeRDDat<console>:25scala>rdd1.zip(rdd2).collect()//rdd1、rdd2組成一個鍵值對RDD,并輸出其元素res3:Array[(String,String)]=Array((東岳,泰山),(西岳,華山),(南岳,衡山),(北岳,恒山),(中岳,嵩山))2join連接兩個RDDjoin概念來自于關(guān)系數(shù)據(jù)庫領(lǐng)域,SparkRDD中的join的類型也包括內(nèi)連接(join)、左外連接(leftOuterJoin)、右外連接(rightOuterJoin)等。其中,join是對于給定的兩個鍵值對RDD(數(shù)據(jù)類型為(K,V1)和(K,V2)),只有兩個RDD中都存在的Key才會被輸出,最終得到一個(K,(V1,V2))類型的RDD;其用法示例如下:scala>rdd1.join(rdd2).collect()res5:Array[(String,(Int,Int))]=Array((tom,(1,5)),(jerry,(2,6)))scala>valrdd3=rdd1.join(rdd2)//rdd1、rdd2中有相同的Key:tom、jerryrdd3:org.apache.spark.rdd.RDD[(String,(Int,Int))]=MapPartitionsRDD[13]atjoinat<console>:263其他連接rightOuterJoin類似于SQL中的右外關(guān)聯(lián)rightouterjoin,根據(jù)兩個RDD的Key進行右連接,返回結(jié)果以右邊(第二個)的RDD為主,關(guān)聯(lián)不上的記錄為空(None值)。leftOuterJoin類似于SQL中的左外關(guān)聯(lián)leftouterjoin,可以根據(jù)兩個RDD的Key進行左連接,返回結(jié)果以左邊(第一個)的RDD為主,關(guān)聯(lián)不上的記錄為空(None值);其用法示例如下:scala>valrdd4=rdd1.rightOuterJoin(rdd2)//兩個RDD左連接rdd4:org.apache.spark.rdd.RDD[(String,(Option[Int],Int))]=MapPa

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯(lián)系上傳者。文件的所有權(quán)益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網(wǎng)頁內(nèi)容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經(jīng)權(quán)益所有人同意不得將文件中的內(nèi)容挪作商業(yè)或盈利用途。
  • 5. 人人文庫網(wǎng)僅提供信息存儲空間,僅對用戶上傳內(nèi)容的表現(xiàn)方式做保護處理,對用戶上傳分享的文檔內(nèi)容本身不做任何修改或編輯,并不能對任何下載內(nèi)容負責。
  • 6. 下載文件中如有侵權(quán)或不適當內(nèi)容,請與我們聯(lián)系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論