Java多線程編程基石ThreadPoolExecutor示例詳解_第1頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第2頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第3頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第4頁
Java多線程編程基石ThreadPoolExecutor示例詳解_第5頁
已閱讀5頁,還剩14頁未讀 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第Java多線程編程基石ThreadPoolExecutor示例詳解多線程編程是現代軟件開發中不可或缺的一部分,但是手動管理線程可能會變得非常復雜,因為需要考慮許多并發問題,例如線程安全和資源競爭。為了避免這些問題,Java提供了ThreadPoolExecutor類,它是一種高度優化的多線程執行器,可以管理線程池、執行線程任務和控制線程池的大小和生命周期等

為什么用線程池

線程創建和銷毀的開銷較大,每個線程都需要占用一定的內存和系統資源。如果頻繁地創建和銷毀線程,會導致系統的性能下降。

手動管理線程容易出現線程安全和資源競爭的問題,例如,多個線程同時訪問共享變量可能導致數據不一致或者死鎖等問題。

如果并發訪問的線程數量很大,可能會導致系統資源不足,例如,內存不足或者CPU過度使用等問題。

corePoolSize:核心線程池大小,即線程池中始終存在的線程數量,除非設置了allowCoreThreadTimeOut參數,默認情況下,即使空閑,核心線程也不會被回收。

maximumPoolSize:線程池的最大線程數,即可以同時執行的最大線程數量。

keepAliveTime:非核心線程的空閑存活時間,當非核心線程空閑時間超過這個時間,就會被回收。

unit:keepAliveTime的時間單位。

workQueue:任務隊列,用于存儲等待執行的任務,有多種實現方式,例如ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue等。

threadFactory:用于創建新線程的工廠類,可以自定義線程名稱、線程優先級等屬性。

handler:線程池的拒絕策略,當線程池已經達到最大線程數,并且任務隊列已經滿了,新的任務將被拒絕執行,可以設置拒絕策略來處理這種情況。

核心線程數和最大線程數設置

CPU密集型任務:CPU密集型任務的特點是線程在執行任務時會一直利用CPU,對于這種情況要盡可能的避免發生線程上下文的切換。一般來說對于CPU密集型任務設置線程數為CPU核心數+1。

IO密集型任務:線程在執行IO密集型任務時,可能大部分時間都浪費在阻塞IO上了,所以對于IO密集型任務來說我們通常會設置線程數為CPU核心數*2。不過這樣子也不一定是最佳的,我們可以通過公式來進行計算:線程數=CPU核心數*(1+平均等待時間/平均工作時間),盡可能的還要根據壓縮來進行調整。

