




版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領
文檔簡介
第10單元并發(fā)編程Java程序設計基礎教程((慕課版)(第2版))目錄導航10.1線程與進程10.3線程的調度10.2線程的創(chuàng)建10.4多線程10.5項目實戰(zhàn)10.6單元小結10.1線程與進程線程(Thread)是進程中某個單一順序的控制流,被稱為輕量級進程(LightweightProcess),是比進程更小的執(zhí)行單位,也是程序執(zhí)行流中最小的單位。一個標準的線程由線程ID、當前指令指針、寄存器集合和堆棧組成。一個線程可以創(chuàng)建和撤銷另一個線程,同一個進程中的多個線程也可以并發(fā)執(zhí)行。由于進程的所有資源是固定的且線程間存在相互制約關系,線程可能處于就緒、阻塞和運行等狀態(tài),令線程的執(zhí)行呈現出間斷性。線程之間可以共享代碼和數據,進行實時通信和必要的同步操作等。一個程序至少擁有一個進程,每個進程擁有一個或者多個線程。每個線程都有自己獨立的資源和生命周期。進程和線程的最大區(qū)別之一在于進程是由操作系統(tǒng)來控制的,而線程則是由進程來控制的。進程都是相互獨立的,各自享有各自的內存空間,因此進程間的通信是昂貴且受限的,進程間的轉換也是需要開銷的;線程則共享進程的內存空間,線程通信是便宜的,且線程間的轉換也是低成本的,但這種低成本低開銷的通信可能會產生意想不到的錯誤:當多個線程訪問同一個變量時,獲取到的值是不一樣的。目錄導航10.1線程與進程10.3線程的調度10.2線程的創(chuàng)建10.4多線程10.5項目實戰(zhàn)10.6單元小結10.2.1繼承Thread類Thread類是實體類,該類封裝了線程的行為。想要利用Thread類創(chuàng)建一個線程,必須創(chuàng)建一個從Thread類導出的子類,并實現Thread類的run()方法。在run()方法內部可以根據需要編寫相應的實現邏輯,最后調用Thread類的start()方法來執(zhí)行。Thread類的構造方法有很多種,每種構造方法的用途各異,如表所示。構造方法說明Thread()構造一個線程對象Thread(Runnabletarget)構造一個線程對象,其中target是要創(chuàng)建線程的目標對象,它實現了Runnable接口中的run()方法Thread(Stringname)以指定名稱構造一個線程對象Thread(ThreadGroupgroup,Runnable
target)在指定線程組中構造一個線程對象,使用目標對象target的run()方法Thread(Runnabletarget,Stringname)以指定名稱構造一個線程對象,使用目標對象target的run()方法Thread(ThreadGroupgroup,Runnable
target,Stringname)在指定的線程組中創(chuàng)建一個指定名稱的線程,使用目標對象target的run()方法Thread(ThreadGroupgroup,Runnable
target,Stringname,longstackSize)在指定線程組中構造一個線程對象,用name指定線程的名字,使用目標對象target的run()方法作為線程的執(zhí)行體,用stackSize指定堆棧大小10.2.1繼承Thread類Thread類也提供了很多輔助方法,以讓線程正常運行和方便程序員對線程的控制,其常用方法如表所示。輔助方法說明staticintactiveCount()返回線程組中正在運行的線程的數目voidcheckAccess()確定當前運行的線程是否有權限修改線程staticThreadcurrentThread()返回當前正在執(zhí)行的線程voiddestroy()銷毀線程,但不回收資源staticvoiddumpStack()顯示當前線程的堆棧信息longgetId()返回當前線程的IDStringgetName()返回當前線程的名稱intgetPriority()返回當前線程的優(yōu)先級Thread.StategetState()返回當前線程的狀態(tài)ThreadGroupgetThreadGroup()返回當前線程所屬的線程組voidinterrupt()中斷線程booleanisAlive()判斷當前線程是否存活booleanisDaemon()判斷當前線程是否是守護線程booleanisInterrupted()判斷當前線程是否被中斷voidjoin()等待直到線程死亡voidjoin(longmillis)最多等待millisms,直到線程死亡10.2.1繼承Thread類續(xù)表輔助方法說明voidrun()如果類是使用單獨的Runnable對象構造的,將調用Runnable對象的run()方法,否則本方法不進行操作直接返回voidsetDaemon(booleanon)將當前線程設置為守護線程voidsetName(Stringname)將當前線程名稱修改為namevoidsetPriority(intnewPriority)設置當前線程的優(yōu)先級staticvoidsleep(longmillis)線程休眠millismsvoidstart()啟動線程,JVM會自動調用當前線程的run()方法staticvoidyield()暫停當前線程,同時允許其他線程運行任務10-1使用Thread類實現多線程文件ThreadDemo.javapublicclassThreadDemo{publicstaticvoidmain(String[]args){for(inti=0;i<10;i++){//創(chuàng)建10個MyThread類的對象并運行MyThreadthread=newMyThread();thread.start();}}}//繼承了Thread類的類classMyThreadextendsThread{@Overridepublicvoidrun(){//重寫父類的run()方法for(inti=0;i<3;i++){//循環(huán)輸出信息System.out.println(Thread.currentThread().getName()+"-正在執(zhí)行!");}}}運行結果如圖10-1所示。任務10-2Thread類部分方法的使用文件ThreadUsageDemo.javapublicclassThreadUsageDemoextendsThread{publicstaticvoidmain(String[]args){//創(chuàng)建一個線程并運行ThreadUsageDemothread=newThreadUsageDemo();thread.start();System.out.println("線程名稱:"+thread.getName());thread.setName("myThread1");System.out.println("線程名稱:"+thread.getName());System.out.println("線程的ID:"+thread.getId());System.out.println("線程的優(yōu)先級:"+thread.getPriority());thread.setPriority(3);System.out.println("線程的優(yōu)先級:"+thread.getPriority());System.out.println("線程是否是存活狀態(tài):"+thread.isAlive());System.out.println("線程是否是守護線程:"+thread.isDaemon());longstart=System.currentTimeMillis();try{Thread.currentThread().sleep(2000);}catch(InterruptedExceptione){e.printStackTrace();}longend=System.currentTimeMillis();System.out.println("等待時間:"+(end-start));}}運行結果如圖10-2所示。任務10-3start()方法和run()方法文件ThreadUsageDemo1.javapublicclassThreadUsageDemoextendsThread{publicstaticvoidmain(String[]args){ThreadUsageDemo1thread=newThreadUsageDemo1();thread.start();for(inti=0;i<10;i++){//循環(huán)輸出主線程正在運行的信息System.out.println(Thread.currentThread().getName()+"-正在運行");}try{Thread.sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("*****************************************************");/**調用start()方法重新啟動一個線程,run()在主線程中運行*/thread.run();for(inti=0;i<30;i++){//循環(huán)輸出主線程正在運行的信息System.out.println(Thread.currentThread().getName()+"-正在運行!");}}運行結果如圖10-3所示。@Overridepublicvoidrun(){//循環(huán)輸出當前線程正在運行的信息for(inti=0;i<10;i++){System.out.println(Thread.currentThread().getName()+"-正在運行!");}}}任務10-1使用Thread類實現多線程實現多線程的另一個方式是實現Runnable接口。Runnable接口只有一個方法,即run()方法,該方法需要由一個實現了此接口的類來實現。實現了Runnable接口的類的對象需要由Thread類的一個實例在其內部運行,其本身不能直接運行。任務10-4使用Runnable接口實現多線程文件RunnableDemo.javapublicclassRunnableDemoimplementsRunnable{@Overridepublicvoidrun(){for(inti=0;i<8;i++){System.out.println(Thread.currentThread().getName()+"正在運行");}}publicstaticvoidmain(String[]args){for(inti=0;i<10;i++){RunnableDemorunnable=newRunnableDemo();Threadt=newThread(runnable);//將Runnable對象包裝成Thread對象t.setName("runnable"+i);//設置線程名稱t.start();//啟動線程}}}運行結果如圖10-4所示。目錄導航10.1線程與進程10.3線程的調度10.2線程的創(chuàng)建10.4多線程10.5項目實戰(zhàn)10.6單元小結10.3.1線程的生命周期圖10-5Java線程狀態(tài)轉換創(chuàng)建:當創(chuàng)建一個Thread類和它的子類、對象后,線程就處于創(chuàng)建狀態(tài)。就緒:當處于創(chuàng)建狀態(tài)的線程調用start()方法被啟動之后,線程將進入線程隊列,等待CPU分配時間片,以開始執(zhí)行。運行:就緒狀態(tài)的線程獲取了時間片之后,就進入運行狀態(tài),此時線程會執(zhí)行run()方法內的代碼邏輯。阻塞:線程在運行的過程中,可能會因為資源不足、前驅任務沒有完成或者調用了阻塞方法等而進入阻塞狀態(tài)。死亡:不具備繼續(xù)運行能力的線程就處于死亡狀態(tài)。10.3.2線程的優(yōu)先級線程也是有優(yōu)先級的,線程的優(yōu)先級可以通過getPriority()方法獲取。為了使重要的任務優(yōu)先完成,Java提供了setPriority()方法給線程設定優(yōu)先級。但是需要指出的是,JVM是運行在所屬系統(tǒng)上的一個線程,線程的創(chuàng)建和執(zhí)行需要基于對應的系統(tǒng)。所以,在一些不支持線程優(yōu)先級策略的系統(tǒng)中,Java設定的優(yōu)先級并不起作用,這一點讀者一定要注意。任務10-5線程優(yōu)先級文件ThreadPriorityDemo.javaublicclassThreadPriorityDemoextendsThread{privateRandomrm=newRandom();@Overridepublicvoidrun(){System.out.println(this.getName()+"-優(yōu)先級>"+this.getPriority()+"開始執(zhí)行!");StringBuildersBuilder=newStringBuilder();for(inti=0;i<100;i++){sBuilder.append(rm.nextInt(1000)+",");}for(intj=sBuilder.length()-1;j>=0;j--){if(j%2==0){sBuilder.deleteCharAt(j);}}}10.3.2線程的優(yōu)先級publicstaticvoidmain(String[]args)throwsInterruptedException{System.out.println("不設定優(yōu)先級執(zhí)行!");List<Thread>list=newLinkedList<>();for(inti=0;i<10;i++){//創(chuàng)建10個默認優(yōu)先級的線程對象并放入鏈表中ThreadPriorityDemothread=newThreadPriorityDemo();list.add(thread);}for(Threadt:list){//從鏈表中取出線程并執(zhí)行t.start();}Thread.sleep(2000);list.clear();//清空鏈表System.out.println("************************************************");System.out.println("設定優(yōu)先級執(zhí)行!");for(inti=0;i<10;i++){//創(chuàng)建10個線程對象ThreadPriorityDemothread=newThreadPriorityDemo();if((i+1)%3==0){//能被3整除的對象的優(yōu)先級設置為10thread.setPriority(10);運行結果如圖10-6所示。}elseif((i+1)%2==0){thread.setPriority(1);//能被2整除的對象的優(yōu)先級設置為1}list.add(thread);//否則使用默認優(yōu)先級}for(Threadt:list){t.start();//執(zhí)行線程}}}10.3.2線程的優(yōu)先級從任務10-5的輸出結果可以看出,在Java中線程是有默認優(yōu)先級的,默認情況下線程的優(yōu)先級為5,是普通優(yōu)先級。Java中定義了線程的優(yōu)先級為1~10,數字越大,優(yōu)先級越高。對于優(yōu)先級,讀者需要注意以下幾點。并不是優(yōu)先級高的線程一定會比優(yōu)先級低的線程先執(zhí)行,它只是會比優(yōu)先級低的線程有更多的機會先執(zhí)行。01Java的線程優(yōu)先級取決于JVM運行的系統(tǒng),線程優(yōu)先級策略也依賴于系統(tǒng),這可能導致在一個系統(tǒng)中優(yōu)先級不同的線程在另一個系統(tǒng)中優(yōu)先級相同。甚至對于某些不支持線程優(yōu)先級調度策略的系統(tǒng),Java定義的優(yōu)先級完全無效。0210.3.3線程插隊線程的魅力在于能夠高效利用CPU資源,使得程序在單位時間內充分地利用CPU而提升處理效率。但線程運行順序的不確定性以及現代操作系統(tǒng)核心數的增加,導致在某些情況下線程無法明確前驅任務是否完成。為了保證前驅任務完成后才執(zhí)行當前線程,可以調用join()方法。join()會阻塞當前線程,直到插隊線程執(zhí)行完畢之后才會繼續(xù)執(zhí)行,如任務10-6所示。任務10-6線程插隊文件JoinDemo.javapublicclassJoinDemo{publicstaticvoidmain(String[]args){System.out.println("主線程開始!");List<Integer>list=newLinkedList<>();//線程初始化ThreadBthread=newThreadB(list);ThreadAthreadA=newThreadA(list,thread);//線程運行thread.start();threadA.start();//線程插隊try{threadA.join();}catch(InterruptedExceptione){e.printStackTrace();}System.out.println("主線程結束!");}}10.3.3線程插隊classThreadAextendsThread{privateList<Integer>list;privateThreadBthreadB;publicThreadA(List<Integer>linkedList,ThreadBthread){list=linkedList;threadB=thread;}@Overridepublicvoidrun(){System.out.println(Thread.currentThread().getName()+"開始執(zhí)行!");try{threadB.join();//線程ThreadB插隊執(zhí)行}catch(InterruptedExceptione){e.printStackTrace();}intcount=1;for(Integeri:list){//遍歷listif(count%10==0){//每行輸出10個元素System.out.println(i);}else{System.out.print(i+",");}count++;}System.out.println(Thread.currentThread().getName()+"執(zhí)行結束!");}}classThreadBextendsThread{Randomrm=newRandom();privateList<Integer>list;publicThreadB(List<Integer>list){this.list=list;}@Overridepublicvoidrun(){System.out.println(Thread.currentThread().getName()+"開始執(zhí)行!");for(inti=0;i<100;i++){list.add(rm.nextInt(1000));//隨機插入100個整數到list中}System.out.println(Thread.currentThread().getName()+"執(zhí)行結束!");}}運行結果如圖10-7所示。10.3.4線程休眠Thread類中有sleep()方法。該方法可以讓當前線程休眠并讓出CPU,使得其他線程可以獲取CPU并執(zhí)行。對于周期性很強的系統(tǒng),調用線程休眠是最好的形式。線程休眠時只會等待休眠結束,并不占用CPU資源。等到休眠結束后,線程會重新進入就緒狀態(tài),等待CPU分配時間片,以繼續(xù)執(zhí)行。任務10-7線程休眠文件SleepDemo.javapublicclassSleepDemo{publicstaticvoidmain(String[]args){List<RandomThread>list=newArrayList<>();longstart=System.currentTimeMillis();//系統(tǒng)當前毫秒值//創(chuàng)建30個RandomThread對象for(inti=0;i<30;i++){list.add(newRandomThread(start));}for(RandomThreadt:list){//執(zhí)行RandomThread對象t.start();}}}classRandomThreadextendsThread{privatelongstartTime;publicRandomThread(longtime){startTime=time;}@Overridepublicvoidrun(){Randomrm=newRandom();for(inti=0;i<10;i++){longtime=System.currentTimeMillis();10.3.4線程休眠//隨機輸出一個數字System.out.println(Thread.currentThread().getName()+"-第"+(i+1)+"次執(zhí)行:"+rm.nextInt(100)+";與基準時間差值是-"+(time-startTime));try{//輸出后休眠1s,參數1000是毫秒值Thread.currentThread().sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}}}}運行結果如圖10-8所示。10.3.5同步與互斥寄宿學校可能會有排隊打水的場景。若許多人同時等待一個開水閥準備接開水,只有前面一個人打水完畢后,后面一個人才能開始打水。如果打水的動作不是同步的,那么就會出現多人同時搶占一個開水閥的問題。下面通過任務10-8模擬非同步打水的場景。同步01OPTION有時候為了實現互斥,也會使用信號量進行控制,如任務10-10所示。互斥02OPTION任務10-8非同步打水文件GetWaterCrushDemo.javapublicclassGetWaterCrushDemoextendsThread{privatePersonAsypersonAsy;publicstaticvoidmain(String[]args){for(inti=0;i<10;i++){PersonAsyperson=newPersonAsy();person.setName("王"+i);GetWaterCrushDemocrush=newGetWaterCrushDemo(person);crush.start();}}publicGetWaterCrushDemo(PersonAsyperson){personAsy=person;}publicvoidgetWater(PersonAsypersonAsy){System.out.println(personAsy.getName()+"開始打水:");try{Thread.currentThread().sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println(personAsy.getName()+"打水結束!");}
@Overridepublicvoidrun(){this.getWater(personAsy);}}classPersonAsy{privateStringname;publicStringgetName(){returnname;}publicvoidsetName(Stringname){=name;}}運行結果如圖10-9所示。任務10-8非同步打水synchronized是Java中的關鍵字,是一種同步鎖。在多線程場景中,它通過控制線程對同一個代碼的訪問,確保同一時間只有一個線程能夠執(zhí)行該代碼。它修飾的對象有以下幾種。被修飾的代碼塊被稱為同步語句塊,其作用范圍是花括號{}內的代碼,作用對象是調用這個代碼塊的對象。代碼塊被修飾的普通方法稱為同步方法,其作用范圍是整個方法,作用對象是調用這個方法的對象。普通方法其作用范圍是整個靜態(tài)方法,作用對象是靜態(tài)方法所屬類的所有對象。靜態(tài)方法其作用范圍是synchronized關鍵字后面圓括號內的部分,作用對象是類的所有對象。類任務10-9同步打水文件CountSycDemo.javapublicclassCountSycDemoextendsThread{privatePersonSycpersonSyc;privatestaticObjectobj=newObject();publicstaticvoidmain(String[]args){for(inti=0;i<10;i++){PersonSycperson=newPersonSyc();person.setName("王"+i);CountSycDemocrush=newCountSycDemo(person);crush.start();}}publicCountSycDemo(PersonSycperson){personSyc=person;}publicvoidgetWater(PersonSycperson){synchronized(obj){System.out.println(person.getName()+"開始打水:");try{Thread.currentThread().sleep(500);}catch(InterruptedExceptione){e.printStackTrace();}System.out.println(person.getName()+"打水結束!");}}@Overridepublicvoidrun(){this.getWater(personSyc);}}運行結果如圖10-10所示。任務10-10線程互斥的計數器文件MetuxCountDemo.javapublicclassMetuxCountDemo{publicstaticvoidmain(String[]args){inttimes=10;for(inti=0;i<times;i++){MetuxThreadthread=newMetuxThread();thread.start();}}}classMetuxThreadextendsThread{privatestaticintcount=0;privatestaticbooleanflag=true;@Overridepublicsynchronizedvoidrun(){if(!flag){try{wait();}catch(InterruptedExceptione){e.printStackTrace();}}
flag=false;count++;flag=true;notifyAll();System.out.println(getName()+"count="+count);try{sleep(1000);}catch(InterruptedExceptione){e.printStackTrace();}}}運行結果如圖10-11所示。其中flag相當于信號量。當有線程訪問公共資源的時候會先檢測信號量,如果可用,該線程會修改信號量的狀態(tài),防止其他線程進入;如果信號量被占用,則進入等待狀態(tài)。訪問完成之后,線程會再次修改信號量的狀態(tài),并將所有處于該信號量等待狀態(tài)的線程喚醒,給其他線程獲取該信號量的機會。任務10-11生產者-消費者模型文件Product_CustomerDemo.javapublicclassProduct_CustomerDemo{publicstaticvoidmain(String[]args){Productprod=newProduct();Producterp=newProducter(prod);Producterp1=newProducter(prod);Customerc=newCustomer(prod);p.start();p1.start();c.start();}}classProduct{privateString[]products;//產品集privateintcount;//產品的實際數據privateintBUFFEREDSIZE=5;//緩沖區(qū)的大小publicProduct(){products=newString[BUFFEREDSIZE];//初始化倉庫容量count=0;//產品數目}//獲取庫存publicsynchronizedStringget(){Stringproduct;//檢測產品庫存量while(count<=0){try{wait();//庫存不足,等待}catch(InterruptedExceptione)
{e.printStackTrace();}}product=products[--count];//取出一個庫存notifyAll();//喚醒在該數據上等待的所有線程returnproduct;}//增加庫存publicsynchronizedvoidput(Stringproduct){//檢測庫存是否已滿while(count>=BUFFEREDSIZE){try{wait();//已滿,等待,直到被喚醒}catch(InterruptedExceptione){e.printStackTrace();}}products[count++]=product;//增加庫存notifyAll();//喚醒所有在庫存上等待的線程}}任務10-11生產者-消費者模型//消費者classCustomerextendsThread{privateProductproduct;//產品publicCustomer(Productprod){product=prod;}@Overridepublicvoidrun(){Stringproduction;for(inti=1;i<20;i++){//獲取庫存production=product.get();System.out.println("消費的數據是:"+production);try{sleep(50);}catch(InterruptedExceptione){e.printStackTrace();}}}}classProducterextendsThread{privateProductproduct;publicProducter(Productprod){product=prod;}@Overridepublicsynchronizedvoidrun(){for(inti=0;i<10;i++){Stringproduction="第"+i+"個產品";product.put(production);System.out.println("生產的數據是:"+production);try{sleep(50);}catch(InterruptedExceptione){e.printStackTrace();}}}}運行結果如圖10-12所示。10.3.6死鎖問題死鎖是指多個線程因競爭資源而造成的相互等待的僵局。如果沒有外力的作用,必然導致無限的等待。死鎖是由系統(tǒng)資源的競爭引發(fā)的,這可能是由于資源不足、資源分配不當或線程運行過程中請求和釋放資源的順序不當導致的。死鎖的產生有4個必要條件。一個資源每次只能被一個線程使用,即一段時間內某個資源只能被一個線程占用。其他線程請求資源時,只能等待。01線程已占用了至少一個資源,但又提出了新的資源請求,而相應資源已被其他線程占用。此時請求線程被阻塞,但對自己已獲得的資源保持不釋放狀態(tài)。02線程所獲得的資源在未使用完畢之前,不能被其他狀態(tài)強行奪走,即只能由獲得相應資源的線程自己來釋放(只能是主動釋放)。03若干線程間形成首尾相接、循環(huán)等待資源的關系。04互斥條件請求與保持條件不可剝奪條件循環(huán)等待條件任務10-12線程死鎖文件DeadLockDemo.javapublicclassDeadLockDemo{//兩個類級別的靜態(tài)成員變量privatestaticObjectobjALock=newObject();privatestaticObjectobjBLock=newObject();publicstaticvoidmain(String[]args){Threadt1=newThread(newRunnable(){@Overridepublicvoidrun(){synchronized(objALock){try{System.out.println(Thread.currentThread().getName()+"取得objALock...");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+"休眠1s...");}catch(InterruptedExceptione){e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"請求獲取objBLock...");synchronized(objBLock){System.out.println(Thread.currentThread().getName()+"取得objBLock");}}}},"t1");任務10-12線程死鎖Threadt2=newThread(newRunnable(){@Overridepublicvoidrun(){synchronized(objBLock){try{System.out.println(Thread.currentThread().getName()+"取得objBLock...");Thread.sleep(1000);System.out.println(Thread.currentThread().getName()+"休眠1s...");}catch(InterruptedExceptione){e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"請求獲取objALock...");synchronized(objALock){System.out.println(Thread.currentThread().getName()+"取得objALock");}}}},"t2");t1.start();t2.start();}}運行結果如圖10-13所示。目錄導航10.1線程與進程10.3線程的調度10.2線程的創(chuàng)建10.4多線程10.5項目實戰(zhàn)10.6單元小結10.4.1線程池技術線程池的處理流程如下。判斷線程池中的線程是否都處于工作狀態(tài),如果不是,創(chuàng)建一個新的工作線程來執(zhí)行任務;如果是,則交給飽和策略來執(zhí)行任務。03判斷工作隊列是否已經滿了,如果沒有滿,將新提交的任務存儲到工作隊列中;如果滿了,則進入下一個流程。02判斷核心線程池中的線程是否都在執(zhí)行任務,如果不是,創(chuàng)建一個新的線程來執(zhí)行任務;如果核心線程池中的線程都在執(zhí)行任務,則進入下一個流程。0110.4.1線程池技術Java通過Executors類提供如下4種線程池。創(chuàng)建一個緩存線程池。如果線程池長度超過處理需要,可靈活回收空閑線程;如無可回收的線程,則創(chuàng)建新線程。01創(chuàng)建一個定長線程池,可控制線程的最大并發(fā)數,超出的線程會在隊列中等待。02創(chuàng)建一個定長線程池,支持定時及周期性任務的執(zhí)行。03創(chuàng)建一個單線程的線程池,它只會用唯一的工作線程來執(zhí)行任務,保證所有的任務按照指定順序(FIFO、LIFO、優(yōu)先級)執(zhí)行。04newCachedThreadPoolnewFixedThreadPoolnewScheduledThreadPoolnewSingleThreadExecutor任務10-13緩存線程池文件CachedPoolDemo.javapublicclassCachedPoolDemo{publicstaticvoidmain(String[]args){ExecutorServicecachedPool=Executors.newCachedThreadPool();//創(chuàng)建緩存線程池for(inti=0;i<15;i++){finalintindex=i;cachedPool.execute(newRunnable(){//向線程池提交任務
@Overridepublicvoidrun(){System.out.println(Thread.currentThread().getName()+"正在執(zhí)行!index="+index);}});}cachedPool.shutdown();//關閉線程池}}運行結果如圖10-14所示。任務10-14計劃任務線程池文件SchedulePoolDemo.javapublicstaticvoidmain(String[]args){
ScheduledExecutorServicees=Executors.newScheduledThreadPool(1);//創(chuàng)建一個計劃任務線程池,newScheduledThreadPool()的參數表示線程池的個數
es.scheduleAtFixedRate(newRunnable(){
@Overridepublicvoidrun(){System.out.println("每1秒執(zhí)行一次:"+System.currentTimeMillis());
}},2,1,TimeUnit.SECONDS);}}運行結果如圖10-15所示。10.4.2Callable接口和Future接口并發(fā)編程一般將Runnable接口交給線程池處理,這種情況下是不需要知道線程執(zhí)行結果的。但是萬一將軍匯報完了還想知道對應軍事部署方案怎么辦?這時候可以試試Callable接口。Callable接口的用法和Runnable接口類似,只不過調用的是call()方法,而不是run()方法。該方法有一個泛型返回值類型,可根據需要指定。
下面通過任務10-15了解Callable接口的具體應用。01OPTIONCallable接口Future接口用于存放Callable接口執(zhí)行后的返回值。這個返回值可以使用get()方法獲取。get()方法是阻塞的,它會一直等待,直到Callable接口的執(zhí)行結果得出。如果不想阻塞當前進程,可以調用isDone()方法來查詢Callable接口的執(zhí)行結果是否已經得出。
下面通過任務10-16了解Future接口的用法。Future接口02OPTION任務10-15Callable接口的用法文件CallableDemo.javapublicclassCallableDemo{publicstaticvoidmain(String[]args){ExecutorServicees=Executors.newSingleThreadExecutor();//創(chuàng)建一個單線程for(inti=0;i<10;i++){try{System.out.println(es.submit(newRunAndReturn(i)).get());}catch(InterruptedException|ExecutionExceptione){e.printStackTrace();}}es.shutdown();//關閉線程池}}classRunAndReturnimplementsCallable<String>{privateIntegerid;publicRunAndReturn(Integerserno){id=serno;//初始化私有變量}運行結果如圖10-16所示。@OverridepublicStringcall()throwsException{return"RunAndReturnwithresult:"+id;//返回數據}}任務10-16Future接口的用法文件FutureDemo.javapublicclassFutureDemo{publicstaticvoidmain(String[]args){ExecutorServicees=Executors.newCachedThreadPool();//創(chuàng)建一個緩存線程池List<Future<String>>list=newLinkedList<>();//創(chuàng)建一個鏈表,用于存放Future接口對象for(inti=0;i<10;i++){finalintindex=i;list.add(es.submit(newCallable<String>(){//添加一個Callable接口并將返回結果存放到鏈表中@OverridepublicStringcall()throwsException{Stringname=Thread.currentThread().getName();System.out.println(name+"開始執(zhí)行!index="+index);returnname+"開始執(zhí)行!index="+index;}}));}intcount=10;while(true){for(Future<String>f:list){//遍歷list,獲取返回對象Futureif(f.isDone()){//如果已經計算完成
任務10-16Future接口的用法try{System.out.println("計算結束,計算結果是:"+f.get());//獲取結果count--;//計數器減1}catch(InterruptedException|ExecutionExceptione){e.printStackTrace();}}}//遍歷結束后,獲取了所有的數據,跳出while循環(huán),否則休眠10msif(0==count){break;//如果所有的數據都已經獲取到,則跳出循環(huán)}try{Thread.sleep(10);//如果還有數據未獲取到,休眠10ms后繼續(xù)獲取}catch(InterruptedExceptione){e.printStackTrace();}}es.shutdown();//關閉線程池}}運行結果如圖10-17所示。目錄導航10.1線程與進程10.3線程的調度10.2線程的創(chuàng)建10.4多線程10.5項目實戰(zhàn)10.6單元小結項目10-1實現MapReduce的并發(fā)step01
對JobClient類進行具體的實現,如下所示。文件JobClient.javapublicclassJobClient{publicvoidrunJob(JobConfjobConf)throwsInterruptedException,CustomException{ExecutorServiceexecutorService=Executors.newFixedThreadPool(jobConf.getThreadPoolSize());//Split階段InputSplitinputSplit=newInputSplit(jobConf.getInputFile(),jobConf.getTmpDir());List<String>splits=inputSplit.split(jobConf.getBlockSize());//Map階段CountDownLatchcountDownLatch=newCountDownLatch(splits.size());for(inti=0;i<splits.size();++i){executorService.submit(newMapRunnable(i,splits.get(i),jobConf,countDownLatch));}countDownLatch.await(5,TimeUnit.MINUTES);//Reduce階段countDownLatch=newCountDownLatch(jobConf.getReducerNumber());for(inti=0;i<jobConf.getReducerNumber();i++){executorService.submit(newReduceRunnable(i,jobConf,countDownLatch));}countDownLatch.await(5,TimeUnit.MINUTES);executorService.shutdown();}}項目10-1實現MapReduce的并發(fā)step02
對MapRunnable類進行具體的實現,如下所示。文件MapRunnable.javapublicclassMapRunnableimplementsRunnable{privateintindex;privateStringinputFile;privateJobConfjobConf;privateCountDownLatchcountDownLatch;/***@paramindex第i個Map任務*@paraminputFile輸入文件,即Split任務輸出的結果文件*@paramjobConf任務的配置*@paramcountDownLatch用于等待Map任務結束*/publicMapRunnable(intindex,StringinputFile,JobConfjobConf,CountDownLatchcountDownLatch){this.index=index;this.inputFile=inputFile;this.jobConf=jobConf;this.countDownLatch=countDownLatch;}@Overridepublicvoidrun(){Map<Integer,BufferedWriter>writers=newHashMap<>();try(BufferedReaderbr=Files.newBufferedReader(Paths.get(inputFile),StandardCharsets.UTF_8)){Stringseq=br.readLine();Stringline;while((line=br.readLine())!=null){Map<String,String>result=jobConf.getMapper().map(seq,line);for(Map.Entry<String,String>entry:result.entrySet()){Stringkey=entry.getKey();Stringvalue=entry.getValue();項目10-1實現MapReduce的并發(fā)//對單詞進行哈希操作,然后和Reduce任務的總數取余,這樣,相同哈希值的單詞將會被分到同一個文件中,從而完成Shuffle階段的任務intbucket=Math.abs(key.hashCode())%jobConf.getReducerNumber();BufferedWriterbw=puteIfAbsent(bucket,k->{try{returnFiles.newBufferedWriter(Paths.get(jobConf.getTmpDir(),String.format("shuffle_%d_%d",bucket,index)),StandardCharsets.UTF_8);}catch(IOExceptione){e.printStackTrace();}returnnull;});bw.write(key+"\t"+value);bw.newLine();}}for(BufferedWriterbw:writers.values()){bw.close();}}catch(Exceptione){e.printStackTrace();}countDownLatch.countDown();}}項目10-1實現MapReduce的并發(fā)step03
對ReduceRunnable類進行具體實現,如下所示。文件ReduceRunnable.javapublicclassReduceRunnableimplementsRunnable{privateintindex;privateJobConfjobConf;privateCountDownLatchcountDownLatch;/***@paramindex第i個Reduce任務*@paramjobConf任務的配置*@paramcountDownLatch用于等待當前任務結束*/publicReduceRunnable(intindex,JobConfjobConf,CountDownLatchcountDownLatch){this.index=index;this.jobConf=jobConf;this.countDownLatch=countDownLatch;}@Overridepublicvoidrun(){Map<String,List<String>>collects=newHashMap<>();//獲取Map階段的輸出文件Stringfil
溫馨提示
- 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
- 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
- 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
- 4. 未經權益所有人同意不得將文件中的內容挪作商業(yè)或盈利用途。
- 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
- 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
- 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。
最新文檔
- 公司組織建黨節(jié)目活動方案
- 2025年智能制造與工業(yè)轉型相關知識考試試卷及答案
- 2025年生物醫(yī)學工程師職業(yè)資格考試題及答案
- 2025年青少年心理健康教育課程考試試題及答案
- 2025年民俗文化與社會變遷考試試題及答案
- 2025年就業(yè)指導與職業(yè)規(guī)劃考試試卷及答案
- 2025年婚姻家庭咨詢師職業(yè)資格考試試卷及答案
- 2025年國際貿易知識考試及其答案
- 2025年法律法規(guī)與社會責任考試試卷及答案
- 2025護理科內自查分析討論
- 外輪理貨業(yè)務基礎-理貨單證的制作
- 《水火箭制作》課件
- 網絡安全預防電信詐騙主題班會PPT
- 農村垃圾清運投標方案
- 優(yōu)秀物業(yè)管理項目評選方案
- GB/T 5470-2008塑料沖擊法脆化溫度的測定
- 圖書管理系統(tǒng)畢業(yè)論文參考文獻精選,參考文獻
- 中國當代舊體詩選讀幻燈片
- 吉林省全省市縣鄉(xiāng)鎮(zhèn)衛(wèi)生院街道社區(qū)衛(wèi)生服務中心基本公共衛(wèi)生服務醫(yī)療機構信息名單目錄995家
- 倔強的小紅軍-精講版課件
- 信息隱藏與數字水印課件(全)全書教學教程完整版電子教案最全幻燈片
評論
0/150
提交評論