




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
大數據分析與實戰項目3SparkRDD分析交通違章記錄要求使用SparkRDD技術,完成交通違章數據的分析,為相關部門提供各類信息支持。為加強交通管理、減少交通違章行為,某地部署了數百組交通監控設備,用于采集轄區內各類交通違法行為;經數據抽取與整理,得到3張數據表格:違章行為記錄表、車主信息表、違章代碼表。情境導入Spark項目分解Spark序號任務任務說明1根據交通違章數據創建RDD將3個交通違章數據文件(txt格式)上傳到HDFS特定目錄;讀取文件,創建彈性分布式數據集RDD。2找出扣分最多的交通違章條目根據違章代碼表(violation.txt),找出其中扣分最多的違章條目(Top3)。3查找某車輛的違章記錄根據本地違章行為記錄表(record.txt)及鄰市違章行為記錄表(recordCityB.txt),找出某車輛在兩地區的所有違章記錄。4找出違章3次以上車輛統計各車輛的違章次數,找出違章次數大于3次的車牌號,并打印相關信息。5打印累積扣12分以上車輛信息根據違章數據文件,找出交通違章扣12分以上的車牌號;進而結合車主信息表,找出對應的車主姓名、手機號等信息,并模擬發短息提醒。6將處理結果寫入文件整合違章數據,將“違章日期、車牌號、扣分數、罰款金額、違章內容”等5項信息寫入到TSV文件中。掌握RDD元素查看及常見的轉換、排序、過濾、去重等操作。了解RDD原理,熟悉RDD的創建方法。能否根據需要將RDD計算的結果輸出到文件中。123學習目標Spark項目3
SparkRDD分析車輛違章記錄Spark任務1根據交通違章數據創建RDD找出扣分最高的違章條目查找某車輛的違章記錄任務2任務3查找違章3次以上的車輛任務4找出累計扣12分以上的車輛任務5將處理結果存儲到外部文件中任務6任務分析SparkRDD是SparkCore的核心數據抽象,是進行Spark學習的基礎。而使用SparkRDD進行數據分析,首先面臨的問題是如何創建RDD。本任務要求讀取HDFS分布式文件系統中的交通違章數據文件,生成RDD并輸出相關信息。認識RDDSparkRDD(彈性分布式數據集)就是一個分布在集群多節點中存放數據的集合;物理上一個數據集可能分散于集群多個節點,但邏輯上仍然是一個整體(即RDD),數據處理人員只需對這個整體進行處理,而無需關注底層邏輯與實現方法。RDD可以看做是Spark對具體數據的抽象(封裝),本質上是一個只讀的、分區的記錄集合;每個分區都是一個數據集片段,可由一個任務來執行。認識RDDSparkSparkRDD的計算過程可以簡單抽象為:創建RDD(makeRDD)、轉換(Transformation)和行動(Action)3個階段。由內存數據創建RDDSpark針對內存中的數據(List、Array、Tuple等),Spark提供了兩個操作:parallelize和makeRDD,它們創建一個可并行計算的分布式數據集RDD。scala>valnums=List(1,2,3,4,5)//包含5個整數的列表scala>valnumsRDD=sc.parallelize(nums)//創建一個RDDscala>valpeople=List("李白","王之煥","韋應物","杜牧","元慎")scala>valpeopleRDD=sc.makeRDD(people,3)//創建RDD,含3個分區scala>peopleRDD.partitions.size
//查看peoplesRDD的分區數量res3:Int=3由外部存儲生成RDDSpark在生產環境中,通常根據外部存儲的數據文件生成RDD。Spark提供了textFile()方法,它可以讀取外部文件中的數據來創建RDD。scala>valfileRDD=sc.textFile("file:///home/hadoop/data/guide.txt")scala>fileRDD.count()//使用count方法查看RDD的元素數量res14:Long=4scala>valhdfsFileRDD=sc.textFile("hdfs://localhost:9000/user/hadoop/data/guide.txt")相關知識小結SparkRDD是分布式數據集,是分布在多節點數據的抽象;內存數據創建RDD的方法:parallelize和makeRDD;Spark提供了textFile()方法,它可以讀取外部文件中的數據來創建RDD。讀取交通違章數據文件,生成RDD,并查看RDD分區數、元素數量。任務實施項目3
SparkRDD分析車輛違章記錄Spark任務1根據交通違章數據創建RDD找出扣分最高的違章條目查找某車輛的違章記錄任務2任務3查找違章3次以上的車輛任務4找出累計扣12分以上的車輛任務5將處理結果存儲到外部文件中任務6任務分析Spark現有一個文件violation.txt(違章條目對照表),內含違章代碼、違章內容、扣分、罰款、附件處理等,數據之間用tab分割(\t)。將利用違章記錄文件產生RDD,利用多種算子(方法),找出罰款金額最高、扣分最多的交通違章類型。查看RDD的元素Spark為了便于掌控計算過程、及時發現問題,可以使用collect操作查看RDD內元素的值;collect操作會將RDD的所有元素組成一個數組并返回給Driver端。scala>valnums=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根據列表nums,創建RDDscala>numsRDD.collect()//查看RDD的元素值res5:Array[Int]=Array(1,2,3,4,5)查看RDD的元素Spark可以使用take方法查看RDD的前N個元素,first操作查看RDD的第一個元素值。scala>numsRDD.take(3)//獲取前3個元素,并返回一個數組Arrayres3:Array[Int]=Array(1,2,3)scala>numsRDD.first()//獲取RDD的第一個元素值res6:Int=1map與flatMap操作Sparkmap操作接收一個函數作為參數,進而將RDD中的每個元素作為參數傳入某個函數,函數處理完后的返回值組成一個新的RDD。scala>valdata=List(1,2,3,4,5,6)scala>valdataRDD=sc.parallelize(data)scala>valnewDataRDD=dataRDD.map(x=>x*2)//dataRDD元素乘以2scala>newDataRDD.collect()res10:Array[Int]=Array(2,4,6,8,10,12)map與flatMap操作Spark示例:用RDD中存儲學生的姓名、年齡信息,使用map操作將其年齡加1,并為每個學生設置一個郵箱(姓名@)scala>valstudents=List(("Tom",20),("Jerry",18))//列表中嵌套元組scala>valstudentRDD=sc.makeRDD(students)//根據列表students創建RDDscala>valstudentRDD2=studentRDD.map(x=>(x._1,x._2+1,x._1+"@"))scala>studentRDD2.collect()res2:Array[(String,Int,String)]=Array((Tom,21,Tom@),(Jerry,19,Jerry@))map與flatMap操作Sparkflatmap是將函數應用于RDD中的每個元素,而后展平結果(去掉嵌套),最終得到一個新的RDD;scala>valtext=List("IlikeSpark","HelikesSpark","ShelikesSparkandHadoop")scala>valtextRDD=sc.makeRDD(text)scala>valrdd1=textRDD.map(x=>x.split(""))scala>rdd1.collect()res8:Array[Array[String]]=Array(Array(I,like,Spark),Array(He,likes,Spark),Array(She,likes,Spark,and,Hadoop))scala>valrdd2=textRDD.flatMap(x=>x.split(""))scala>rdd2.collect()res9:Array[String]=Array(I,like,Spark,He,likes,Spark,She,likes,Spark,and,Hadoop)sortBy排序操作SparksortBy操作可以對RDD元素進行排序,并返回排好序的新RDD。scala>valnumsRDD=sc.makeRDD(List(3,1,2,9,10,5,8,4,7,6))scala>valnewNumsRDD=numsRDD.sortBy(x=>x,false)scala>newNumsRDD.collect()res3:Array[Int]=Array(10,9,8,7,6,5,4,3,2,1)sortBy排序操作Sparkscala>valstudents=List(("Tom",20),("Jerry",19),("Bob",22),("Ken",21))//列表students的元素為元組scala>valstudentsRDD=sc.makeRDD(students)scala>valnewStudentsRDD=studentsRDD.sortBy(x=>x._2,true)//根據元素(元組)的第2個值升序排列scala>newStudentsRDD.collect()res4:Array[(String,Int)]=Array((Jerry,19),(Tom,20),(Ken,21),(Bob,22))數值型RDD的統計操作Sparkscala>valdata=sc.makeRDD(List(8,10,7,4,1,9,6,3,5,2))scala>data.max()//返回RDD中的最大值res9:Int=10scala>data.min()//返回RDD中的最小值res10:Int=1scala>data.mean()//返回RDD元素的平均值res11:Double=5.5查看RDD元素:collect、take、first等方法;map、flatMap方法可以改變RDD的元素值,產生新的RDD;RDD元素排序:sortBy方法Spark提供了max、min、sum等若干統計算子。Spark綜合利用本任務中的知識儲備,根據違章條目對照表,找出罰款金額最高、扣分最多的交通違章類型。任務實施相關知識小結Spark項目3
SparkRDD分析車輛違章記錄Spark任務1根據交通違章數據創建RDD找出扣分最高的違章條目查找某車輛的違章記錄任務2任務3查找違章3次以上的車輛任務4找出累計扣12分以上的車輛任務5將處理結果存儲到外部文件中任務6任務分析Sparkrecords.txt文件記錄了本市車輛違章信息(包括:日期、監控設備編號、車牌號、違章類型代碼),recordsCityB.txt記錄相鄰的B城市車輛違章信息。根據有關部門要求,需要查找車輛MU0066在本地及臨市B的交通違章記錄。filter操作過濾RDD的元素Sparkfilter是一個轉換操作,可用于篩選出滿足特定條件元素,返回一個新的RDDscala>valnumsRDD=sc.makeRDD(List(3,1,2,9,10,5,8,4,7,6))scala>valrdd1=numsRDD.filter(x=>x%2==0)//過濾出偶數元素,組成一個新RDD并返回scala>rdd1.collect()res5:Array[Int]=Array(2,10,8,4,6)計算兩個RDD的并集、交集與差集Spark使用union方法,合并兩個RDD的元素,得到一個新的RDD。scala>valrdd1=sc.makeRDD(List(1,2,3))scala>valrdd2=sc.makeRDD(List(3,4,5))scala>valrdd3=rdd1.union(rdd2)//合并兩個RDDscala>rdd3.collect()//合并后的RDD有重復元素“3”res8:Array[Int]=Array(1,2,3,3,4,5)計算兩個RDD的并集、交集與差集Sparkintersection可以求兩個RDD的交集,即兩個RDD的相同元素。scala>valrdd1=sc.makeRDD(List(1,2,3,4,5))scala>valrdd2=sc.makeRDD(List(4,5,6,7,8))scala>valrdd3=rdd1.intersection(rdd2)scala>rdd3.collect()res9:Array[Int]=Array(4,5)計算兩個RDD的并集、交集與差集Spark類似于數學中集合的差集運算,可以使用subtract操作求兩個RDD的差集。scala>valrdd1=sc.makeRDD(List(1,2,3,4,5))scala>valrdd2=sc.makeRDD(List(4,5,6,7,8))scala>valrdd3=rdd1.subtract(rdd2)//在rdd1中、但不在rdd2中的元素scala>rdd3.collect()res12:Array[Int]=Array(1,2,3)filter方法過濾符合條件的RDD元素;distinct方法去除RDD中的重復元素;兩個RDD可以執行交集、并集、差集等運算。Spark綜合利用本任務中的知識儲備,讀取兩地的車輛違章信息文件,需要查找車輛MU0066在本地及臨市B的交通違章記錄。任務實施相關知識小結Spark項目3
SparkRDD分析車輛違章記錄Spark任務1根據交通違章數據創建RDD找出扣分最高的違章條目查找某車輛的違章記錄任務2任務3查找違章3次以上的車輛任務4找出累計扣12分以上的車輛任務5將處理結果存儲到外部文件中任務6任務分析Spark根據交通安全檢查工作需要,查找本市違章記錄數據(records.txt)中,違章次數3次以上車輛予以重點關注。鍵值對RDDSpark所謂鍵值對RDD(PairRDD)是指每個RDD元素都是(Key,Value)鍵值類型(即二元組)。scala>valscores=List(("張小帥",84),("孫田",80),("馬莉",92))scala>valscoresRDD=sc.parallelize(scores)//生成鍵值對RDDscala>scoresRDD.collect()//scoresRDD的元素為二元組res2:Array[(String,Int)]=Array((張小帥,84),(孫田,80),(馬莉,92))鍵值對RDDSpark普通RDD轉為鍵值對RDDscala>valrdd1=sc.makeRDD(List("apple","grape","banana","watermelon"))scala>valpairRDD1=rdd1.map(x=>(x,x.length()))scala>pairRDD1.collect()res3:Array[(String,Int)]=Array((apple,5),(grape,5),(banana,6),(watermelon,10))keys、values操作得到一個新RDDSpark鍵值對RDD的元素為(key,value)形式的二元組,keys操作可以獲取鍵值對RDD中所有的key,組成一個新的RDD并返回;values操作會把鍵值對RDD中的所有value返回,形成一個新的RDDscala>valdata=List(("Spark",1),("Hadoop",2),("Flink",3),("kafka",4))scala>valpairRDD=sc.makeRDD(data)scala>valkeysRDD=pairRDD.keys//獲取所有的key,組成新RDDscala>keysRDD.collect()res6:Array[String]=Array(Spark,Hadoop,Flink,kafka)ByKey相關的操作Spark對于鍵值對RDD,Spark提供了groupByKey、sortByKey、reduceByKey等若干ByKey相關操作;scala>valfruits=List((apple,5.5),(orange,3.0),(apple,8.2),(banana,2.7),(orange,4.2))scala>valfruitsRDD=sc.makeRDD(fruits)scala>valgroupedRDD=fruitsRDD.groupByKey()//按照Key,對value進行分組scala>groupedRDD.collect()res22:Array[(String,Iterable[Double])]=Array((banana,CompactBuffer(2.7)),(orange,CompactBuffer(3.0,4.2)),(apple,CompactBuffer(5.5,8.2)))ByKey相關的操作SparksortByKey是根據key進行排序,即返回一個根據鍵排序的RDDscala>valpeoples=List((20,"Tom"),(18,"Jerry"),(21,"Bob"),(17,"Ben"))scala>valpeoplesRDD=sc.makeRDD(peoples)scala>valsortedRDD=peoplesRDD.sortByKey()//按照key進行排序scala>sortedRDD.collect()res23:Array[(Int,String)]=Array((17,Ben),(18,Jerry),(20,Tom),(21,Bob))ByKey相關的操作SparkreduceByKey(func)是根據key進行分組,使用func函數聚合同組內的value值,返回一個新RDDscala>valfruits=List(("apple",5.5),("orange",3.0),("apple",8.2),("banana",2.7),("orange",4.2))scala>valfruitsRDD=sc.makeRDD(fruits)scala>valreducedRDD=fruitsRDD.reduceByKey((a,b)=>a+b)scala>reducedRDD.collect()//reducedRDD元素為二元組res21:Array[(String,Double)]=Array((banana,2.7),(orange,7.2),(apple,13.7))mapValues對value進行處理SparkmapValues(func)功能是將RDD元組中的value交給函數func處理,而key不變scala>valfruits=List(("apple",6.5),("banana",3.8),("blueberry",19.9))scala>valfruitsRDD=sc.makeRDD(fruits)scala>valfruitsRDD2=fruitsRDD.mapValues(x=>x+5)//所有的value+5scala>fruitsRDD2.collect()res3:Array[(String,Double)]=Array((apple,11.5),(banana,8.8),(blueberry,24.9))鍵值對RDD的元素包括key、value兩部分;鍵值對RDD支持:keys、values、groupByKey、sortByKey、reduceByKey、mapValues等操作。Spark綜合利用本任務中的知識儲備,根據本市違章記錄數據(records.txt)中,找出違章3次以上車輛。任務實施相關知識小結Spark項目3
SparkRDD分析車輛違章記錄Spark任務1根據交通違章數據創建RDD找出扣分最高的違章條目查找某車輛的違章記錄任務2任務3查找違章3次以上的車輛任務4找出累計扣12分以上的車輛任務5將處理結果存儲到外部文件中任務6任務分析Spark從本市違章記錄數據文件中,找出違章扣分12分以上的車輛;進而根據車輛所有人預留電話,模擬發一條短信(打印一句話),提醒其到交管部門協助調查。join操作連接兩個RDDSparkjoin是對于給定的兩個鍵值對RDD(數據類型為(K,V1)和(K,V2)),只有兩個RDD中都存在的Key才會被輸出,最終得到一個(K,(V1,V2))類型的RDD。scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("ben",2),("jerry",6)))scala>rdd1.join(rdd2).collect()//rdd1、rdd2中有相同的Key:tom、jerryres5:Array[(String,(Int,Int))]=Array((tom,(1,5)),(jerry,(2,6)))rightOuterJoin右連接SparkrightOuterJoin類似于SQL中的右外關聯rightouterjoin,根據兩個RDD的Key進行右連接,返回結果以右邊(第二個)的RDD為主,關聯不上的記錄為空。scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("apple",2),("jerry",6)))scala>valrdd4=rdd1.rightOuterJoin(rdd2)//兩個RDD右連接scala>rdd4.collect()res8:Array[(String,(Option[Int],Int))]=Array((tom,(Some(1),5)),(apple,(None,2)),(jerry,(Some(2),6)))rightOuterJoin右連接SparkleftOuterJoin類似于SQL中的左外關聯leftouterjoin,可以根據兩個RDD的Key進行左連接,返回結果以左邊(第一個)的RDD為主,關聯不上的記錄為空。scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("apple",2),("jerry",6)))scala>valrdd5=rdd1.leftOuterJoin(rdd2)//兩個RDD左連接scala>rdd5.collect()res11:Array[(String,(Int,Option[Int]))]=Array((tom,(1,Some(5))),(jerry,(2,Some(6))),(petter,(3,None)))rightOuterJoin右連接SparkfullOuterJoin是全連接,會保留兩個RDD的所有Key的連接結果scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("apple",2),("jerry",6)))scala>valrdd6=rdd1.fullOuterJoin(rdd2)//兩個RDD全連接scala>rdd6.collect()res13:Array[(String,(Option[Int],Option[Int]))]=Array((tom,(Some(1),Some(5))),(apple,(None,Some(2))),(jerry,(Some(2),Some(6))),(petter,(Some(3),None)))對于兩個RDD,Spark提供了類似于數據庫表的連接操作;支持的連接包括:join、rightOuterJoin、leftOuterJoin、fullOuterJoin等Spark綜合利用本任務中的知識儲備,從違章記錄中找到扣分超過12分的車牌,然后根據車牌查找車主信息,并模擬發出提示短信。任務實施相關知識小結Spark項目3
SparkRDD分析車輛違章記錄Spark任務1根據交通違章數據創建RDD找出扣分最高的違章條目查找某車輛的違章記錄任務2任務3查找違章3次以上的車輛任務4找出累計扣12分以上的車輛任務5將處理結果存儲到外部文件中任務6任務分析Spark將records.txt、violation.txt中信息整合后,抽取違章日期、車牌號、扣分數、罰款金額、違章項目名稱等5項信息,保存為TSV格式文件。讀寫文本文件Spark由文本文件創建RDD是現實中常見的需求,可以使用textFile(“文件位置”)方法讀取文件的內容、生成RDDscala>valpath="file:///home/hadoop/data/myfile.txt"scala>valfileRDD=sc.textFile(path,2)//創建RDD,指定分區數量為2scala>fileRDD.saveAsTextFile("file:///home/hadoop/data/output")//fileRDD數據保存到本地文件讀寫文本文件Spark在文本文件中,還有兩種常見的格式:CSV(commase
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- DB31/T 1253-2020板管熱交換器節能技術要求
- DB31/T 1207-2020疫苗冷鏈物流基本數據集
- 箱包企業品牌戰略與宣傳推廣考核試卷
- 領導力與技術變革關系的考核試題及答案
- 數據庫安全策略考題及答案闡述
- 2025年計算機二級Web考試新手指導試題及答案
- 跨區域私人直升機維修保養與飛行數據分析協議
- 股權表決權委托與智能制造產業投資合同
- 2025年中國北京特色小鎮行業市場規模調研及投資前景研究分析報告
- 智能零售電子價簽系統數據安全保障與服務協議
- 偏癱科普宣教
- 酒駕延緩處罰申請書
- 2023年國家開放大學《財務報表分析》形成性考核(1-4)試題答案解析
- 2022年1月福建化學會考試卷
- 2023年貴州省遵義市中考地理試卷真題(含答案)
- 物料提升機基礎專項施工方案正文
- 工程機械管理制度
- 廣東省勞動合同電子版(六篇)
- 對話大國工匠-致敬勞動模范期末考試答案
- 中央空調多聯機安裝規范
- 2023年安全制度-城市客運企業主要負責人和安全生產管理人員安全考核基礎題庫(城市軌道交通)考試歷年真題(精準考點)帶答案
評論
0/150
提交評論