publicclassCustomThreadPoolDemo{

publicstaticvoidmain(String[]args){

//創建線程池,大小為3,最大線程數為6,空閑線程存活時間為5秒,使用自定義線程工廠和拒絕策略

ThreadPoolExecutorexecutor=newThreadPoolExecutor(3,6,5,TimeUnit.SECONDS,

newLinkedBlockingQueue(10),newCustomThreadFactory(),newCustomRejectedExecutionHandler());

//提交10個任務

for(inti=0;ii++){

executor.submit(newTask(i));

//關閉線程池

executor.shutdown();

staticclassTaskimplementsRunnable{

privateinttaskId;

publicTask(inttaskId){

this.taskId=taskId;

@Override

publicvoidrun(){

System.out.println(Task+taskId+isrunninginthread+Thread.currentThread().getName());

try{

Thread.sleep(3000);

}catch(InterruptedExceptione){

e.printStackTrace();

System.out.println(Task+taskId+isdone.

staticclassCustomThreadFactoryimplementsjava.util.concurrent.ThreadFactory{

privateintcount=1;

@Override

publicThreadnewThread(Runnabler){

Threadt=newThread(r);

t.setName(CustomThreadPool-+count++);

returnt;

staticclassCustomRejectedExecutionHandlerimplementsjava.util.concurrent.RejectedExecutionHandler{

@Override

publicvoidrejectedExecution(Runnabler,ThreadPoolExecutorexecutor){

System.out.println(Task+((Task)r).taskId+isrejected.

該示例代碼使用ThreadPoolExecutor類創建了一個大小為3,最大線程數為6,空閑線程存活時間為5秒的線程池,任務隊列的大小為10,使用了自定義的線程工廠和拒絕策略。然后提交了10個任務,每個任務輸出了當前線程的名稱,并休眠了3秒鐘。當程序執行時,可能會出現任務被拒絕執行的情況,拒絕策略會輸出任務被拒絕的信息。

線程池執行任務的流程

ThreadPoolExecutor提供了兩種執行任務的方法:

Futuresubmit(Runnabletask)

voidexecute(Runnablecommand)

實際上submit中也是調用了execute方法

publicFuturesubmit(Runnabletask){

if(task==null)thrownewNullPointerException();

RunnableFutureVoidftask=newTaskFor(task,null);

execute(ftask);

returnftask;

線程池執行流程圖

基礎屬性和變量

privatefinalAtomicIntegerctl

線程池源碼中使用ctl通過高低位的方式來記錄線程池的狀態和當前線程池中的工作線程數量。

Integer占用4個字節也就是32位,線程池有5種狀態,要標識5種狀態需要3位

前三位

privatestaticfinalintCOUNT_BITS=Integer.SIZE-3;

privatestaticfinalintCAPACITY=(1COUNT_BITS)-1;

//runStateisstoredinthehigh-orderbits

privatestaticfinalintRUNNING=-1COUNT_BITS;

privatestaticfinalintSHUTDOWN=0COUNT_BITS;

privatestaticfinalintSTOP=1COUNT_BITS;

privatestaticfinalintTIDYING=2COUNT_BITS;

privatestaticfinalintTERMINATED=3COUNT_BITS;

Integer.SIZE為32,所以COUNT_BITS為29,最終各個狀態對應的二級制為:

RUNNING:11100000000000000000000000000000

SHUTDOWN:00000000000000000000000000000000

STOP:00100000000000000000000000000000

TIDYING:01000000000000000000000000000000

TERMINATED:01100000000000000000000000000000

execute(Runnablecommand)

publicvoidexecute(Runnablecommand){

if(command==null)

thrownewNullPointerException();

//ctl初始值是ctlOf(RUNNING,0),表示線程池處于運行中,工作線程數為0

intc=ctl.get();

//判斷工作線程是否小于核心線程數

if(workerCountOf(c)corePoolSize){

//小于核心線程要新增工作線程

if(addWorker(command,true))

return;

//新增失敗重新獲取一次ctl

c=ctl.get();

//線程池是否處于Running狀態入隊是否成功

if(isRunning(c)workQueue.offer(command)){//入隊成功

//重新獲取ctl

intrecheck=ctl.get();

//如果線程池不是Running狀態就需要移除掉這個任務

if(!isRunning(recheck)remove(command))

//觸發拒絕策略

reject(command);

//工作線程為0時要去創建新的工作線程

elseif(workerCountOf(recheck)==0)

addWorker(null,false);

//如果線程池狀態不是RUNNING,或者線程池狀態是RUNNING但是隊列滿了,則去添加一個非核心工作線程。false表示非核心線程

elseif(!addWorker(command,false))

reject(command);

addWorker(RunnablefirstTask,booleancore)

//core:true核心線程false非核心線程

privatebooleanaddWorker(RunnablefirstTask,booleancore){

retry:

for(;;){

//獲取ctl值

intc=ctl.get();

//獲取高3位

intrs=runStateOf(c);

//線程池如果是SHUTDOWN狀態并且隊列非空則創建線程,如果隊列為空則不創建線程

//線程池如果是STOP狀態則直接不創建線程

if(rs=SHUTDOWN

!(rs==SHUTDOWN

firstTask==null

!workQueue.isEmpty()))

returnfalse;

for(;;){

//獲取工作線程數

intwc=workerCountOf(c);

//工作線程數超過規定數量則不創建線程

if(wc=CAPACITY||

wc=(corecorePoolSize:maximumPoolSize))

returnfalse;

//修改工作線程

if(compareAndIncrementWorkerCount(c))

//成功則退出retry這個循環

breakretry;

//CAS失敗說明有其他線程也在增加工作線程數量,此時重新獲取ctl值

c=ctl.get();//Re-readctl

//如果發現線程池的狀態發生了變化,則繼續回到retry,重新判斷線程池的狀態是不是SHUTDOWN或STOP

//如果狀態沒有變化,則繼續利用cas來增加工作線程數,直到cas成功

if(runStateOf(c)!=rs)

continueretry;

//elseCASfailedduetoworkerCountchange;retryinnerloop

//到了這里說明ctl新增成功

booleanworkerStarted=false;

booleanworkerAdded=false;

Workerw=null;

try{

//Worker實現了Runnable接口在構造一個Worker對象時,就會利用ThreadFactory新建一個線程

w=newWorker(firstTask);

//拿出線程對象此時線程還沒有start啟動

finalThreadt=w.thread;

if(t!=null){

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

//獲取高三位

intrs=runStateOf(ctl.get());

//如果線程池的狀態是RUNNING

//或者線程池的狀態變成了SHUTDOWN,但是當前線程沒有自己的第一個任務,那就表示當前調用addWorker方法是為了從隊列中獲取任務來執行

//正常情況下線程池的狀態如果是SHUTDOWN,是不能創建新的工作線程的,但是隊列中如果有任務,那就是上面說的特例情況

if(rsSHUTDOWN||

(rs==SHUTDOWNfirstTask==null)){

//如果Worker對象對應的線程已經在運行了,那就有問題,直接拋異常

if(t.isAlive())//precheckthattisstartable

thrownewIllegalThreadStateException();

//workers用來記錄當前線程池中工作線程,調用線程池的shutdown方法時會遍歷worker對象中斷對應線程

workers.add(w);

ints=workers.size();

//largestPoolSize用來跟蹤線程池在運行過程中工作線程數的峰值

if(slargestPoolSize)

largestPoolSize=s;

workerAdded=true;

}finally{

mainLock.unlock();

//啟動線程

if(workerAdded){

t.start();

workerStarted=true;

}finally{

//在上述過程中如果拋了異常,需要從works中移除所添加的work,并且還要修改ctl,工作線程數-1,表示新建工作線程失敗

if(!workerStarted)

addWorkerFailed(w);

returnworkerStarted;

addWorker核心邏輯:

先判斷工作線程數是否超過了限制

修改ctl,使得工作線程數+1

構造Work對象,并把它添加到workers集合中

啟動Work對象對應的工作線程

runWorker(this)

剛剛有說到Worker實現了Runnable接口,看看他重寫的Run方法中執行過什么

Worker(RunnablefirstTask){

setState(-1);//inhibitinterruptsuntilrunWorker

this.firstTask=firstTask;

this.thread=getThreadFactory().newThread(this);

/**DelegatesmainrunlooptoouterrunWorker*/

publicvoidrun(){

runWorker(this);

finalvoidrunWorker(Workerw){

//獲取當前工作線程

Threadwt=Thread.currentThread();

//獲取第一個任務

Runnabletask=w.firstTask;

//置空

w.firstTask=null;

w.unlock();//allowinterrupts

booleancompletedAbruptly=true;

try{

//判斷當前第一個任務是否為空,為空的話從阻塞隊列獲取一個任務,阻塞隊列也為空就會阻塞在getTask()方法中

//也不會一直阻塞下去,keepAliveTime超時后還沒有獲取到任務就會返回null,退出循環,這個線程也就是中止了

while(task!=null||(task=getTask())!=null){

w.lock();

//線程池狀態為STOP,則要中斷自己,但是如果發現中斷標記為true,那是不對的,因為線程池狀態不是STOP,工作線程仍然是要正常工作的,不能中斷掉,算是SHUTDOWN,也要等任務都執行完之后,線程才結束,而目前線程還在執行任務的過程中,不能中斷

if((runStateAtLeast(ctl.get(),STOP)||

(Terrupted()

runStateAtLeast(ctl.get(),STOP)))

!wt.isInterrupted())

errupt();

try{

//空方法給自定義線程池實現

beforeExecute(wt,task);

Throwablethrown=null;

try{

//執行任務

task.run();

}catch(RuntimeExceptionx){

thrown=x;throwx;

}catch(Errorx){

thrown=x;throwx;

}catch(Throwablex){

thrown=x;thrownewError(x);

}finally{

//空方法給自定義線程池實現

afterExecute(task,thrown);

}finally{

task=null;

pletedTasks++;

w.unlock();

//正常退出了while循環

//completedAbruptly=false,表示線程正常退出

completedAbruptly=false;

}finally{

//如果線程正常退出這個線程會自然死亡

//但是如果是由于執行任務的時候拋了異常,那么這個線程不應該直接結束,而應該繼續從隊列中獲取下一個任務

processWorkerExit(w,completedAbruptly);

processWorkerExit(Workerw,booleancompletedAbruptly)

privatevoidprocessWorkerExit(Workerw,booleancompletedAbruptly){

//如果completedAbruptly為true,表示是執行任務的時候拋了異常,那就修改ctl,工作線程數-1

if(completedAbruptly)//Ifabrupt,thenworkerCountwasntadjusted

decrementWorkerCount();

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

completedTaskCount+=pletedTasks;

//將當前Work對象從workers中移除

workers.remove(w);

}finally{

mainLock.unlock();

//因為當前是處理線程退出流程中,所以要嘗試去修改線程池的狀態為TINDYING

tryTerminate();

//獲取當前ctl值

intc=ctl.get();

//如果線程池的狀態為RUNNING或者SHUTDOWN,則可能要替補一個線程

if(runStateLessThan(c,STOP)){

//completedAbruptly為false,表示線程是正常要退出了,則看是否需要保留線程

if(!completedAbruptly){

//如果allowCoreThreadTimeOut為true,但是阻塞隊列中還有任務,那就至少得保留一個工作線程來處理阻塞隊列中的任務

//如果allowCoreThreadTimeOut為false,那min就是corePoolSize,表示至少得保留corePoolSize個工作線程活著

intmin=allowCoreThreadTimeOut0:corePoolSize;

if(min==0!workQueue.isEmpty())

min=1;

//如果當前工作線程數大于等于min,則表示符合所需要保留的最小線程數,那就直接return,不會調用下面的addWorker方法新開一個工作線程了

if(workerCountOf(c)=min)

return;//replacementnotneeded

//新開工作線程

addWorker(null,false);

某個工作線程正常情況下會不停的循環從阻塞隊列中獲取任務來執行,正常情況下就是通過阻塞來保證線程永遠活著,但是會有一些特殊情況:

如果線程被中斷了,那就會退出循環,然后做一些善后處理,比如ctl中的工作線程數-1,然后自己運行結束

如果線程阻塞超時了,那也會退出循環,此時就需要判斷線程池中的當前工作線程夠不夠,比如是否有corePoolSize個工作線程,如果不夠就需要新開一個線程,然后當前線程自己運行結束,這種看上去效率比較低,但是也沒辦法,當然如果當前工作線程數足夠,那就正常,自己正常的運行結束即可

如果線程是在執行任務的時候拋了移除,從而退出循環,那就直接新開一個線程作為替補,當然前提是線程池的狀態是RUNNING

getTask()

privateRunnablegetTask(){

booleantimedOut=false;//Didthelastpoll()timeout

for(;;){

intc=ctl.get();

intrs=runStateOf(c);

//如果線程池狀態是STOP,表示當前線程不需要處理任務了,那就修改ctl工作線程數-1

//如果線程池狀態是SHUTDOWN,但是阻塞隊列中為空,表示當前任務沒有任務要處理了,那就修改ctl工作線程數-1

//returnnull表示當前線程無需處理任務,線程退出

if(rs=SHUTDOWN(rs=STOP||workQueue.isEmpty())){

decrementWorkerCount();

returnnull;

//當前工作線程數

intwc=workerCountOf(c);

//用來判斷當前線程是無限阻塞還是超時阻塞,如果一個線程超時阻塞,那么一旦超時了,那么這個線程最終就會退出

//如果是無限阻塞,那除非被中斷了,不然這個線程就一直等著獲取隊列中的任務

//allowCoreThreadTimeOut為true,表示線程池中的所有線程都可以被回收掉,則當前線程應該直接使用超時阻塞,一旦超時就回收

//allowCoreThreadTimeOut為false,則要看當前工作線程數是否超過了corePoolSize,如果超過了,則表示超過部分的線程要用超時阻塞,一旦超時就回收

booleantimed=allowCoreThreadTimeOut||wccorePoolSize;

//如果工作線程數超過了工作線程的最大限制或者線程超時了,則要修改ctl,工作線程數減1,并且returnnull

//returnnull就會導致外層的while循環退出,從而導致線程直接運行結束

//直播課程里會細講timedtimedOut

if((wcmaximumPoolSize||(timedtimedOut))

(wc1||workQueue.isEmpty())){

if(compareAndDecrementWorkerCount(c))

returnnull;

continue;

try{

//要么超時阻塞,要么無限阻塞

Runnabler=timed

workQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS):

workQueue.take();

//表示沒有超時,在阻塞期間獲取到了任務

if(r!=null)

returnr;

//超時了,重新進入循環,上面的代碼會判斷出來當前線程阻塞超時了,最后returnnull,線程會運行結束

timedOut=true;

}catch(InterruptedExceptionretry){

//如果線程池的狀態變成了STOP或者SHUTDOWN,最終也會returnnull,線程會運行結束

//但是如果線程池的狀態仍然是RUNNING,那當前線程會繼續從隊列中去獲取任務,表示忽略了本次中斷

//只有通過調用線程池的shutdown方法或shutdownNow方法才能真正中斷線程池中的線程

timedOut=false;

shutdown()

publicvoidshutdown(){

finalReentrantLockmainLock=this.mainLock;

mainLock.lock();

try{

checkShutdownAccess();

//修改ctl,將線程池狀態改為SHUTDOWN

advanceRunState(SHUTDO

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論