




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
1、 報告人:李雨倩 導師:林子雨 2014.07.12連接簡介連接簡介MapReduce連接策略連接策略連接連接連接是關系運算,可以用于合并關系。連接是關系運算,可以用于合并關系。在數據庫中,一般是表連接操作;在MapReduce中,連接可以用于合并兩個或多個數據集。例如,用戶基本信息和用戶活動詳情。用戶基本信息來自于OLTP數據庫。用戶活動詳情來自于日志文件。連接的類型連接的類型最常用的兩個連接類型是內連接和外連接。 內連接比較兩個關系中所有的數組,然后生成一個滿足連接條件的結果集。外連接外連接 外連接并不需要兩個關系的數組都滿足連接條件。在連接條件不滿足的時候,外連接可以將一方的數據保留在結
2、果集中。內連接內連接左外連接右外連接全連接連接關系圖連接關系圖連接實例連接實例連接簡介連接簡介MapReduce連接策略連接策略連接連接連接是關系運算,可以用于合并關系。連接是關系運算,可以用于合并關系。在數據庫中,一般是表連接操作;例如,用戶基本信息和用戶活動詳情。用戶基本信息來自于OLTP數據庫。用戶活動詳情來自于日志文件。MapReduce的連接的連接welcome to use these PowerPoint templates, New Content design, 10 years experienceMapReduce連接的應用場景用戶的人口統計信息的聚合操作(例如:青少年和
3、中年人的習慣差異)當用戶超過一定時間沒有使用網站后,發郵件提醒他們。分析用戶的瀏覽習慣,讓系統可以提示用戶有哪些網站特性還沒有使用到,形成一個反饋循環。MapReduce中的連接策略中的連接策略復制連接復制連接半連接半連接reduce端連接。使用場景:連接兩個或多個大型數據集。map端連接。使用場景:待連接的數據集中有一個數據集小到可以完全放在緩存中。map端連接。使用場景:待連接的數據集中有一個數據集非常大,但同時這個數據集可以被過濾成小到可以放在緩存中。重分區連接重分區連接 重分區連接利用MapReduce的排序-合并機制來分組數據。它被實現為使用一個單獨的MapReduce任務,并支持多
4、路連接(這里的多路指的是多個數據集)。 Map階段負責從多個數據集中讀取數據,決定每個數據的連接值,將連接值作為輸出鍵。輸出值則包含將在reduce階段被合并的值。 Reduce階段,一個reducer接收map函數傳來的一個輸出鍵的所有輸出值,并將數據分為多個分區。在此之后,reducer對所有的分區進行笛卡爾積連接運算,并生成全部的結果集。在如下示例中,用戶數據中有用戶姓名,年齡和所在州$ cat test-data/ch4/users.txtanne 22 NYjoe 39 COalison 35 NYmike 69 VAmarie 27 ORjim 21 ORbob 71 CAmary
5、 53 NYdave 36 VAdude 50 CA用戶活動日志中有用戶姓名,進行的動作,來源IP。這個文件一般都要比用戶數據要大得多。$ cat test-data/ch4/user-logs.txtjim logout 93.24.237.12mike new_tweet 87.124.79.252bob new_tweet 58.133.120.100mike logout 55.237.104.36jim new_tweet 93.24.237.12marie view_user 122.158.130.90$ hadoop fs -put test-data/ch4/user-log
6、s.txt user-logs.txt$ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output$ hadoop fs -cat output/part*bob 71 CA new_tweet 58.133.120.100jim 21 OR logout 93.24.237.12jim 21 OR new_tweet 93.24.237.12jim 21 OR login 198.184.237.49marie 27 OR login 58.133.120.100marie
7、27 OR view_user 122.158.130.90mike 69 VA new_tweet 87.124.79.252mike 69 VA logout 55.237.104.36重分區連接重分區連接過濾( filter)指的是將map極端的輸入數據中不需要的部分丟棄。投影( project)是關系代數的概念。投影用于減少發送給reducer的字段。優化重分區連接優化重分區連接 傳統重分區方法的實現空間效率低下。它需要將連接的所有的輸出值都讀取到內存中,然后進行多路連接。事實上,如果僅僅將小數據集讀取到內存中,然后用小數據集來遍歷大數據集,進行連接,這樣將更加高效。下圖是優化后的重分
8、區連接的流程圖。Map輸出的組合鍵和組合值輸出的組合鍵和組合值 上圖說明了map輸出的組合鍵和組合值。二次排序將會根據連接鍵(join key)進行分區,但會用整個組合鍵來進行排序。組合鍵包括一個標識源數據集(較大或較小)的整形值,因此可以根據這個整形值來保證較小源數據集的值先于較大源數據的值被reducer接收。優化重分區連接優化重分區連接 上圖是實現的類圖。類圖中包含兩個部分,一個通用框架和一些類的實現樣例。使用這個連接框架需要實現抽象類 OptimizedDataJoinMapperBase 和 OptimizedDataJoinReducerBase。OptimizedDataJoin
9、MapperBaseprotected abstract Text generateInputTag(String inputFile);protected abstract boolean isInputSmaller(String inputFile);public void configure(JobConf job) this.inputFile = job.get(map.input.file);this.inputTag = generateInputTag(this.inputFile);if(isInputSmaller(this.inputFile) smaller = ne
10、w BooleanWritable(true);outputKey.setOrder(0); else smaller = new BooleanWritable(false);outputKey.setOrder(1); 這個類的作用是辨認出較小的數據集,并生成輸出鍵和輸出值。Configure方法在mapper創建期調用。Configure方法的作用之一是標識每一個數據集,讓reducer可以區分數據的源數據集。另一個作用是辨認當前的輸入數據是否是較小的數據集。OptimizedDataJoinMapperBase(續)(續) protected abstract OptimizedTag
11、gedMapOutput generateTaggedMapOutput(Object value);protected abstract String generateGroupKey(Object key, OptimizedTaggedMapOutput aRecord);public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException OptimizedTaggedMapOutput aRecord = generateTaggedMapOutp
12、ut(value); if (aRecord = null) return; aRecord.setSmaller(smaller);String groupKey = generateGroupKey(aRecord); if (groupKey = null) return; outputKey.setKey(groupKey);output.collect(outputKey, aRecord); Map方法首先調用自定義的方法 (generateTaggedMapOutput) 來生成OutputValue對象。這個對象包含了在連接中需要使用的值,和一個標識較大或較小數據集的布爾值。如
13、果map方法可以調用自定義的方法 (generateGroupKey) 來得到可以在連接中使用的鍵,那么這個鍵就作為map的輸出鍵。OptimizedDataJoinReducerBasepublic void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter)throws IOException CompositeKey k = (CompositeKey) key;List smaller = new ArrayList(); while (values.hasNext() Objec
14、t value = values.next(); OptimizedTaggedMapOutput cloned =(OptimizedTaggedMapOutput) value).clone(job); if (cloned.isSmaller().get() smaller.add(cloned); else joinAndCollect(k, smaller, cloned, output, reporter); Map端處理后已經可以保證較小源數據集的值將會先于較大源數據集的值被接收。這里就可以將所有的較小源數據集的值放到緩存中。在開始接收較大源數據集的值的時候,就開始和緩存中的值做
15、連接操作。OptimizedDataJoinRuducerBase(續)(續)protected abstract OptimizedTaggedMapOutput combine(String key,OptimizedTaggedMapOutput value1,OptimizedTaggedMapOutput value2); private void joinAndCollect(CompositeKey key,List smaller, OptimizedTaggedMapOutput value, OutputCollector output, Reporter reporter
16、) throws IOException if(smaller.size() 1) OptimizedTaggedMapOutput combined = combine(key.getKey(), null, value); collect(key, combined, output, reporter); else for (OptimizedTaggedMapOutput small : smaller) OptimizedTaggedMapOutput combined = combine(key.getKey(), small, value); collect(key, combin
17、ed, output, reporter); 方法 joinAndCollect 包含了兩個數據集的值,并輸出它們。優化重分區連接實例優化重分區連接實例例如,需要連接用戶詳情數據和用戶活動日志。第一步,判斷兩個數據集中哪一個比較小。對于一般的網站來說,用戶詳情數據會比較小,用戶活動日志會比較大。首先,實現抽象類 OptimizedDataJoinMapperBase。這個將在map端被調用。這個類將創建map的輸出鍵和輸出值。同時,它還將提示整個框架,當前處理的文件是不是比較小的那個。Map端實現代碼端實現代碼 public class SampleMap extends OptimizedD
18、ataJoinMapperBase private boolean smaller; Override protected Text generateInputTag(String inputFile) / tag the row with input file name (data source) smaller = inputFile.contains(users.txt); return new Text(inputFile); Override protected String genGroupKey(Object key, OutputValue output) return key
19、.toString(); Override protected boolean isInputSmaller(String inputFile) return smaller; Override protected OutputValue genMapOutputValue(Object o) return new TextTaggedOutputValue(Text) o); Reduce端實現代碼端實現代碼第二步,你需要實現抽象類 OptimizedDataJoinReducerBase。它將在reduce端被調用。在這個類中,將從map端傳入不同數據集的輸出鍵和輸出值,然后返回reduc
20、e的輸出數組。public class SampleReduce extends OptimizedDataJoinReducerBase private TextTaggedOutputValue output = new TextTaggedOutputValue(); private Text textOutput = new Text(); Override protected OutputValue combine(String key, OutputValue smallValue,OutputValue largeValue) if(smallValue = null | lar
21、geValue = null) return null; Object values = smallValue.getData(), largeValue.getData() ; textOutput.set(StringUtils.join(values, t); output.setData(textOutput); return output; 任務的主代碼任務的主代碼最后,任務的主代碼需要指明InputFormat類,并設置二次排序。job.setInputFormat(KeyValueTextInputFormat.class);job.setMapOutputKeyClass(Co
22、mpositeKey.class);job.setMapOutputValueClass(TextTaggedOutputValue.class); job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.setPartitionerClass(CompositeKeyPartitioner.class);job.setOutputKeyComparatorClass(CompositeKeyComparator.class);job.setOutputValueGroupingComparator(C
23、ompositeKeyOnlyComparator.class);MapReduce中的連接策略中的連接策略重分區連接重分區連接半連接半連接reduce端連接。使用場景:連接兩個或多個大型數據集。map端連接。使用場景:待連接的數據集中有一個數據集小到可以完全放在緩存中。map端連接。使用場景:待連接的數據集中有一個數據集非常大,但同時這個數據集可以被過濾成小到可以放在緩存中。復制連接復制連接 復制連接得名于它的具體實現:連接中最小的數據集將會被復制到所有的map主機節點。復制連接有一個假設前提:在被連接的數據集中,有一個數據集足夠小到可以緩存在內存中。MapReduce的復制連接的工作原理如
24、下:使用分布式緩存將這個小數據集復制到所有運行map任務的節點。用各個map任務初始化方法將這個小數據集裝載到一個哈希表中。逐條用大數據集中的記錄遍歷這個哈希表,逐個判斷是否符合連接條件輸出符合連接條件的結果。復制連接復制連接一個復制連接通用框架一個復制連接通用框架 該復制連接框架可以支持任意類型的數據集。這個框架中同樣提供了一個優化的小功能:動態監測分布式緩存內容和輸入塊的大小,并判斷哪個更大。如果輸入塊較小,那么就需要將map的輸入塊放到內存緩沖中,然后在mapper的cleanup方法中執行連接操作。下圖為該框架的類圖。并且提供了連接類( GenericReplicatedJoin)的具
25、體實現,假設前提:每個數據文件的第一個標記是連接鍵。連接框架的算法連接框架的算法 Mapper的setup方法判斷在map的輸入塊和分布式緩存的內容中哪個大。如果分布式緩存的內容比較小,那么它將被裝載到內存緩存中。Map函數開始連接操作。如果輸入塊比較小,map函數將輸入塊的鍵值對裝載到內存緩存中。Map的cleanup方法將從分布式緩存中讀取記錄,逐條記錄和在內存緩存中的鍵值對進行連接操作。GenericReplicatedJoin 以下代碼為GenericReplicatedJoin中的setup方法,它是在map的初始化階段調用的。這個方法判斷分布式緩存中的文件和輸入塊哪個大。如果文件比
26、較小,則將文件裝載到HashMap中。protected void setup(Context context) throws IOException, InterruptedException distributedCacheFiles=DistributedCache.getLocalCacheFiles(context.getConfiguration(); int distCacheSizes = 0; for (Path distFile : distributedCacheFiles) File distributedCacheFile = new File(distFile.to
27、String(); distCacheSizes += distributedCacheFile.length(); if(context.getInputSplit() instanceof FileSplit) FileSplit split = (FileSplit) context.getInputSplit(); long inputSplitSize = split.getLength(); distributedCacheIsSmaller = (distCacheSizes inputSplitSize); else distributedCacheIsSmaller = tr
28、ue; if (distributedCacheIsSmaller) for (Path distFile : distributedCacheFiles) File distributedCacheFile = new File(distFile.toString(); DistributedCacheFileReader reader = getDistributedCacheReader(); reader.init(distributedCacheFile); for (Pair p : (Iterable) reader) addToCache(p); reader.close();
29、 GenericReplicatedJoin(續)(續) 以下代碼為GenericReplicatedJoin中的Map方法。它將會根據setup方法是否將了分布式緩存的內容裝載到內存的緩存中來選擇行為。如果分布式緩存的內容被裝載到內存中,那么map方法就將輸入塊的記錄和內存中的緩存做連接操作。如果分布式緩存的內容沒有被裝載到內存中,那么map方法就將輸入塊的記錄裝載到內存中,然后在cleanup方法中使用。protected void map(Object key, Object value, Context context) throws IOException, InterruptedE
30、xception Pair pair = readFromInputFormat(key, value);if(distributedCacheIsSmaller) joinAndCollect(pair, context); else addToCache(pair);public void joinAndCollect(Pair p, Context context)throws IOException, InterruptedException List cached = cachedRecords.get(p.getKey(); if (cached != null) for (Pai
31、r cp : cached) Pair result;if(distributedCacheIsSmaller) result = join(p, cp); else result =join(cp, p); if (result != null) context.write(result.getKey(), result.getData();public Pair join(Pair inputSplitPair, Pair distCachePair) StringBuilder sb = new StringBuilder(); if (inputSplitPair.getData()
32、!= null) sb.append(inputSplitPair.getData();sb.append(t); if (distCachePair.getData() != null) sb.append(distCachePair.getData();return new Pair(new Text(inputSplitPair.getKey().toString(),new Text(sb.toString();GenericReplicatedJoin(續)(續) 當所有的記錄都被傳輸給map方法,MapReduce將會調用cleanup方法。如果分布式緩存的內容比輸入塊大,連接將會
33、在cleanup中進行。連接的對象是map函數的緩存中的輸入塊的記錄和分布式緩存中的記錄。 protected void cleanup(Context context) throws IOException, InterruptedException if (!distributedCacheIsSmaller) for (Path distFile : distributedCacheFiles) File distributedCacheFile = new File(distFile.toString(); DistributedCacheFileReader reader = get
34、DistributedCacheReader(); reader.init(distributedCacheFile);for (Pair p : (Iterable) reader) joinAndCollect(p, context); reader.close(); GenericReplicatedJoin(續)(續) 最后,任務的驅動代碼必須指定需要裝載到分布式緩存中的文件。以下的代碼可以處理一個文件,也可以處理MapReduce輸入結果的一個目錄。 Configuration conf = new Configuration(); FileSystem fs = smallFile
35、Path.getFileSystem(conf); FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath); if(smallFilePathStatus.isDir() for(FileStatus f: fs.listStatus(smallFilePath) if(f.getPath().getName().startsWith(part) DistributedCache.addCacheFile(f.getPath().toUri(), conf); else DistributedCache.addCache
36、File(smallFilePath.toUri(), conf); MapReduce中的連接策略中的連接策略重分區連接重分區連接復制連接復制連接reduce端連接。使用場景:連接兩個或多個大型數據集。map端連接。使用場景:待連接的數據集中有一個數據集小到可以完全放在緩存中。map端連接。使用場景:待連接的數據集中有一個數據集非常大,但同時這個數據集可以被過濾成小到可以放在緩存中。半連接半連接 假設一個場景,需要連接兩個很大的數據集,例如用戶日記和OLTP的用戶數據。任何一個數據集都不是足夠小到可以緩存在map job的內存中。如此看來,似乎就不得不應用reduce端的連接了。這時候,可以
37、看一下題目本身:若是一個數據集中有的記錄因為無法連接到另一個數據集的記錄,將會被移除,還需要將全部數據集放到內存中嗎?在這個例子中,在用戶日記中的用戶僅僅是OLTP用戶數據中的用戶中的很小的一個項目組。那么就可以從OLTP用戶數據中取出存在于用戶日記中的那項目組用戶的用戶數據。如許就可以獲得足夠小到可以放在內存中的數據集。如許的解決規劃就叫做半連接。例子例子¥ bin/run.sh com.manning.hip.ch4.joins.semijoin.Main users.txt user-logs.txt output¥ hadoop fs -ls output/user/aholmes/o
38、utput/filtered/user/aholmes/output/result/user/aholmes/output/unique¥ hadoop fs -cat output/unique/part*bobjimmariemike¥ hadoop fs -cat output/filtered/part*mike 69 VAmarie 27 ORjim 21 ORbob 71 CA¥ hadoop fs -cat output/result/part*jim logout 93.24.237.12 21 ORmike new_tweet 87.124.79.252 69 VAbob n
39、ew_tweet 58.133.120.100 71 CAmike logout 55.237.104.36 69 VAjim new_tweet 93.24.237.12 21 ORmarie view_user 122.158.130.90 27 ORjim login 198.184.237.49 21 ORmarie login 58.133.120.100 27 OR半連接的實現半連接的實現3個個MapReduce job半連接的實現半連接的實現job1 第一個MapReduce job的功能是從日記文件中提取出用戶名,用這些用戶名生成一個用戶名唯一的湊集(Set)。這經由過程讓ma
40、p函數履行了用戶名的投影操作。然后用reducer輸出用戶名。為了削減在map階段和reduce簡短之間傳輸的數據量,就在map任務中采取哈希集來保存用戶名,在cleanup辦法中輸出哈希集的值。Job1的實現代碼的實現代碼 public static class Map extends Mapper private Set keys = new HashSet();Override protected void map(Text key, Text value, Context context) throws IOException, InterruptedException keys.ad
41、d(key.toString(); Override protected void cleanup(Context context) throws IOException, InterruptedException Text outputKey = new Text(); for(String key: keys) outputKey.set(key); context.write(outputKey, NullWritable.get(); public static class Reduce extends Reducer Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException context.write(key, NullWritable.get(); 半連接的實現半連接的實現job2第二步
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 管理崗位績效管理辦法
- 學校地基歸誰管理辦法
- 競賽教練考核管理辦法
- 腸息肉中醫教學課件
- 福建第三次質檢數學試卷
- 汾陽初中二模數學試卷
- 畢業設計(論文)-家用照明智能控制系統的設計
- 2025至2030大米行業市場深度研究與戰略咨詢分析報告
- 德國職業教育的數字化轉型:戰略規劃、項目布局與效果評估
- 麗水農林技師學院招聘教師筆試真題2024
- 水利工程隱患排查課件
- 醫藥公司廉政管理制度
- T/CEPPEA 5023-2023風光儲充一體化充電站設計規范
- (人教2024版)英語七下期末全冊分單元總復習課件(新教材)
- 碳資產管理與碳金融 課件 第9章 碳資產管理案例
- 木質纖維素納米纖絲基水凝膠傷口敷料的制備與性能研究
- 八五普法自查自評情況報告
- 2025三季度四川經準檢驗檢測集團股份限公司招聘48人易考易錯模擬試題(共500題)試卷后附參考答案
- 網約車法律培訓
- 深圳市羅湖區2025年小升初數學模擬試卷含解析
- 軸承加工合同協議
評論
0/150
提交評論