Java手寫線程池之向JDK線程池進發_第1頁
Java手寫線程池之向JDK線程池進發_第2頁
Java手寫線程池之向JDK線程池進發_第3頁
Java手寫線程池之向JDK線程池進發_第4頁
Java手寫線程池之向JDK線程池進發_第5頁
已閱讀5頁,還剩4頁未讀, 繼續免費閱讀

下載本文檔

版權說明:本文檔由用戶提供并上傳,收益歸屬內容提供方,若內容存在侵權,請進行舉報或認領

文檔簡介

第Java手寫線程池之向JDK線程池進發private

AtomicInteger

ct

=

new

AtomicInteger(0);

//

當前在執行任務的線程個數

private

int

corePoolSize;

private

int

maximumPoolSize;

private

long

keepAliveTime;

private

TimeUnit

unit;

private

BlockingQueueRunnable

taskQueue;

private

RejectPolicy

policy;

private

ArrayListWorker

workers

=

new

ArrayList();

private

volatile

boolean

isStopped;

private

boolean

useTimed;

public

int

getCt()

{

return

ct.get();

public

ThreadPool(int

corePoolSize,

int

maximumPoolSize,

TimeUnit

unit,

long

keepAliveTime,

RejectPolicy

policy

,

int

maxTasks)

{

//

please

add

-ea

to

vm

options

to

make

assert

keyword

enable

assert

corePoolSize

assert

maximumPoolSize

assert

keepAliveTime

assert

maxTasks

this.corePoolSize

=

corePoolSize;

this.maximumPoolSize

=

maximumPoolSize;

this.unit

=

unit;

this.policy

=

policy;

this.keepAliveTime

=

keepAliveTime;

taskQueue

=

new

ArrayBlockingQueueRunnable(maxTasks);

useTimed

=

keepAliveTime

!=

0;

/**

*

@param

runnable

需要被執行的任務

*

@param

max

是否使用

maximumPoolSize

*

@return

boolean

*/

public

synchronized

boolean

addWorker(Runnable

runnable,

boolean

max)

{

if

(ct.get()

=

corePoolSize

!max)

return

false;

if

(ct.get()

=

maximumPoolSize

max)

return

false;

Worker

worker

=

new

Worker(runnable);

workers.add(worker);

Thread

thread

=

new

Thread(worker,

"ThreadPool-"

+

"Thread-"

+

ct.addAndGet(1));

thread.start();

return

true;

//

下面這個方法是向線程池提交任務

public

void

execute(Runnable

runnable)

throws

InterruptedException

{

checkPoolState();

if

(addWorker(runnable,

false)

//

如果能夠加入新的線程執行任務

加入成功就直接返回

||

!taskQueue.offer(runnable)

//

如果

taskQueue.offer(runnable)

返回

false

說明提交任務失敗

任務隊列已經滿了

||

addWorker(runnable,

true))

//

使用能夠使用的最大的線程數

(maximumPoolSize)

看是否能夠產生新的線程

return;

//

如果任務隊列滿了而且不能夠加入新的線程

則拒絕這個任務

if

(!taskQueue.offer(runnable))

reject(runnable);

private

void

reject(Runnable

runnable)

throws

InterruptedException

{

switch

(policy)

{

case

ABORT:

throw

new

RuntimeException("task

queue

is

full");

case

CALLER_RUN:

runnable.run();

case

DISCARD:

return;

case

DISCARD_OLDEST:

//

放棄等待時間最長的任務

taskQueue.poll();

execute(runnable);

}

private

void

checkPoolState()

{

if

(isStopped)

{

//

如果線程池已經停下來了,就不在向任務隊列當中提交任務了

throw

new

RuntimeException("thread

pool

has

been

stopped,

so

quit

submitting

task");

}

public

V

RunnableFutureV

submit(CallableV

task)

throws

InterruptedException

{

checkPoolState();

FutureTaskV

futureTask

=

new

FutureTask(task);

execute(futureTask);

return

futureTask;

//

強制關閉線程池

public

synchronized

void

stop()

{

isStopped

=

true;

for

(Worker

worker

:

workers)

{

worker.stopWorker();

}

public

synchronized

void

shutDown()

{

//

先表示關閉線程池

線程就不能再向線程池提交任務

isStopped

=

true;

//

先等待所有的任務執行完成再關閉線程池

waitForAllTasks();

stop();

private

void

waitForAllTasks()

{

//

當線程池當中還有任務的時候

就不退出循環

while

(taskQueue.size()

0)

{

Thread.yield();

try

{

Thread.sleep(1000);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

}

class

Worker

implements

Runnable

{

private

Thread

thisThread;

private

final

Runnable

firstTask;

private

volatile

boolean

isStopped;

public

Worker(Runnable

firstTask)

{

this.firstTask

=

firstTask;

}

@Override

public

void

run()

{

//

先執行傳遞過來的第一個任務

這里是一個小的優化

讓線程直接執行第一個任務

不需要

//

放入任務隊列再取出來執行了

firstTask.run();

thisThread

=

Thread.currentThread();

while

(!isStopped)

{

try

{

Runnable

task

=

useTimed

taskQueue.poll(keepAliveTime,

unit)

:

taskQueue.take();

if

(task

==

null)

{

int

i;

boolean

exit

=

true;

if

(ct.get()

corePoolSize)

{

do{

i

=

ct.get();

if

(i

=

corePoolSize)

{

exit

=

false;

break;

}

}while

(!pareAndSet(i,

i

-

1));

if

(exit)

{

return;

}

}

}else

{

task.run();

}

}

catch

(InterruptedException

e)

{

//

do

nothing

}

}

}

public

synchronized

void

stopWorker()

{

if

(isStopped)

{

throw

new

RuntimeException("thread

has

been

interrupted");

}

isStopped

=

true;

thisTerrupt();

}

線程池測試

package

cscore.concurrent.java.threadpoolv2;

import

java.util.concurrent.ExecutionException;

import

java.util.concurrent.RunnableFuture;

import

java.util.concurrent.TimeUnit;

public

class

Test

{

public

static

void

main(String[]

args)

throws

InterruptedException,

ExecutionException

{

var

pool

=

new

ThreadPool(2,

5,

TimeUnit.SECONDS,

10,

RejectPolicy.ABORT,

100000);

for

(int

i

=

0;

i

i++)

{

RunnableFutureInteger

submit

=

pool.submit(()

-

{

System.out.println(Thread.currentThread().getName()

+

"

output

a");

try

{

Thread.sleep(10);

}

catch

(InterruptedException

e)

{

e.printStackTrace();

}

return

0;

});

System.out.println(submit.get());

}

int

n

=

15;

while

(n--

0)

{

System.out.println("Number

Threads

=

"

+

pool.getCt());

Thread.sleep(1000);

}

pool.shutDown();

上面測試代碼的輸出結果如下所示:

ThreadPool-Thread-2outputa

ThreadPool-Thread-1outputa

ThreadPool-Thread-3outputa

ThreadPool-Thread-4outputa

NumberThreads=5

ThreadPool-Thread-5outputa

ThreadPool-Thread-2outputa

ThreadPool-Thread-1outputa

ThreadPool-Thread-3outputa

ThreadPool-Thread-4outputa

ThreadPool-Thread-5outputa

ThreadPool-Thread-2outputa

ThreadPool-Thread-1outputa

ThreadPool-Thread-4outputa

Thr

溫馨提示

  • 1. 本站所有資源如無特殊說明,都需要本地電腦安裝OFFICE2007和PDF閱讀器。圖紙軟件為CAD,CAXA,PROE,UG,SolidWorks等.壓縮文件請下載最新的WinRAR軟件解壓。
  • 2. 本站的文檔不包含任何第三方提供的附件圖紙等,如果需要附件,請聯系上傳者。文件的所有權益歸上傳用戶所有。
  • 3. 本站RAR壓縮包中若帶圖紙,網頁內容里面會有圖紙預覽,若沒有圖紙預覽就沒有圖紙。
  • 4. 未經權益所有人同意不得將文件中的內容挪作商業或盈利用途。
  • 5. 人人文庫網僅提供信息存儲空間,僅對用戶上傳內容的表現方式做保護處理,對用戶上傳分享的文檔內容本身不做任何修改或編輯,并不能對任何下載內容負責。
  • 6. 下載文件中如有侵權或不適當內容,請與我們聯系,我們立即糾正。
  • 7. 本站不保證下載資源的準確性、安全性和完整性, 同時也不承擔用戶因使用這些下載資源對自己和他人造成任何形式的傷害或損失。

評論

0/150

提交評論